From cd655ee817a3307bc1a1a119eb4266978ecd7fb2 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Mon, 30 Mar 2015 13:35:36 -0700 Subject: [PATCH] HDFS-8005. Erasure Coding: simplify striped block recovery work computation and add tests. Contributed by Jing Zhao. --- .../server/blockmanagement/BlockManager.java | 134 +++++------ .../blockmanagement/DatanodeDescriptor.java | 14 +- .../hdfs/server/namenode/INodeFile.java | 1 + .../blockmanagement/TestBlockManager.java | 33 +-- .../TestRecoverStripedBlocks.java | 107 --------- .../server/namenode/TestAddStripedBlocks.java | 2 +- .../namenode/TestRecoverStripedBlocks.java | 210 ++++++++++++++++++ 7 files changed, 290 insertions(+), 211 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index b943ba4221..97d8379f5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -541,7 +541,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) { // source node returned is not used chooseSourceDatanodes(getStoredBlock(block), containingNodes, containingLiveReplicasNodes, numReplicas, - new LinkedList(), 1, UnderReplicatedBlocks.LEVEL); + new LinkedList(), UnderReplicatedBlocks.LEVEL); // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count @@ -1389,7 +1389,7 @@ int computeBlockRecoveryWork(int blocksToProcess) { int computeRecoveryWorkForBlocks(List> blocksToRecover) { int requiredReplication, numEffectiveReplicas; List containingNodes; - BlockCollection bc = null; + BlockCollection bc; int additionalReplRequired; int scheduledWork = 0; @@ -1417,13 +1417,10 @@ int computeRecoveryWorkForBlocks(List> blocksToRecover) { containingNodes = new ArrayList<>(); List liveReplicaNodes = new ArrayList<>(); NumberReplicas numReplicas = new NumberReplicas(); - List missingBlockIndices = new LinkedList<>(); - DatanodeDescriptor[] srcNodes; - int numSourceNodes = bc.isStriped() ? 
- HdfsConstants.NUM_DATA_BLOCKS : 1; - srcNodes = chooseSourceDatanodes( - block, containingNodes, liveReplicaNodes, numReplicas, - missingBlockIndices, numSourceNodes, priority); + List liveBlockIndices = new ArrayList<>(); + final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, + containingNodes, liveReplicaNodes, numReplicas, + liveBlockIndices, priority); if(srcNodes == null || srcNodes.length == 0) { // block can not be replicated from any node LOG.debug("Block " + block + " cannot be recovered " + @@ -1455,15 +1452,14 @@ int computeRecoveryWorkForBlocks(List> blocksToRecover) { } else { additionalReplRequired = 1; // Needed on a new rack } - if (bc.isStriped()) { + if (block.isStriped()) { + short[] indices = new short[liveBlockIndices.size()]; + for (int i = 0 ; i < liveBlockIndices.size(); i++) { + indices[i] = liveBlockIndices.get(i); + } ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes, containingNodes, liveReplicaNodes, additionalReplRequired, - priority); - short[] missingBlockArray = new short[missingBlockIndices.size()]; - for (int i = 0 ; i < missingBlockIndices.size(); i++) { - missingBlockArray[i] = missingBlockIndices.get(i); - } - ecw.setMissingBlockIndices(missingBlockArray); + priority, indices); recovWork.add(ecw); } else { recovWork.add(new ReplicationWork(block, bc, srcNodes, @@ -1543,15 +1539,14 @@ int computeRecoveryWorkForBlocks(List> blocksToRecover) { } // Add block to the to be replicated list - if (bc.isStriped()) { + if (block.isStriped()) { assert rw instanceof ErasureCodingWork; assert rw.targets.length > 0; rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( new ExtendedBlock(namesystem.getBlockPoolId(), block), rw.srcNodes, rw.targets, - ((ErasureCodingWork)rw).getMissingBlockIndicies()); - } - else { + ((ErasureCodingWork) rw).liveBlockIndicies); + } else { rw.srcNodes[0].addBlockToBeReplicated(block, targets); } scheduledWork++; @@ -1581,9 +1576,9 @@ int computeRecoveryWorkForBlocks(List> blocksToRecover) { DatanodeStorageInfo[] targets = rw.targets; if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); - for (int k = 0; k < targets.length; k++) { + for (DatanodeStorageInfo target : targets) { targetList.append(' '); - targetList.append(targets[k].getDatanodeDescriptor()); + targetList.append(target.getDatanodeDescriptor()); } blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes, rw.block, targetList); @@ -1694,11 +1689,8 @@ List getDatanodeDescriptors(List nodes) { * @param numReplicas NumberReplicas instance to be initialized with the * counts of live, corrupt, excess, and decommissioned * replicas of the given block. 
- * @param missingBlockIndices List to be populated with indices of missing - * blocks in a striped block group or missing - * replicas of a replicated block - * @param numSourceNodes integer specifying the number of source nodes to - * choose + * @param liveBlockIndices List to be populated with indices of healthy + * blocks in a striped block group * @param priority integer representing replication priority of the given * block * @return the array of DatanodeDescriptor of the chosen nodes from which to @@ -1709,24 +1701,20 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, List containingNodes, List nodesContainingLiveReplicas, NumberReplicas numReplicas, - List missingBlockIndices, int numSourceNodes, int priority) { + List liveBlockIndices, int priority) { containingNodes.clear(); nodesContainingLiveReplicas.clear(); - LinkedList srcNodes = new LinkedList<>(); + List srcNodes = new ArrayList<>(); int live = 0; int decommissioned = 0; int decommissioning = 0; int corrupt = 0; int excess = 0; - missingBlockIndices.clear(); - Set healthyIndices = new HashSet<>(); + liveBlockIndices.clear(); + final boolean isStriped = block.isStriped(); Collection nodesCorrupt = corruptReplicas.getNodes(block); - for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { - if (block.isStriped()) { - healthyIndices.add((short) ((BlockInfoStriped) block). - getStorageBlockIndex(storage)); - } + for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); @@ -1765,27 +1753,19 @@ else if (node.isDecommissionInProgress()) { if(node.isDecommissioned()) continue; - // We got this far, current node is a reasonable choice - if(srcNodes.size() < numSourceNodes) { + if(isStriped || srcNodes.isEmpty()) { srcNodes.add(node); + if (isStriped) { + liveBlockIndices.add((short) ((BlockInfoStriped) block). + getStorageBlockIndex(storage)); + } continue; } - // switch to a different node randomly + // for replicated block, switch to a different node randomly // this to prevent from deterministically selecting the same node even // if the node failed to replicate the block on previous iterations - if(ThreadLocalRandom.current().nextBoolean()) { - int pos = ThreadLocalRandom.current().nextInt(numSourceNodes); - if(!srcNodes.get(pos).isDecommissionInProgress()) { - srcNodes.set(pos, node); - } - } - } - if (block.isStriped()) { - for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS + - HdfsConstants.NUM_PARITY_BLOCKS; i++) { - if (!healthyIndices.contains(i)) { - missingBlockIndices.add(i); - } + if (!isStriped && ThreadLocalRandom.current().nextBoolean()) { + srcNodes.set(0, node); } } if(numReplicas != null) @@ -3889,25 +3869,25 @@ public static LocatedBlock newLocatedBlock( * to represent a task to recover a block through replication or erasure * coding. Recovery is done by transferring data from srcNodes to targets */ - private static class BlockRecoveryWork { - protected final BlockInfo block; - protected final BlockCollection bc; + private abstract static class BlockRecoveryWork { + final BlockInfo block; + final BlockCollection bc; /** * An erasure coding recovery task has multiple source nodes. 
* A replication task only has 1 source node, stored on top of the array */ - protected final DatanodeDescriptor[] srcNodes; + final DatanodeDescriptor[] srcNodes; /** Nodes containing the block; avoid them in choosing new targets */ - protected final List containingNodes; + final List containingNodes; /** Required by {@link BlockPlacementPolicy#chooseTarget} */ - protected final List liveReplicaStorages; - protected final int additionalReplRequired; + final List liveReplicaStorages; + final int additionalReplRequired; - protected DatanodeStorageInfo[] targets; - protected final int priority; + DatanodeStorageInfo[] targets; + final int priority; - public BlockRecoveryWork(BlockInfo block, + BlockRecoveryWork(BlockInfo block, BlockCollection bc, DatanodeDescriptor[] srcNodes, List containingNodes, @@ -3924,15 +3904,13 @@ public BlockRecoveryWork(BlockInfo block, this.targets = null; } - protected void chooseTargets(BlockPlacementPolicy blockplacement, + abstract void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, - Set excludedNodes) { - } + Set excludedNodes); } private static class ReplicationWork extends BlockRecoveryWork { - - public ReplicationWork(BlockInfo block, + ReplicationWork(BlockInfo block, BlockCollection bc, DatanodeDescriptor[] srcNodes, List containingNodes, @@ -3944,7 +3922,8 @@ public ReplicationWork(BlockInfo block, LOG.debug("Creating a ReplicationWork to recover " + block); } - protected void chooseTargets(BlockPlacementPolicy blockplacement, + @Override + void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set excludedNodes) { assert srcNodes.length > 0 @@ -3961,30 +3940,23 @@ protected void chooseTargets(BlockPlacementPolicy blockplacement, } private static class ErasureCodingWork extends BlockRecoveryWork { + final short[] liveBlockIndicies; - private short[] missingBlockIndicies = null; - - public ErasureCodingWork(BlockInfo block, + ErasureCodingWork(BlockInfo block, BlockCollection bc, DatanodeDescriptor[] srcNodes, List containingNodes, List liveReplicaStorages, int additionalReplRequired, - int priority) { + int priority, short[] liveBlockIndicies) { super(block, bc, srcNodes, containingNodes, liveReplicaStorages, additionalReplRequired, priority); + this.liveBlockIndicies = liveBlockIndicies; LOG.debug("Creating an ErasureCodingWork to recover " + block); } - public short[] getMissingBlockIndicies() { - return missingBlockIndicies; - } - - public void setMissingBlockIndices(short[] missingBlockIndicies) { - this.missingBlockIndicies = missingBlockIndicies; - } - - protected void chooseTargets(BlockPlacementPolicy blockplacement, + @Override + void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set excludedNodes) { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 7bc5e7e48a..15427f7590 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -106,14 +106,14 @@ public static class BlockECRecoveryInfo { public final ExtendedBlock block; public final DatanodeDescriptor[] sources; public final DatanodeStorageInfo[] targets; 
- public final short[] missingBlockIndices; + public final short[] liveBlockIndices; BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources, - DatanodeStorageInfo[] targets, short[] missingBlockIndices) { + DatanodeStorageInfo[] targets, short[] liveBlockIndices) { this.block = block; this.sources = sources; this.targets = targets; - this.missingBlockIndices = missingBlockIndices; + this.liveBlockIndices = liveBlockIndices; } @Override @@ -122,6 +122,7 @@ public String toString() { append("Recovering ").append(block). append(" From: ").append(Arrays.asList(sources)). append(" To: ").append(Arrays.asList(targets)).append(")\n"). + append(" Block Indices: ").append(Arrays.asList(liveBlockIndices)). toString(); } } @@ -635,10 +636,10 @@ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) { * Store block erasure coding work. */ void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources, - DatanodeStorageInfo[] targets, short[] missingBlockIndicies) { + DatanodeStorageInfo[] targets, short[] liveBlockIndices) { assert(block != null && sources != null && sources.length > 0); BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets, - missingBlockIndicies); + liveBlockIndices); erasurecodeBlocks.offer(task); BlockManager.LOG.debug("Adding block recovery task " + task + "to " + getName() + ", current queue size is " + @@ -679,7 +680,8 @@ int getNumberOfBlocksToBeReplicated() { /** * The number of work items that are pending to be replicated */ - int getNumberOfBlocksToBeErasureCoded() { + @VisibleForTesting + public int getNumberOfBlocksToBeErasureCoded() { return erasurecodeBlocks.size(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index d5a809c8fd..51e24db657 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -404,6 +404,7 @@ public final short getFileReplication(int snapshot) { /** The same as getFileReplication(null). 
*/ @Override // INodeFileAttributes + // TODO striped public final short getFileReplication() { return getFileReplication(CURRENT_STATE_ID); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 3a1b19e24c..074be16e20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -513,30 +513,33 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception { cntNodes, liveNodes, new NumberReplicas(), - new LinkedList(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); + new ArrayList(), + UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); - assertNull("Does not choose a source node for a less-than-highest-priority" - + " replication since all available source nodes have reached" - + " their replication limits.", + assertEquals("Does not choose a source node for a less-than-highest-priority" + + " replication since all available source nodes have reached" + + " their replication limits.", 0, bm.chooseSourceDatanodes( bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - new LinkedList(), 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]); + new ArrayList(), + UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length); // Increase the replication count to test replication count > hard limit DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] }; origNodes.get(0).addBlockToBeReplicated(aBlock, targets); - assertNull("Does not choose a source node for a highest-priority" - + " replication when all available nodes exceed the hard limit.", + assertEquals("Does not choose a source node for a highest-priority" + + " replication when all available nodes exceed the hard limit.", 0, bm.chooseSourceDatanodes( bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - new LinkedList(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); + new ArrayList(), + UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length); } @Test @@ -561,26 +564,24 @@ public void testFavorDecomUntilHardLimit() throws Exception { bm.getStoredBlock(aBlock), cntNodes, liveNodes, - new NumberReplicas(), new LinkedList(), 1, - UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)); + new NumberReplicas(), new LinkedList(), + UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]); // Increase the replication count to test replication count > hard limit DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] }; origNodes.get(0).addBlockToBeReplicated(aBlock, targets); - assertNull("Does not choose a source decommissioning node for a normal" - + " replication when all available nodes exceed the hard limit.", + assertEquals("Does not choose a source decommissioning node for a normal" + + " replication when all available nodes exceed the hard limit.", 0, bm.chooseSourceDatanodes( bm.getStoredBlock(aBlock), cntNodes, liveNodes, - new NumberReplicas(), new LinkedList(), 1, - UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)); + new NumberReplicas(), new LinkedList(), + UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length); } - - @Test public void testSafeModeIBR() throws Exception { DatanodeDescriptor node = spy(nodes.get(0)); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java deleted file mode 100644 index d883c9b323..0000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.client.HdfsAdmin; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.Iterator; - -import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME; -import static org.junit.Assert.assertTrue; - -public class TestRecoverStripedBlocks { - private final short GROUP_SIZE = - HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS; - private final short NUM_OF_DATANODES = GROUP_SIZE + 1; - private Configuration conf; - private MiniDFSCluster cluster; - private DistributedFileSystem fs; - private static final int BLOCK_SIZE = 1024; - private HdfsAdmin dfsAdmin; - private FSNamesystem namesystem; - private Path ECFilePath; - - @Before - public void setupCluster() throws IOException { - conf = new HdfsConfiguration(); - conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - // Large value to make sure the pending replication request can stay in - // DatanodeDescriptor.replicateBlocks before test timeout. - conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100); - // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via - // chooseUnderReplicatedBlocks at once. - conf.setInt( - DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5); - - cluster = new MiniDFSCluster.Builder(conf). 
- numDataNodes(NUM_OF_DATANODES).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - dfsAdmin = new HdfsAdmin(cluster.getURI(), conf); - namesystem = cluster.getNamesystem(); - ECFilePath = new Path("/ecfile"); - DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0); - dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME); - } - - @Test - public void testMissingStripedBlock() throws Exception { - final BlockManager bm = cluster.getNamesystem().getBlockManager(); - ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath); - Iterator storageInfos = - bm.blocksMap.getStorages(b.getLocalBlock()) - .iterator(); - - DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor(); - Iterator it = firstDn.getBlockIterator(); - int missingBlkCnt = 0; - while (it.hasNext()) { - BlockInfo blk = it.next(); - BlockManager.LOG.debug("Block " + blk + " will be lost"); - missingBlkCnt++; - } - BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks"); - - bm.getDatanodeManager().removeDatanode(firstDn); - - bm.computeDatanodeWork(); - - short cnt = 0; - for (DataNode dn : cluster.getDataNodes()) { - DatanodeDescriptor dnDescriptor = - bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid()); - cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded(); - } - - assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java index 7d7c81e0fe..215a4e4b8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java @@ -224,7 +224,7 @@ public void testAddUCReplica() throws Exception { int i = 0; for (DataNode dn : cluster.getDataNodes()) { final Block block = new Block(lastBlock.getBlockId() + i++, - lastBlock.getGenerationStamp(), 0); + 0, lastBlock.getGenerationStamp()); DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); storageIDs.add(storage.getStorageID()); StorageReceivedDeletedBlocks[] reports = DFSTestUtil diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java new file mode 100644 index 0000000000..b9fd4fee16 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestRecoverStripedBlocks { + private final short GROUP_SIZE = + NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS; + private MiniDFSCluster cluster; + private final Path dirPath = new Path("/dir"); + private Path filePath = new Path(dirPath, "file"); + + @Before + public void setup() throws IOException { + final Configuration conf = new HdfsConfiguration(); + // Large value to make sure the pending replication request can stay in + // DatanodeDescriptor.replicateBlocks before test timeout. + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100); + // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via + // chooseUnderReplicatedBlocks at once. 
+ conf.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1) + .build(); + cluster.waitActive(); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + public static void createECFile(MiniDFSCluster cluster, Path file, Path dir, + int numBlocks) throws Exception { + DistributedFileSystem dfs = cluster.getFileSystem(); + dfs.mkdirs(dir); + dfs.setStoragePolicy(dir, EC_STORAGE_POLICY_NAME); + + FSDataOutputStream out = null; + try { + out = dfs.create(file, (short) 1); // create an empty file + + FSNamesystem ns = cluster.getNamesystem(); + FSDirectory fsdir = ns.getFSDirectory(); + INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); + + ExtendedBlock previous = null; + for (int i = 0; i < numBlocks; i++) { + Block newBlock = createBlock(cluster.getDataNodes(), ns, + file.toString(), fileNode, dfs.getClient().getClientName(), + previous); + previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock); + } + + ns.completeFile(file.toString(), dfs.getClient().getClientName(), + previous, fileNode.getId()); + } finally { + IOUtils.cleanup(null, out); + } + } + + static Block createBlock(List dataNodes, FSNamesystem ns, + String file, INodeFile fileNode, String clientName, + ExtendedBlock previous) throws Exception { + ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null, + null); + + final BlockInfo lastBlock = fileNode.getLastBlock(); + final int groupSize = fileNode.getBlockReplication(); + // 1. RECEIVING_BLOCK IBR + int i = 0; + for (DataNode dn : dataNodes) { + if (i < groupSize) { + final Block block = new Block(lastBlock.getBlockId() + i++, 0, + lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + } + + // 2. 
RECEIVED_BLOCK IBR + i = 0; + for (DataNode dn : dataNodes) { + if (i < groupSize) { + final Block block = new Block(lastBlock.getBlockId() + i++, + BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + for (StorageReceivedDeletedBlocks report : reports) { + ns.processIncrementalBlockReport(dn.getDatanodeId(), report); + } + } + } + + lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS); + return lastBlock; + } + + @Test + public void testMissingStripedBlock() throws Exception { + final int numBlocks = 4; + createECFile(cluster, filePath, dirPath, numBlocks); + + // make sure the file is complete in NN + final INodeFile fileNode = cluster.getNamesystem().getFSDirectory() + .getINode4Write(filePath.toString()).asFile(); + assertFalse(fileNode.isUnderConstruction()); + assertTrue(fileNode.isWithStripedBlocks()); + BlockInfo[] blocks = fileNode.getBlocks(); + assertEquals(numBlocks, blocks.length); + for (BlockInfo blk : blocks) { + assertTrue(blk.isStriped()); + assertTrue(blk.isComplete()); + assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes()); + final BlockInfoStriped sb = (BlockInfoStriped) blk; + assertEquals(GROUP_SIZE, sb.numNodes()); + } + + final BlockManager bm = cluster.getNamesystem().getBlockManager(); + BlockInfo firstBlock = fileNode.getBlocks()[0]; + DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock); + + DatanodeDescriptor secondDn = storageInfos[1].getDatanodeDescriptor(); + assertEquals(numBlocks, secondDn.numBlocks()); + + bm.getDatanodeManager().removeDatanode(secondDn); + + BlockManagerTestUtil.getComputedDatanodeWork(bm); + + // all the recovery work will be scheduled on the last DN + DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE); + DatanodeDescriptor last = + bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId()); + assertEquals("Counting the number of outstanding EC tasks", numBlocks, + last.getNumberOfBlocksToBeErasureCoded()); + List recovery = last.getErasureCodeCommand(numBlocks); + for (BlockECRecoveryInfo info : recovery) { + assertEquals(1, info.targets.length); + assertEquals(last, info.targets[0].getDatanodeDescriptor()); + assertEquals(GROUP_SIZE - 1, info.sources.length); + assertEquals(GROUP_SIZE - 1, info.liveBlockIndices.length); + } + } +}
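
A condensed, self-contained sketch of the source-selection flow this patch moves to: for a striped block every live storage becomes a source and its block index is recorded, while a replicated block keeps a single, occasionally re-randomized source. The classes below are simplified stand-ins for illustration only, not the real BlockManager/DatanodeStorageInfo types.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class SourceSelectionSketch {

  /** Minimal stand-in for a storage holding one internal block. */
  static class Storage {
    final String datanode;
    final short blockIndex;   // index inside the striped block group
    final boolean live;
    Storage(String datanode, short blockIndex, boolean live) {
      this.datanode = datanode;
      this.blockIndex = blockIndex;
      this.live = live;
    }
  }

  /**
   * Mirrors the shape of the new chooseSourceDatanodes(): fills
   * liveBlockIndices as a side effect and returns the chosen sources.
   */
  static List<String> chooseSources(List<Storage> storages,
      boolean isStriped, List<Short> liveBlockIndices) {
    liveBlockIndices.clear();
    List<String> srcNodes = new ArrayList<>();
    for (Storage s : storages) {
      if (!s.live) {
        continue;                  // corrupt, excess, decommissioned, ...
      }
      if (isStriped || srcNodes.isEmpty()) {
        srcNodes.add(s.datanode);  // striped: keep every live source
        if (isStriped) {
          liveBlockIndices.add(s.blockIndex);
        }
        continue;
      }
      // replicated block: occasionally switch to a different source so
      // the same node is not picked deterministically every iteration
      if (ThreadLocalRandom.current().nextBoolean()) {
        srcNodes.set(0, s.datanode);
      }
    }
    return srcNodes;
  }

  public static void main(String[] args) {
    List<Storage> group = new ArrayList<>();
    for (short i = 0; i < 9; i++) {
      group.add(new Storage("dn" + i, i, i != 3)); // index 3 is lost
    }
    List<Short> indices = new ArrayList<>();
    List<String> sources = chooseSources(group, true, indices);

    // Pack the indices the way computeRecoveryWorkForBlocks() does
    // before handing them to the erasure coding work item.
    short[] liveBlockIndices = new short[indices.size()];
    for (int i = 0; i < indices.size(); i++) {
      liveBlockIndices[i] = indices.get(i);
    }
    System.out.println(sources.size() + " sources, " + liveBlockIndices.length
        + " live block indices");
  }
}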
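
Likewise, a sketch of the hand-off performed by addBlockToBeErasureCoded: the block, the chosen sources and targets, and the live block indices travel together as one task queued on a target datanode's descriptor. The types here are simplified stand-ins for BlockECRecoveryInfo and DatanodeDescriptor, not their real definitions.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class EcRecoveryTaskSketch {

  /** Stand-in for BlockECRecoveryInfo. */
  static class EcRecoveryTask {
    final String block;
    final String[] sources;
    final String[] targets;
    final short[] liveBlockIndices;   // healthy indices, not missing ones

    EcRecoveryTask(String block, String[] sources, String[] targets,
        short[] liveBlockIndices) {
      this.block = block;
      this.sources = sources;
      this.targets = targets;
      this.liveBlockIndices = liveBlockIndices;
    }

    @Override
    public String toString() {
      return "Recovering " + block
          + " from " + Arrays.toString(sources)
          + " to " + Arrays.toString(targets)
          + " liveIndices=" + Arrays.toString(liveBlockIndices);
    }
  }

  /** Per-datanode queue, analogous to erasurecodeBlocks on the descriptor. */
  static class DatanodeSketch {
    private final Queue<EcRecoveryTask> ecTasks = new ArrayDeque<>();

    void addBlockToBeErasureCoded(EcRecoveryTask task) {
      ecTasks.offer(task);
    }

    int getNumberOfBlocksToBeErasureCoded() {
      return ecTasks.size();
    }
  }

  public static void main(String[] args) {
    DatanodeSketch target = new DatanodeSketch();
    target.addBlockToBeErasureCoded(new EcRecoveryTask(
        "blk_1073741825_1001",
        new String[] {"dn0", "dn1", "dn2", "dn4", "dn5", "dn6", "dn7", "dn8"},
        new String[] {"dn9"},
        new short[] {0, 1, 2, 4, 5, 6, 7, 8}));
    System.out.println("pending EC tasks: "
        + target.getNumberOfBlocksToBeErasureCoded());
  }
}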
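
Finally, a sketch of why recording live indices (rather than the missing ones, as before this patch) is sufficient: the recovering datanode can derive the missing indices itself from the block group width. The 6 data + 3 parity figures below are assumptions for illustration; the real values come from HdfsConstants.

import java.util.Arrays;

public class MissingIndexSketch {
  static final int NUM_DATA_BLOCKS = 6;     // assumed, see HdfsConstants
  static final int NUM_PARITY_BLOCKS = 3;   // assumed, see HdfsConstants

  /** Returns the block-group indices that are NOT in liveBlockIndices. */
  static short[] missingIndices(short[] liveBlockIndices) {
    boolean[] present = new boolean[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS];
    for (short idx : liveBlockIndices) {
      present[idx] = true;
    }
    short[] missing = new short[present.length - liveBlockIndices.length];
    int m = 0;
    for (short i = 0; i < present.length; i++) {
      if (!present[i]) {
        missing[m++] = i;
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    short[] live = {0, 1, 2, 4, 5, 6, 7, 8};            // index 3 was lost
    System.out.println(Arrays.toString(missingIndices(live))); // prints [3]
  }
}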