diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3562bea49c..f0aa8c1882 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -470,6 +470,9 @@ Release 2.1.0-beta - 2013-07-02
     HDFS-5020. Make DatanodeProtocol#blockReceivedAndDeleted idempotent.
     (jing9)
 
+    HDFS-5024. Make DatanodeProtocol#commitBlockSynchronization idempotent.
+    (Arpit Agarwal via jing9)
+
   OPTIMIZATIONS
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7ee94e6e40..4af8edf428 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3068,7 +3068,7 @@ void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks,
     for (Block b : blocks.getToDeleteList()) {
       if (trackBlockCounts) {
-        BlockInfo bi = blockManager.getStoredBlock(b);
+        BlockInfo bi = getStoredBlock(b);
         if (bi.isComplete()) {
           numRemovedComplete++;
           if (bi.numNodes() >= blockManager.minReplication) {
 
@@ -3509,6 +3509,11 @@ private void finalizeINodeFileUnderConstruction(String src,
     blockManager.checkReplication(newFile);
   }
 
+  @VisibleForTesting
+  BlockInfo getStoredBlock(Block block) {
+    return blockManager.getStoredBlock(block);
+  }
+
   void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
@@ -3534,16 +3539,28 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
         "Cannot commitBlockSynchronization while in safe mode",
         safeMode);
     }
-    final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
-      .getLocalBlock(lastblock));
+    final BlockInfo storedBlock = getStoredBlock(
+        ExtendedBlock.getLocalBlock(lastblock));
     if (storedBlock == null) {
-      throw new IOException("Block (=" + lastblock + ") not found");
+      if (deleteblock) {
+        // This may be a retry attempt, so ignore the failure
+        // to locate the block.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Block (=" + lastblock + ") not found");
+        }
+        return;
+      } else {
+        throw new IOException("Block (=" + lastblock + ") not found");
+      }
     }
     INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
     if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
-      throw new IOException("Unexpected block (=" + lastblock
-          + ") since the file (=" + iFile.getLocalName()
-          + ") is not under construction");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Unexpected block (=" + lastblock
+            + ") since the file (=" + iFile.getLocalName()
+            + ") is not under construction");
+      }
+      return;
     }
 
     long recoveryId =
@@ -3559,11 +3576,9 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
       if (deleteblock) {
         Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
         boolean remove = pendingFile.removeLastBlock(blockToDel);
-        if (!remove) {
-          throw new IOException("Trying to delete non-existant block "
-              + blockToDel);
+        if (remove) {
+          blockManager.removeBlockFromMap(storedBlock);
         }
-        blockManager.removeBlockFromMap(storedBlock);
       }
       else {
         // update last block
@@ -3593,17 +3608,11 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
           pendingFile.setLastBlock(storedBlock, descriptors);
       }
 
-      src = leaseManager.findPath(pendingFile);
       if (closeFile) {
-        // commit the last block and complete it if it has minimum replicas
-        commitOrCompleteLastBlock(pendingFile, storedBlock);
-
-        //remove lease, close file
-        finalizeINodeFileUnderConstruction(src, pendingFile,
-            Snapshot.findLatestSnapshot(pendingFile, null));
+        src = closeFileCommitBlocks(pendingFile, storedBlock);
       } else {
         // If this commit does not want to close the file, persist blocks
-        dir.persistBlocks(src, pendingFile);
+        src = persistBlocks(pendingFile);
       }
     } finally {
       writeUnlock();
@@ -3620,6 +3629,44 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
     }
   }
 
+  /**
+   * Commit the last block of the file, close it, and release its lease.
+   * @param pendingFile open file under construction being closed
+   * @param storedBlock last block to commit
+   * @return Path of the file that was closed.
+   * @throws IOException on error
+   */
+  @VisibleForTesting
+  String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
+      BlockInfo storedBlock)
+      throws IOException {
+
+    String src = leaseManager.findPath(pendingFile);
+
+    // commit the last block and complete it if it has minimum replicas
+    commitOrCompleteLastBlock(pendingFile, storedBlock);
+
+    // remove lease, close file
+    finalizeINodeFileUnderConstruction(src, pendingFile,
+        Snapshot.findLatestSnapshot(pendingFile, null));
+
+    return src;
+  }
+
+  /**
+   * Persist the block list for the given file.
+   *
+   * @param pendingFile open file whose block list is to be persisted
+   * @return Path to the given file.
+   * @throws IOException on error
+   */
+  @VisibleForTesting
+  String persistBlocks(INodeFileUnderConstruction pendingFile)
+      throws IOException {
+    String src = leaseManager.findPath(pendingFile);
+    dir.persistBlocks(src, pendingFile);
+    return src;
+  }
 
   /**
    * Renew the lease(s) held by the given client
@@ -4662,7 +4709,7 @@ public void decrementSafeBlockCount(Block b) {
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
       return;
-    BlockInfo storedBlock = blockManager.getStoredBlock(b);
+    BlockInfo storedBlock = getStoredBlock(b);
     if (storedBlock.isComplete()) {
       safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
     }
@@ -5279,7 +5326,7 @@ private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
     }
 
     // check stored block state
-    BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock.getLocalBlock(block));
+    BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
    if (storedBlock == null ||
        storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
      throw new IOException(block +
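The FSNamesystem changes above all apply one pattern: when commitBlockSynchronization observes state indicating the requested work was already done (the block already deleted, the file already closed), it logs and returns instead of throwing, so a datanode's retry of the same RPC succeeds. A minimal sketch of that pattern follows; it uses hypothetical names (IdempotentCommitSketch, addBlock) and a plain Map, not HDFS code.

// Sketch (hypothetical, not HDFS code) of the retry-tolerant pattern above:
// if the observed state shows the work was already done, return silently
// instead of throwing, so a retried RPC succeeds.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class IdempotentCommitSketch {
  // Stand-in for the NameNode's block map.
  private final Map<Long, String> blocks = new HashMap<>();

  void addBlock(long blockId, String state) {
    blocks.put(blockId, state);
  }

  // Tolerates retries: deleting an already-deleted block is a no-op.
  void commitBlockSynchronization(long blockId, boolean deleteBlock)
      throws IOException {
    String stored = blocks.get(blockId);
    if (stored == null) {
      if (deleteBlock) {
        // Likely a retry of a delete that already completed; ignore.
        return;
      }
      throw new IOException("Block " + blockId + " not found");
    }
    if (deleteBlock) {
      blocks.remove(blockId);  // the first attempt does the real work
    }
    // ... commit/close path elided ...
  }
}

The extraction of closeFileCommitBlocks and persistBlocks serves the same goal: the test below can stub these side-effecting steps on a spy and exercise only the retry logic.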
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 8a500f4db6..27a10998d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -166,7 +166,7 @@ public void errorReport(DatanodeRegistration registration,
   /**
    * Commit block synchronization in lease recovery
    */
-  @AtMostOnce
+  @Idempotent
   public void commitBlockSynchronization(ExtendedBlock block,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
new file mode 100644
index 0000000000..34ce90e995
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.junit.Test;
+
+/**
+ * Verify that FSNamesystem#commitBlockSynchronization is idempotent.
+ */
+public class TestCommitBlockSynchronization {
+  private static final long blockId = 100;
+  private static final long length = 200;
+  private static final long genStamp = 300;
+
+  private FSNamesystem makeNameSystemSpy(Block block,
+                                         INodeFileUnderConstruction file)
+      throws IOException {
+    Configuration conf = new Configuration();
+    FSImage image = new FSImage(conf);
+    DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+
+    FSNamesystem namesystem = new FSNamesystem(conf, image);
+    FSNamesystem namesystemSpy = spy(namesystem);
+    BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+        block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+    blockInfo.setBlockCollection(file);
+    blockInfo.setGenerationStamp(genStamp);
+    blockInfo.initializeBlockRecovery(genStamp);
+    doReturn(true).when(file).removeLastBlock(any(Block.class));
+
+    doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
+    doReturn("").when(namesystemSpy).closeFileCommitBlocks(
+        any(INodeFileUnderConstruction.class),
+        any(BlockInfo.class));
+    doReturn("").when(namesystemSpy).persistBlocks(
+        any(INodeFileUnderConstruction.class));
+    doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
+
+    return namesystemSpy;
+  }
+
+  @Test
+  public void testCommitBlockSynchronization() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false,
+        false, newTargets, null);
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false, false, newTargets, null);
+
+    // Simulate 'completing' the block.
+    BlockInfo completedBlockInfo = new BlockInfo(block, 1);
+    completedBlockInfo.setBlockCollection(file);
+    completedBlockInfo.setGenerationStamp(genStamp);
+    doReturn(completedBlockInfo).when(namesystemSpy)
+        .getStoredBlock(any(Block.class));
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false, false, newTargets, null);
+  }
+
+  @Test
+  public void testCommitBlockSynchronization2() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false,
+        false, newTargets, null);
+
+    // Make sure the call fails if the generation stamp does not match
+    // the block recovery ID.
+    try {
+      namesystemSpy.commitBlockSynchronization(
+          lastBlock, genStamp - 1, length, false, false, newTargets, null);
+      fail("Failed to get expected IOException on generation stamp/" +
+           "recovery ID mismatch");
+    } catch (IOException ioe) {
+      // Expected exception.
+    }
+  }
+
+  @Test
+  public void testCommitBlockSynchronizationWithDelete() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false,
+        true, newTargets, null);
+
+    // Simulate the block having already been removed by the first call.
+    doReturn(false).when(file).removeLastBlock(any(Block.class));
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false, true, newTargets, null);
+  }
+
+  @Test
+  public void testCommitBlockSynchronizationWithClose() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true,
+        false, newTargets, null);
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true, false, newTargets, null);
+
+    BlockInfo completedBlockInfo = new BlockInfo(block, 1);
+    completedBlockInfo.setBlockCollection(file);
+    completedBlockInfo.setGenerationStamp(genStamp);
+    doReturn(completedBlockInfo).when(namesystemSpy)
+        .getStoredBlock(any(Block.class));
+
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true, false, newTargets, null);
+  }
+}
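The tests above establish the retry property by invoking commitBlockSynchronization twice with identical arguments and asserting the second call is harmless. A standalone usage sketch of the same property, assuming the hypothetical IdempotentCommitSketch class from the earlier example (not part of this patch):

// Usage sketch: issuing the same delete twice must not throw, mirroring the
// property TestCommitBlockSynchronization asserts against FSNamesystem.
import java.io.IOException;

class RetryDemo {
  public static void main(String[] args) throws IOException {
    IdempotentCommitSketch ns = new IdempotentCommitSketch();
    ns.addBlock(100L, "UNDER_CONSTRUCTION");
    ns.commitBlockSynchronization(100L, true);  // first call deletes the block
    ns.commitBlockSynchronization(100L, true);  // retry is a silent no-op
    System.out.println("retry completed without IOException");
  }
}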