From 35c65005d0e67c5b43db3d2b2850cffd58a3c1a1 Mon Sep 17 00:00:00 2001 From: caozhiqiang Date: Tue, 29 Nov 2022 09:51:21 +0800 Subject: [PATCH] HDFS-16846. EC: Only EC blocks should be affected by max-streams-hard-limit configuration (#5143) Signed-off-by: Takanobu Asanuma --- .../blockmanagement/DatanodeDescriptor.java | 47 ++++++++++++--- .../blockmanagement/DatanodeManager.java | 43 +++++++++----- .../blockmanagement/ErasureCodingWork.java | 2 +- .../hdfs/TestDecommissionWithStriped.java | 6 +- .../blockmanagement/TestDatanodeManager.java | 59 ++++++++++++------- 5 files changed, 107 insertions(+), 50 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index a2b7afedfd..c77d54591a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -197,8 +197,10 @@ public Type getType() { /** A queue of blocks to be replicated by this datanode */ private final BlockQueue replicateBlocks = new BlockQueue<>(); - /** A queue of blocks to be erasure coded by this datanode */ - private final BlockQueue erasurecodeBlocks = + /** A queue of ec blocks to be replicated by this datanode. */ + private final BlockQueue ecBlocksToBeReplicated = new BlockQueue<>(); + /** A queue of ec blocks to be erasure coded by this datanode. 
*/ + private final BlockQueue ecBlocksToBeErasureCoded = new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ private final BlockQueue recoverBlocks = new BlockQueue<>(); @@ -358,7 +360,8 @@ public void clearBlockQueues() { } this.recoverBlocks.clear(); this.replicateBlocks.clear(); - this.erasurecodeBlocks.clear(); + this.ecBlocksToBeReplicated.clear(); + this.ecBlocksToBeErasureCoded.clear(); // pendingCached, cached, and pendingUncached are protected by the // FSN lock. this.pendingCached.clear(); @@ -678,6 +681,15 @@ public void addBlockToBeReplicated(Block block, replicateBlocks.offer(new BlockTargetPair(block, targets)); } + /** + * Store ec block to be replicated work. + */ + @VisibleForTesting + public void addECBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) { + assert (block != null && targets != null && targets.length > 0); + ecBlocksToBeReplicated.offer(new BlockTargetPair(block, targets)); + } + /** * Store block erasure coding work. */ @@ -687,9 +699,9 @@ void addBlockToBeErasureCoded(ExtendedBlock block, assert (block != null && sources != null && sources.length > 0); BlockECReconstructionInfo task = new BlockECReconstructionInfo(block, sources, targets, liveBlockIndices, excludeReconstrutedIndices, ecPolicy); - erasurecodeBlocks.offer(task); + ecBlocksToBeErasureCoded.offer(task); BlockManager.LOG.debug("Adding block reconstruction task " + task + "to " - + getName() + ", current queue size is " + erasurecodeBlocks.size()); + + getName() + ", current queue size is " + ecBlocksToBeErasureCoded.size()); } /** @@ -720,7 +732,8 @@ void addBlocksToBeInvalidated(List blocklist) { * The number of work items that are pending to be replicated. 
*/ int getNumberOfBlocksToBeReplicated() { - return pendingReplicationWithoutTargets + replicateBlocks.size(); + return pendingReplicationWithoutTargets + replicateBlocks.size() + + ecBlocksToBeReplicated.size(); } /** @@ -728,7 +741,15 @@ int getNumberOfBlocksToBeReplicated() { */ @VisibleForTesting public int getNumberOfBlocksToBeErasureCoded() { - return erasurecodeBlocks.size(); + return ecBlocksToBeErasureCoded.size(); + } + + /** + * The number of ec work items that are pending to be replicated. + */ + @VisibleForTesting + public int getNumberOfECBlocksToBeReplicated() { + return ecBlocksToBeReplicated.size(); } @VisibleForTesting @@ -740,9 +761,13 @@ List getReplicationCommand(int maxTransfers) { return replicateBlocks.poll(maxTransfers); } + List getECReplicatedCommand(int maxTransfers) { + return ecBlocksToBeReplicated.poll(maxTransfers); + } + public List getErasureCodeCommand( int maxTransfers) { - return erasurecodeBlocks.poll(maxTransfers); + return ecBlocksToBeErasureCoded.poll(maxTransfers); } public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) { @@ -994,7 +1019,11 @@ public String dumpDatanode() { if (repl > 0) { sb.append(" ").append(repl).append(" blocks to be replicated;"); } - int ec = erasurecodeBlocks.size(); + int ecRepl = ecBlocksToBeReplicated.size(); + if (ecRepl > 0) { + sb.append(" ").append(ecRepl).append(" ec blocks to be replicated;"); + } + int ec = ecBlocksToBeErasureCoded.size(); if(ec > 0) { sb.append(" ").append(ec).append(" blocks to be erasure coded;"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index b2c5cb0b55..88f3ac4e7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1825,28 +1825,41 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, // Allocate _approximately_ maxTransfers pending tasks to DataNode. // NN chooses pending tasks based on the ratio between the lengths of // replication and erasure-coded block queues. - int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks(); - int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded(); - int totalBlocks = totalReplicateBlocks + totalECBlocks; + int replicationBlocks = nodeinfo.getNumberOfReplicateBlocks(); + int ecBlocksToBeReplicated = nodeinfo.getNumberOfECBlocksToBeReplicated(); + int ecBlocksToBeErasureCoded = nodeinfo.getNumberOfBlocksToBeErasureCoded(); + int totalBlocks = replicationBlocks + ecBlocksToBeReplicated + ecBlocksToBeErasureCoded; if (totalBlocks > 0) { - int maxTransfers; + int maxTransfers = blockManager.getMaxReplicationStreams() - xmitsInProgress; + int maxECReplicatedTransfers; if (nodeinfo.isDecommissionInProgress()) { - maxTransfers = blockManager.getReplicationStreamsHardLimit() + maxECReplicatedTransfers = blockManager.getReplicationStreamsHardLimit() - xmitsInProgress; } else { - maxTransfers = blockManager.getMaxReplicationStreams() - - xmitsInProgress; + maxECReplicatedTransfers = maxTransfers; } int numReplicationTasks = (int) Math.ceil( - (double) (totalReplicateBlocks * maxTransfers) / totalBlocks); - int numECTasks = (int) Math.ceil( - (double) (totalECBlocks * maxTransfers) / totalBlocks); - LOG.debug("Pending replication tasks: {} erasure-coded tasks: {}.", - numReplicationTasks, numECTasks); + (double) (replicationBlocks * maxTransfers) / totalBlocks); + int numEcReplicatedTasks = (int) Math.ceil( + (double) (ecBlocksToBeReplicated * maxECReplicatedTransfers) / totalBlocks); + int numECReconstructedTasks = (int) Math.ceil( + (double) (ecBlocksToBeErasureCoded * maxTransfers) / 
totalBlocks); + LOG.debug("Pending replication tasks: {} ec to be replicated tasks: {} " + + "ec reconstruction tasks: {}.", + numReplicationTasks, numEcReplicatedTasks, numECReconstructedTasks); // check pending replication tasks - List pendingList = nodeinfo.getReplicationCommand( + List pendingReplicationList = nodeinfo.getReplicationCommand( numReplicationTasks); - if (pendingList != null && !pendingList.isEmpty()) { + List pendingECReplicatedList = nodeinfo.getECReplicatedCommand( + numEcReplicatedTasks); + List pendingList = new ArrayList(); + if(pendingReplicationList != null && !pendingReplicationList.isEmpty()) { + pendingList.addAll(pendingReplicationList); + } + if(pendingECReplicatedList != null && !pendingECReplicatedList.isEmpty()) { + pendingList.addAll(pendingECReplicatedList); + } + if (!pendingList.isEmpty()) { // If the block is deleted, the block size will become // BlockCommand.NO_ACK (LONG.MAX_VALUE) . This kind of block we don't // need @@ -1868,7 +1881,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, } // check pending erasure coding tasks List pendingECList = nodeinfo - .getErasureCodeCommand(numECTasks); + .getErasureCodeCommand(numECReconstructedTasks); if (pendingECList != null && !pendingECList.isEmpty()) { cmds.add(new BlockECReconstructionCommand( DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index e5303a28d7..147f4c3fd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -164,7 +164,7 @@ private void createReplicationWork(int sourceIndex, 
stripedBlk.getDataBlockNum(), blockIndex); final Block targetBlk = new Block(stripedBlk.getBlockId() + blockIndex, internBlkLen, stripedBlk.getGenerationStamp()); - source.addBlockToBeReplicated(targetBlk, + source.addECBlockToBeReplicated(targetBlk, new DatanodeStorageInfo[] {target}); LOG.debug("Add replication task from source {} to " + "target {} for EC block {}", source, target, targetBlk); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java index c68cb1707c..206f75eae7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java @@ -759,7 +759,7 @@ public void testDecommissionWithFailedReplicating() throws Exception { DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk); DatanodeDescriptor target = bm.getDatanodeManager() .getDatanode(extraDn.getDatanodeUuid()); - dn0.addBlockToBeReplicated(targetBlk, + dn0.addECBlockToBeReplicated(targetBlk, new DatanodeStorageInfo[] {target.getStorageInfos()[0]}); // dn0 replicates in success @@ -883,7 +883,7 @@ public void testDecommissionWithMissingBlock() throws Exception { .getDatanode(extraDn.getDatanodeUuid()); DatanodeDescriptor dnStartIndexDecommission = bm.getDatanodeManager() .getDatanode(dnLocs[decommNodeIndex].getDatanodeUuid()); - dnStartIndexDecommission.addBlockToBeReplicated(targetBlk, + dnStartIndexDecommission.addECBlockToBeReplicated(targetBlk, new DatanodeStorageInfo[] {target.getStorageInfos()[0]}); // Wait for replication success. 
@@ -972,7 +972,7 @@ public void testCountNodes() throws Exception{ DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk); DatanodeDescriptor target = bm.getDatanodeManager() .getDatanode(extraDn.getDatanodeUuid()); - dn0.addBlockToBeReplicated(targetBlk, + dn0.addECBlockToBeReplicated(targetBlk, new DatanodeStorageInfo[] {target.getStorageInfos()[0]}); // dn0 replicates in success diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index 35ff36a856..015a0385a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -967,20 +967,22 @@ public void testRemoveIncludedNode() throws IOException { * Verify the correctness of pending recovery process. * * @param numReplicationBlocks the number of replication blocks in the queue. - * @param numECBlocks number of EC blocks in the queue. + * @param numEcBlocksToBeReplicated the number of EC blocks to be replicated in the queue. + * @param numBlocksToBeErasureCoded number of EC blocks to be erasure coded in the queue. * @param maxTransfers the maxTransfer value. * @param maxTransfersHardLimit the maxTransfer hard limit value. - * @param numReplicationTasks the number of replication tasks polled from - * the queue. - * @param numECTasks the number of EC tasks polled from the queue. + * @param numReplicationTasks the number of replication tasks polled from the queue. + * @param numECTasksToBeReplicated the number of EC tasks to be replicated polled from the queue. + * @param numECTasksToBeErasureCoded the number of EC tasks to be erasure coded polled from + * the queue. 
* @param isDecommissioning if the node is in the decommissioning process. * * @throws IOException */ private void verifyPendingRecoveryTasks( - int numReplicationBlocks, int numECBlocks, - int maxTransfers, int maxTransfersHardLimit, - int numReplicationTasks, int numECTasks, boolean isDecommissioning) + int numReplicationBlocks, int numEcBlocksToBeReplicated, int numBlocksToBeErasureCoded, + int maxTransfers, int maxTransfersHardLimit, int numReplicationTasks, + int numECTasksToBeReplicated, int numECTasksToBeErasureCoded, boolean isDecommissioning) throws IOException { FSNamesystem fsn = Mockito.mock(FSNamesystem.class); Mockito.when(fsn.hasWriteLock()).thenReturn(true); @@ -1009,13 +1011,25 @@ private void verifyPendingRecoveryTasks( .thenReturn(tasks); } - if (numECBlocks > 0) { + if (numEcBlocksToBeReplicated > 0) { + Mockito.when(nodeInfo.getNumberOfECBlocksToBeReplicated()) + .thenReturn(numEcBlocksToBeReplicated); + + List ecReplicatedTasks = + Collections.nCopies( + Math.min(numECTasksToBeReplicated, numEcBlocksToBeReplicated), + new BlockTargetPair(null, null)); + Mockito.when(nodeInfo.getECReplicatedCommand(numECTasksToBeReplicated)) + .thenReturn(ecReplicatedTasks); + } + + if (numBlocksToBeErasureCoded > 0) { Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded()) - .thenReturn(numECBlocks); + .thenReturn(numBlocksToBeErasureCoded); List tasks = - Collections.nCopies(numECTasks, null); - Mockito.when(nodeInfo.getErasureCodeCommand(numECTasks)) + Collections.nCopies(numECTasksToBeErasureCoded, null); + Mockito.when(nodeInfo.getErasureCodeCommand(numECTasksToBeErasureCoded)) .thenReturn(tasks); } @@ -1026,42 +1040,43 @@ private void verifyPendingRecoveryTasks( SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); long expectedNumCmds = Arrays.stream( - new int[]{numReplicationTasks, numECTasks}) + new int[]{numReplicationTasks + numECTasksToBeReplicated, numECTasksToBeErasureCoded}) .filter(x -> x > 0) .count(); assertEquals(expectedNumCmds, 
cmds.length); int idx = 0; - if (numReplicationTasks > 0) { + if (numReplicationTasks > 0 || numECTasksToBeReplicated > 0) { assertTrue(cmds[idx] instanceof BlockCommand); BlockCommand cmd = (BlockCommand) cmds[0]; - assertEquals(numReplicationTasks, cmd.getBlocks().length); - assertEquals(numReplicationTasks, cmd.getTargets().length); + assertEquals(numReplicationTasks + numECTasksToBeReplicated, cmd.getBlocks().length); + assertEquals(numReplicationTasks + numECTasksToBeReplicated, cmd.getTargets().length); idx++; } - if (numECTasks > 0) { + if (numECTasksToBeErasureCoded > 0) { assertTrue(cmds[idx] instanceof BlockECReconstructionCommand); BlockECReconstructionCommand cmd = (BlockECReconstructionCommand) cmds[idx]; - assertEquals(numECTasks, cmd.getECTasks().size()); + assertEquals(numECTasksToBeErasureCoded, cmd.getECTasks().size()); } Mockito.verify(nodeInfo).getReplicationCommand(numReplicationTasks); - Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasks); + Mockito.verify(nodeInfo).getECReplicatedCommand(numECTasksToBeReplicated); + Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasksToBeErasureCoded); } @Test public void testPendingRecoveryTasks() throws IOException { // Tasks are slitted according to the ratio between queue lengths. - verifyPendingRecoveryTasks(20, 20, 20, 30, 10, 10, false); - verifyPendingRecoveryTasks(40, 10, 20, 30, 16, 4, false); + verifyPendingRecoveryTasks(20, 0, 20, 20, 30, 10, 0, 10, false); + verifyPendingRecoveryTasks(40, 0, 10, 20, 30, 16, 0, 4, false); // Approximately load tasks if the ratio between queue length is large. - verifyPendingRecoveryTasks(400, 1, 20, 30, 20, 1, false); + verifyPendingRecoveryTasks(400, 0, 1, 20, 30, 20, 0, 1, false); // Tasks use dfs.namenode.replication.max-streams-hard-limit for decommissioning node - verifyPendingRecoveryTasks(30, 30, 20, 30, 15, 15, true); + verifyPendingRecoveryTasks(20, 10, 10, 20, 40, 10, 10, 5, true); } @Test