From 2ffec347eb4303ad78643431cd2e517d54bc3282 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Tue, 5 Nov 2019 01:37:15 +0530
Subject: [PATCH] HDFS-14946. Erasure Coding: Block recovery failed during
 decommissioning. Contributed by Fei Hui.

---
 .../server/blockmanagement/BlockManager.java | 39 ++++++--
 .../hdfs/TestDecommissionWithStriped.java    | 88 +++++++++++++++++--
 2 files changed, 116 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 3b76eec0d4..6eec1e7499 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2076,17 +2076,18 @@ public class BlockManager implements BlockStatsMXBean {
             numReplicas.decommissioning() -
             numReplicas.liveEnteringMaintenanceReplicas();
       }
-      byte[] indices = new byte[liveBlockIndices.size()];
-      for (int i = 0 ; i < liveBlockIndices.size(); i++) {
-        indices[i] = liveBlockIndices.get(i);
-      }
+      final DatanodeDescriptor[] newSrcNodes =
+          new DatanodeDescriptor[srcNodes.length];
+      byte[] newIndices = new byte[liveBlockIndices.size()];
+      adjustSrcNodesAndIndices((BlockInfoStriped)block,
+          srcNodes, liveBlockIndices, newSrcNodes, newIndices);
       byte[] busyIndices = new byte[liveBusyBlockIndices.size()];
       for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
         busyIndices[i] = liveBusyBlockIndices.get(i);
       }
-      return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes,
+      return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
           containingNodes, liveReplicaNodes, additionalReplRequired,
-          priority, indices, busyIndices);
+          priority, newIndices, busyIndices);
     } else {
       return new ReplicationWork(block, bc, srcNodes, containingNodes,
           liveReplicaNodes, additionalReplRequired,
@@ -2094,6 +2095,32 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Adjust srcNodes and indices, which are used for block reconstruction.
+   * We must guarantee that the indices of the first minRequiredSources
+   * nodes are different.
+   */
+  private void adjustSrcNodesAndIndices(BlockInfoStriped block,
+      DatanodeDescriptor[] srcNodes, List<Byte> indices,
+      DatanodeDescriptor[] newSrcNodes, byte[] newIndices) {
+    BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
+    List<Integer> skipIndexList = new ArrayList<>();
+    for (int i = 0, j = 0; i < srcNodes.length; i++) {
+      if (!bitSet.get(indices.get(i))) {
+        bitSet.set(indices.get(i));
+        newSrcNodes[j] = srcNodes[i];
+        newIndices[j++] = indices.get(i);
+      } else {
+        skipIndexList.add(i);
+      }
+    }
+    for (int i = srcNodes.length - skipIndexList.size(), j = 0;
+        i < srcNodes.length; i++, j++) {
+      newSrcNodes[i] = srcNodes[skipIndexList.get(j)];
+      newIndices[i] = indices.get(skipIndexList.get(j));
+    }
+  }
+
   private boolean validateReconstructionWork(BlockReconstructionWork rw) {
     BlockInfo block = rw.getBlock();
     int priority = rw.getPriority();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index f4a99e9062..eb233651be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -500,14 +500,15 @@ public class TestDecommissionWithStriped {
     return new DFSClient(nn.getNameNodeAddress(), conf);
   }
 
-  private void writeStripedFile(DistributedFileSystem dfs, Path ecFile,
-      int writeBytes) throws IOException, Exception {
+  private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
+      int writeBytes) throws Exception {
     byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
-    DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
-    StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
+    DFSTestUtil.writeFile(fs, ecFile, new String(bytes));
+    StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
 
-    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes,
+    StripedFileTestUtil.checkData(fs, ecFile, writeBytes,
         new ArrayList<DatanodeInfo>(), null, blockGroupSize);
+    return bytes;
   }
 
   private void writeConfigFile(Path name, List<String> nodes)
@@ -958,4 +959,81 @@ public class TestDecommissionWithStriped {
     assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
     cleanupFile(dfs, ecFile);
   }
+
+  /**
+   * Test recovery for an EC block whose storage array contains the internal
+   * blocks {b0, b1, b2, b3, null, b5, b6, b7, b8, b0, b1, b2, b3}:
+   * array[0]{b0} is decommissioning, array[1-3]{b1, b2, b3} are
+   * decommissioned, array[4] is null, and array[5-12]{b[5-8], b[0-3]}
+   * are live.
+   */
+  @Test (timeout = 120000)
+  public void testRecoveryWithDecommission() throws Exception {
+    final Path ecFile = new Path(ecDir, "testRecoveryWithDecommission");
+    int writeBytes = cellSize * dataBlocks;
+    byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes);
+    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+        .getAllBlocks();
+    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
+    DatanodeInfo[] dnList = blk.getLocations();
+    BlockInfoStriped blockInfo =
+        (BlockInfoStriped)bm.getStoredBlock(
+            new Block(blk.getBlock().getBlockId()));
+
+    // Decommission datanode dn0, which contains block b0
+    // Aim to add storageinfo of replicated block b0 to storages[9] of EC block
+    List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+    decommissionedNodes.add(dnList[0]);
+    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+    // Now the storages of the EC block are (b0{decommissioned}, b[1-8]{live},
+    // b0{live})
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+    assertEquals(1, bm.countNodes(blockInfo).decommissioned());
+
+    int decommissionNodesNum = 4;
+
+    // Decommission the nodes that contain blocks b[0-3];
+    // dn0 has already been decommissioned
+    for (int i = 1; i < decommissionNodesNum; i++) {
+      decommissionedNodes.add(dnList[i]);
+    }
+    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);
+
+    // Now the storages of the EC block are (b[0-3]{decommissioned},
+    // b[4-8]{live}, b0{live}, b[1-3]{live})
+    // There are 9 live and 4 decommissioned internal blocks
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+    assertEquals(4, bm.countNodes(blockInfo).decommissioned());
+
+    // There are no reconstruction tasks
+    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
+        .getNumPendingNodes());
+    assertEquals(0, bm.getUnderReplicatedNotMissingBlocks());
+
+    // Put dn0 back into decommissioning
+    // so that the block on dn0 can be used for the reconstruction task
+    DatanodeDescriptor dn0 = bm.getDatanodeManager()
+        .getDatanode(dnList[0].getDatanodeUuid());
+    dn0.startDecommission();
+
+    // Stop the datanode that contains b4
+    DataNode dn = cluster.getDataNode(
+        dnList[decommissionNodesNum].getIpcPort());
+    cluster.stopDataNode(dnList[decommissionNodesNum].getXferAddr());
+    cluster.setDataNodeDead(dn.getDatanodeId());
+
+    // Now the storages of the EC block are (b[0]{decommissioning},
+    // b[1-3]{decommissioned}, null, b[5-8]{live}, b0{live}, b[1-3]{live})
+    // There are 8 live and 1 decommissioning internal blocks
+    // Wait for the EC block to be reconstructed
+    GenericTestUtils.waitFor(
+        () -> bm.countNodes(blockInfo).liveReplicas() == 9,
+        100, 10000);
+
+    byte[] readBytesArray = new byte[writeBytes];
+    StripedFileTestUtil.verifyPread(dfs, ecFile, writeBytes,
+        originBytesArray, readBytesArray, ecPolicy);
+    cleanupFile(dfs, ecFile);
+  }
 }
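For readers following the BlockManager change above, here is a standalone sketch (not part of the patch) of the reordering idea behind adjustSrcNodesAndIndices: source nodes whose internal block index has not been seen yet are kept at the front, and nodes carrying a duplicate index (for example the extra copies still held by decommissioning nodes) are pushed to the tail, so the first minRequiredSources entries always cover distinct internal blocks. The class name, the String node labels, and the main() scenario are illustrative assumptions, not Hadoop code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Illustrative sketch only; node names and the scenario below are hypothetical.
public class SrcNodeReorderSketch {

  // Mirrors the idea of adjustSrcNodesAndIndices: keep the first occurrence of
  // each internal block index at the front, move duplicate indices to the tail.
  static void reorder(String[] srcNodes, List<Byte> indices,
      String[] newSrcNodes, byte[] newIndices, int totalBlockNum) {
    BitSet seen = new BitSet(totalBlockNum);
    List<Integer> skipped = new ArrayList<>();
    int j = 0;
    for (int i = 0; i < srcNodes.length; i++) {
      byte idx = indices.get(i);
      if (!seen.get(idx)) {
        // First node carrying this internal index goes to the front.
        seen.set(idx);
        newSrcNodes[j] = srcNodes[i];
        newIndices[j++] = idx;
      } else {
        // Duplicate index (e.g. a decommissioning replica); defer it.
        skipped.add(i);
      }
    }
    for (int s : skipped) {
      newSrcNodes[j] = srcNodes[s];
      newIndices[j++] = indices.get(s);
    }
  }

  public static void main(String[] args) {
    // Hypothetical case: dn0 (decommissioning) and dn9 (live) both hold index 0.
    String[] srcNodes = {"dn0", "dn9", "dn5", "dn6"};
    List<Byte> indices = Arrays.asList((byte) 0, (byte) 0, (byte) 5, (byte) 6);
    String[] newSrcNodes = new String[srcNodes.length];
    byte[] newIndices = new byte[indices.size()];
    reorder(srcNodes, indices, newSrcNodes, newIndices, 9);
    for (int i = 0; i < newSrcNodes.length; i++) {
      // Prints dn0 -> 0, dn5 -> 5, dn6 -> 6, then the duplicate dn9 -> 0 last.
      System.out.println(newSrcNodes[i] + " -> index " + newIndices[i]);
    }
  }
}

With this ordering, a reconstruction task that reads only the first minRequiredSources sources never sees the same internal block twice, which is the failure mode HDFS-14946 addresses.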