From d1c303a49763029fffa5164295034af8e81e74a0 Mon Sep 17 00:00:00 2001 From: Surendra Singh Lilhore Date: Thu, 12 Sep 2019 19:11:50 +0530 Subject: [PATCH] HDFS-14699. Erasure Coding: Storage not considered in live replica when replication streams hard limit reached to threshold. Contributed by Zhao Yi Ming. --- .../server/blockmanagement/BlockManager.java | 24 ++++-- .../blockmanagement/TestBlockManager.java | 74 +++++++++++++++++++ 2 files changed, 90 insertions(+), 8 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 8561cfd548..3f718a04b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2351,6 +2351,22 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { continue; // already reached replication limit } + + // for EC here need to make sure the numReplicas replicates state correct + // because in the scheduleReconstruction it need the numReplicas to check + // whether need to reconstruct the ec internal block + byte blockIndex = -1; + if (isStriped) { + blockIndex = ((BlockInfoStriped) block) + .getStorageBlockIndex(storage); + if (!bitSet.get(blockIndex)) { + bitSet.set(blockIndex); + } else if (state == StoredReplicaState.LIVE) { + numReplicas.subtract(StoredReplicaState.LIVE, 1); + numReplicas.add(StoredReplicaState.REDUNDANT, 1); + } + } + if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) { continue; } @@ -2358,15 +2374,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, if(isStriped || srcNodes.isEmpty()) { srcNodes.add(node); if (isStriped) { - byte 
blockIndex = ((BlockInfoStriped) block). - getStorageBlockIndex(storage); liveBlockIndices.add(blockIndex); - if (!bitSet.get(blockIndex)) { - bitSet.set(blockIndex); - } else if (state == StoredReplicaState.LIVE) { - numReplicas.subtract(StoredReplicaState.LIVE, 1); - numReplicas.add(StoredReplicaState.REDUNDANT, 1); - } } continue; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index de0e1a69cd..006513c624 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetworkTopology; @@ -685,6 +687,67 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception { LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length); } + @Test + public void testChooseSrcDatanodesWithDupEC() throws Exception { + bm.maxReplicationStreams = 4; + + long blockId = -9223372036854775776L; // real ec block id + Block aBlock = new Block(blockId, 0, 0); + // ec policy 
+ ECSchema rsSchema = new ECSchema("rs", 3, 2); + String policyName = "RS-3-2-128k"; + int cellSize = 128 * 1024; + ErasureCodingPolicy ecPolicy = + new ErasureCodingPolicy(policyName, rsSchema, cellSize, (byte) -1); + // striped block info built from the block and the EC policy above + BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlock, ecPolicy); + // storages ds1..ds5 that will hold the internal blocks + DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo( + "storage1", "1.1.1.1", "rack1", "host1"); + DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo( + "storage2", "2.2.2.2", "rack2", "host2"); + DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo( + "storage3", "3.3.3.3", "rack3", "host3"); + DatanodeStorageInfo ds4 = DFSTestUtil.createDatanodeStorageInfo( + "storage4", "4.4.4.4", "rack4", "host4"); + DatanodeStorageInfo ds5 = DFSTestUtil.createDatanodeStorageInfo( + "storage5", "5.5.5.5", "rack5", "host5"); + // attach the storages to the striped block, one reported block each + aBlockInfoStriped.addStorage(ds1, aBlock); + aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0)); + aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0)); + // ds4 and ds5 report the same block (blockId + 3): a duplicate + aBlockInfoStriped.addStorage(ds4, new Block(blockId + 3, 0, 0)); + aBlockInfoStriped.addStorage(ds5, new Block(blockId + 3, 0, 0)); + // simulate the ds4 datanode reaching maxReplicationStreams (4) + for(int i = 0; i < 4; i++){ + ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets(); + } + + addEcBlockToBM(blockId, ecPolicy); + List cntNodes = new LinkedList(); + List liveNodes = new LinkedList(); + NumberReplicas numReplicas = new NumberReplicas(); + List liveBlockIndices = new ArrayList<>(); + + bm.chooseSourceDatanodes( + aBlockInfoStriped, + cntNodes, + liveNodes, + numReplicas, liveBlockIndices, + LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY); + + assertEquals("Choose the source node for reconstruction with one node reach" + + " the MAX maxReplicationStreams, the numReplicas still return the" + + " correct live replicas.", 4, 
numReplicas.liveReplicas()); + + assertEquals("Choose the source node for reconstruction with one node reach" + + " the MAX maxReplicationStreams, the numReplicas should return" + + " the correct redundant Internal Blocks.", 1, + numReplicas.redundantInternalBlocks()); + } + @Test public void testFavorDecomUntilHardLimit() throws Exception { bm.maxReplicationStreams = 0; @@ -979,6 +1042,17 @@ public void testUCBlockNotConsideredMissing() throws Exception { bm.setInitializedReplQueues(false); } + private BlockInfo addEcBlockToBM(long blkId, ErasureCodingPolicy ecPolicy) { + Block block = new Block(blkId); + BlockInfo blockInfo = new BlockInfoStriped(block, ecPolicy); + long inodeId = ++mockINodeId; + final INodeFile bc = TestINodeFile.createINodeFile(inodeId); + bm.blocksMap.addBlockCollection(blockInfo, bc); + blockInfo.setBlockCollectionId(inodeId); + doReturn(bc).when(fsn).getBlockCollection(inodeId); + return blockInfo; + } + private BlockInfo addBlockToBM(long blkId) { Block block = new Block(blkId); BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);