From 9e3fc40ecb5aff56163dcee6565368405b5100cd Mon Sep 17 00:00:00 2001
From: caozhiqiang
Date: Fri, 17 Jun 2022 02:11:25 +0800
Subject: [PATCH] HDFS-16613. EC: Improve performance of decommissioning dn
 with many ec blocks (#4398)

---
 .../blockmanagement/DatanodeManager.java      | 13 +++++++---
 .../hdfs/server/namenode/FSNamesystem.java    |  4 +--
 .../blockmanagement/TestDatanodeManager.java  | 25 +++++++++++++++----
 3 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index e75caeffef..548f22c563 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1790,8 +1790,8 @@ private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, final String blockPoolId,
-      long cacheCapacity, long cacheUsed, int xceiverCount,
-      int maxTransfers, int failedVolumes,
+      long cacheCapacity, long cacheUsed, int xceiverCount,
+      int xmitsInProgress, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks) throws IOException {
@@ -1835,6 +1835,14 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
       int totalBlocks = totalReplicateBlocks + totalECBlocks;
       if (totalBlocks > 0) {
+        int maxTransfers;
+        if (nodeinfo.isDecommissionInProgress()) {
+          maxTransfers = blockManager.getReplicationStreamsHardLimit()
+              - xmitsInProgress;
+        } else {
+          maxTransfers = blockManager.getMaxReplicationStreams()
+              - xmitsInProgress;
+        }
         int numReplicationTasks = (int) Math.ceil(
             (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
         int numECTasks = (int) Math.ceil(
@@ -2249,4 +2257,3 @@ public Map getDatanodeMap() {
     return datanodeMap;
   }
 }
-
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ab3c49fc26..13894b4fec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -4393,11 +4393,9 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
     readLock();
     try {
       //get datanode commands
-      final int maxTransfer = blockManager.getMaxReplicationStreams()
-          - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
-          xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
+          xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary,
           slowPeers, slowDisks);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 232424d440..35ff36a856 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -86,6 +86,10 @@ public class TestDatanodeManager {
   private static DatanodeManager mockDatanodeManager(
       FSNamesystem fsn, Configuration conf) throws IOException {
     BlockManager bm = Mockito.mock(BlockManager.class);
+    Mockito.when(bm.getMaxReplicationStreams()).thenReturn(
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2));
+    Mockito.when(bm.getReplicationStreamsHardLimit()).thenReturn(
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 2));
     BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
     Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
     DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
@@ -965,25 +969,33 @@ public void testRemoveIncludedNode() throws IOException {
    * @param numReplicationBlocks the number of replication blocks in the queue.
    * @param numECBlocks number of EC blocks in the queue.
    * @param maxTransfers the maxTransfer value.
+   * @param maxTransfersHardLimit the maxTransfer hard limit value.
    * @param numReplicationTasks the number of replication tasks polled from
    *                            the queue.
    * @param numECTasks the number of EC tasks polled from the queue.
+   * @param isDecommissioning if the node is in the decommissioning process.
    *
    * @throws IOException
    */
   private void verifyPendingRecoveryTasks(
       int numReplicationBlocks, int numECBlocks,
-      int maxTransfers, int numReplicationTasks, int numECTasks)
+      int maxTransfers, int maxTransfersHardLimit,
+      int numReplicationTasks, int numECTasks, boolean isDecommissioning)
       throws IOException {
     FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
     Mockito.when(fsn.hasWriteLock()).thenReturn(true);
     Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, maxTransfers);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+        maxTransfersHardLimit);
     DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));
     DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
     Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
     Mockito.when(nodeInfo.getStorageInfos())
         .thenReturn(new DatanodeStorageInfo[0]);
+    Mockito.when(nodeInfo.isDecommissionInProgress())
+        .thenReturn(isDecommissioning);
 
     if (numReplicationBlocks > 0) {
       Mockito.when(nodeInfo.getNumberOfReplicateBlocks())
@@ -1010,7 +1022,7 @@ private void verifyPendingRecoveryTasks(
     DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class);
     Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
     DatanodeCommand[] cmds = dm.handleHeartbeat(
-        dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
+        dnReg, new StorageReport[1], "bp-123", 0, 0, 10, 0, 0, null,
         SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
 
     long expectedNumCmds = Arrays.stream(
@@ -1042,11 +1054,14 @@ private void verifyPendingRecoveryTasks(
   @Test
   public void testPendingRecoveryTasks() throws IOException {
     // Tasks are slitted according to the ratio between queue lengths.
-    verifyPendingRecoveryTasks(20, 20, 20, 10, 10);
-    verifyPendingRecoveryTasks(40, 10, 20, 16, 4);
+    verifyPendingRecoveryTasks(20, 20, 20, 30, 10, 10, false);
+    verifyPendingRecoveryTasks(40, 10, 20, 30, 16, 4, false);
 
     // Approximately load tasks if the ratio between queue length is large.
-    verifyPendingRecoveryTasks(400, 1, 20, 20, 1);
+    verifyPendingRecoveryTasks(400, 1, 20, 30, 20, 1, false);
+
+    // Tasks use dfs.namenode.replication.max-streams-hard-limit for decommissioning node
+    verifyPendingRecoveryTasks(30, 30, 20, 30, 15, 15, true);
   }
 
   @Test
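
Note (not part of the patch): the sketch below restates the arithmetic the patched handleHeartbeat applies, for readers who only skim the diff. When the reporting datanode is decommissioning, the per-heartbeat transfer budget is derived from dfs.namenode.replication.max-streams-hard-limit rather than dfs.namenode.replication.max-streams, minus the transfers already in flight (xmitsInProgress), and that budget is split between the replication and EC queues in proportion to their lengths. The class, method, and parameter names here are invented for illustration.

// Illustrative sketch only; names are invented and not part of the Hadoop code base.
public class TransferBudgetSketch {

  /**
   * Mirrors the task-splitting arithmetic in DatanodeManager#handleHeartbeat
   * after this change. Returns {numReplicationTasks, numECTasks}.
   */
  static int[] splitTasks(boolean decommissioning, int xmitsInProgress,
      int maxReplicationStreams, int replicationStreamsHardLimit,
      int totalReplicateBlocks, int totalECBlocks) {
    int totalBlocks = totalReplicateBlocks + totalECBlocks;
    if (totalBlocks <= 0) {
      return new int[] {0, 0};
    }
    // A decommissioning datanode gets the higher hard limit so its queued
    // blocks drain faster; all other nodes keep the normal soft limit.
    int maxTransfers = decommissioning
        ? replicationStreamsHardLimit - xmitsInProgress
        : maxReplicationStreams - xmitsInProgress;
    // Split the budget in proportion to the two queue lengths, rounding up.
    int numReplicationTasks = (int) Math.ceil(
        (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
    int numECTasks = (int) Math.ceil(
        (double) (totalECBlocks * maxTransfers) / totalBlocks);
    return new int[] {numReplicationTasks, numECTasks};
  }

  public static void main(String[] args) {
    // Matches the decommissioning case added to testPendingRecoveryTasks:
    // 30 replication + 30 EC blocks queued, soft limit 20, hard limit 30,
    // no transfers in flight -> 15 replication tasks and 15 EC tasks.
    int[] tasks = splitTasks(true, 0, 20, 30, 30, 30);
    System.out.println("replication=" + tasks[0] + ", ec=" + tasks[1]);
  }
}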