From 033ceca090cbc3f6684e181b2ef7a645beed5175 Mon Sep 17 00:00:00 2001
From: huhaiyang
Date: Thu, 10 Nov 2022 13:37:09 +0800
Subject: [PATCH] HDFS-16811. Support DecommissionBackoffMonitor parameters
 reconfigurable (#5122)

Signed-off-by: Tao Li
---
 .../DatanodeAdminBackoffMonitor.java          | 27 +++++-
 .../DatanodeAdminDefaultMonitor.java          | 23 +++++
 .../blockmanagement/DatanodeAdminManager.java | 26 ++++++
 .../DatanodeAdminMonitorInterface.java        |  8 ++
 .../hadoop/hdfs/server/namenode/NameNode.java | 44 +++++++++-
 .../namenode/TestNameNodeReconfigure.java     | 85 +++++++++++++++++++
 .../hadoop/hdfs/tools/TestDFSAdmin.java       | 10 ++-
 7 files changed, 214 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
index 656003ffad..a4eb1776fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
@@ -70,10 +71,10 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
       outOfServiceNodeBlocks = new HashMap<>();
 
   /**
-   * The numbe of blocks to process when moving blocks to pendingReplication
+   * The number of blocks to process when moving blocks to pendingReplication
    * before releasing and reclaiming the namenode lock.
    */
-  private int blocksPerLock;
+  private volatile int blocksPerLock;
 
   /**
    * The number of blocks that have been checked on this tick.
@@ -82,7 +83,7 @@
   /**
    * The maximum number of blocks to hold in PendingRep at any time.
    */
-  private int pendingRepLimit;
+  private volatile int pendingRepLimit;
 
   /**
    * The list of blocks which have been placed onto the replication queue
@@ -801,6 +802,26 @@ private boolean isBlockReplicatedOk(DatanodeDescriptor datanode,
     return false;
   }
 
+  @VisibleForTesting
+  @Override
+  public int getPendingRepLimit() {
+    return pendingRepLimit;
+  }
+
+  public void setPendingRepLimit(int pendingRepLimit) {
+    this.pendingRepLimit = pendingRepLimit;
+  }
+
+  @VisibleForTesting
+  @Override
+  public int getBlocksPerLock() {
+    return blocksPerLock;
+  }
+
+  public void setBlocksPerLock(int blocksPerLock) {
+    this.blocksPerLock = blocksPerLock;
+  }
+
   static class BlockStats {
     private LightWeightHashSet<Long> openFiles =
         new LightWeightLinkedSet<>();
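Note on the two field changes above: `pendingRepLimit` and `blocksPerLock` are promoted to `volatile` because, once reconfigurable, they are written by the NameNode's reconfiguration handler thread while the backoff monitor thread reads them on every tick; `volatile` guarantees the monitor observes the new value without taking a lock. A minimal standalone sketch of the pattern (class and method names here are hypothetical, not HDFS code):

```java
// Illustrative sketch of the volatile handoff; names are hypothetical.
public class ReconfigurableLimit {
  // Written by the reconfiguration thread, read by the monitor thread.
  // Without volatile, the reader could keep using a stale cached value.
  private volatile int pendingRepLimit = 1000;

  public void setPendingRepLimit(int newLimit) { // reconfiguration thread
    this.pendingRepLimit = newLimit;
  }

  public void tick() {                           // monitor thread
    int limit = pendingRepLimit;                 // always the latest write
    System.out.println("will queue at most " + limit + " blocks");
  }
}
```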
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
index 49ccf80bb4..f43f8cf10d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +138,28 @@ public int getNumNodesChecked() {
     return numNodesChecked;
   }
 
+  @VisibleForTesting
+  @Override
+  public int getPendingRepLimit() {
+    return 0;
+  }
+
+  @Override
+  public void setPendingRepLimit(int pendingRepLimit) {
+    // nothing.
+  }
+
+  @VisibleForTesting
+  @Override
+  public int getBlocksPerLock() {
+    return 0;
+  }
+
+  @Override
+  public void setBlocksPerLock(int blocksPerLock) {
+    // nothing.
+  }
+
   @Override
   public void run() {
     LOG.debug("DatanodeAdminMonitor is running.");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
index 3318ee7fb4..2ccc1eb253 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -416,4 +416,30 @@ void runMonitorForTest() throws ExecutionException, InterruptedException {
     executor.submit(monitor).get();
   }
 
+  public void refreshPendingRepLimit(int pendingRepLimit, String key) {
+    ensurePositiveInt(pendingRepLimit, key);
+    this.monitor.setPendingRepLimit(pendingRepLimit);
+  }
+
+  @VisibleForTesting
+  public int getPendingRepLimit() {
+    return this.monitor.getPendingRepLimit();
+  }
+
+  public void refreshBlocksPerLock(int blocksPerLock, String key) {
+    ensurePositiveInt(blocksPerLock, key);
+    this.monitor.setBlocksPerLock(blocksPerLock);
+  }
+
+  @VisibleForTesting
+  public int getBlocksPerLock() {
+    return this.monitor.getBlocksPerLock();
+  }
+
+  private void ensurePositiveInt(int val, String key) {
+    checkArgument(
+        (val > 0),
+        key + " = '" + val + "' is invalid. " +
+            "It should be a positive, non-zero integer value.");
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorInterface.java
index 89673a759e..a477474210 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorInterface.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorInterface.java
@@ -37,4 +37,12 @@ public interface DatanodeAdminMonitorInterface extends Runnable {
   void setBlockManager(BlockManager bm);
   void setDatanodeAdminManager(DatanodeAdminManager dnm);
   void setNameSystem(Namesystem ns);
+
+  int getPendingRepLimit();
+
+  void setPendingRepLimit(int pendingRepLimit);
+
+  int getBlocksPerLock();
+
+  void setBlocksPerLock(int blocksPerLock);
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 204074c4c5..62a35c201b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -197,6 +197,10 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
 
@@ -343,7 +347,9 @@ public enum OperationCategory {
       DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
       DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
       DFS_BLOCK_INVALIDATE_LIMIT_KEY,
-      DFS_DATANODE_PEER_STATS_ENABLED_KEY));
+      DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+      DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+      DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -1848,7 +1854,7 @@ public static void main(String argv[]) throws Exception {
     }
   }
 
-  synchronized void monitorHealth()
+  synchronized void monitorHealth()
       throws HealthCheckFailedException, AccessControlException {
     namesystem.checkSuperuserPrivilege();
     if (!haEnabled) {
@@ -1872,7 +1878,7 @@ synchronized void monitorHealth()
     }
   }
 
-  synchronized void transitionToActive()
+  synchronized void transitionToActive()
       throws ServiceFailedException, AccessControlException {
     namesystem.checkSuperuserPrivilege();
     if (!haEnabled) {
@@ -2216,6 +2222,10 @@ protected String reconfigurePropertyImpl(String property, String newVal)
       return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
     } else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
       return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
+    } else if (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT) ||
+        (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK))) {
+      return reconfigureDecommissionBackoffMonitorParameters(datanodeManager, property,
+          newVal);
     } else {
       throw new ReconfigurationException(property, newVal, getConf().get(
           property));
     }
@@ -2481,6 +2491,34 @@ private String reconfigureBlockInvalidateLimit(final DatanodeManager datanodeMan
     }
   }
 
+  private String reconfigureDecommissionBackoffMonitorParameters(
+      final DatanodeManager datanodeManager, final String property, final String newVal)
+      throws ReconfigurationException {
+    String newSetting = null;
+    try {
+      if (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT)) {
+        int pendingRepLimit = (newVal == null ?
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT_DEFAULT :
+            Integer.parseInt(newVal));
+        datanodeManager.getDatanodeAdminManager().refreshPendingRepLimit(pendingRepLimit,
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT);
+        newSetting = String.valueOf(datanodeManager.getDatanodeAdminManager().getPendingRepLimit());
+      } else if (property.equals(
+          DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK)) {
+        int blocksPerLock = (newVal == null ?
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT :
+            Integer.parseInt(newVal));
+        datanodeManager.getDatanodeAdminManager().refreshBlocksPerLock(blocksPerLock,
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK);
+        newSetting = String.valueOf(datanodeManager.getDatanodeAdminManager().getBlocksPerLock());
+      }
+      LOG.info("RECONFIGURE* changed reconfigureDecommissionBackoffMonitorParameters {} to {}",
+          property, newSetting);
+      return newSetting;
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
 
   @Override // ReconfigurableBase
   protected Configuration getNewConf() {
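The error handling in `reconfigureDecommissionBackoffMonitorParameters` hinges on one detail: `Integer.parseInt` throws `NumberFormatException`, a subclass of `IllegalArgumentException`, and `ensurePositiveInt` in `DatanodeAdminManager` throws `IllegalArgumentException` via `checkArgument`. A single catch clause therefore rewraps both a non-numeric value and a non-positive one into `ReconfigurationException`, while a null `newVal` (the property being unset) falls back to the compiled default. A self-contained sketch of that funnel (constant and method names here are illustrative, not the HDFS ones):

```java
// Illustrative sketch of the parse-then-validate funnel.
public class ParseFunnelDemo {
  static final int PENDING_LIMIT_DEFAULT = 100; // hypothetical default

  static int parseAndValidate(String newVal) {
    // null means "property removed": use the built-in default.
    int v = (newVal == null) ? PENDING_LIMIT_DEFAULT : Integer.parseInt(newVal);
    if (v <= 0) {
      throw new IllegalArgumentException(v + " is not a positive integer");
    }
    return v;
  }

  public static void main(String[] args) {
    try {
      parseAndValidate("abc"); // NumberFormatException extends IllegalArgumentException
    } catch (IllegalArgumentException e) {
      // Both failure modes land here, mirroring the single catch block
      // that NameNode rewraps into ReconfigurationException.
      System.out.println("rejected: " + e);
    }
  }
}
```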
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
index fc63e9de2c..cc60809dfb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java
@@ -41,6 +41,8 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminBackoffMonitor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminMonitorInterface;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -57,6 +59,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
 
 public class TestNameNodeReconfigure {
@@ -509,6 +513,87 @@ public void testSlowPeerTrackerEnabled() throws Exception {
     }
   }
 
+  @Test
+  public void testReconfigureDecommissionBackoffMonitorParameters()
+      throws ReconfigurationException, IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setClass(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MONITOR_CLASS,
+        DatanodeAdminBackoffMonitor.class, DatanodeAdminMonitorInterface.class);
+    int defaultPendingRepLimit = 1000;
+    conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, defaultPendingRepLimit);
+    int defaultBlocksPerLock = 1000;
+    conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
+        defaultBlocksPerLock);
+
+    try (MiniDFSCluster newCluster = new MiniDFSCluster.Builder(conf).build()) {
+      newCluster.waitActive();
+      final NameNode nameNode = newCluster.getNameNode();
+      final DatanodeManager datanodeManager = nameNode.namesystem
+          .getBlockManager().getDatanodeManager();
+
+      // verify defaultPendingRepLimit.
+      assertEquals(datanodeManager.getDatanodeAdminManager().getPendingRepLimit(),
+          defaultPendingRepLimit);
+
+      // try invalid pendingRepLimit.
+      try {
+        nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+            "non-numeric");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
+            defaultPendingRepLimit + "' to 'non-numeric'", e.getMessage());
+      }
+
+      try {
+        nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+            "-1");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
+            defaultPendingRepLimit + "' to '-1'", e.getMessage());
+      }
+
+      // try correct pendingRepLimit.
+      nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
+          "20000");
+      assertEquals(datanodeManager.getDatanodeAdminManager().getPendingRepLimit(), 20000);
+
+      // verify defaultBlocksPerLock.
+      assertEquals(datanodeManager.getDatanodeAdminManager().getBlocksPerLock(),
+          defaultBlocksPerLock);
+
+      // try invalid blocksPerLock.
+      try {
+        nameNode.reconfigureProperty(
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
+            "non-numeric");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
+            defaultBlocksPerLock + "' to 'non-numeric'", e.getMessage());
+      }
+
+      try {
+        nameNode.reconfigureProperty(
+            DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "-1");
+        fail("Should not reach here");
+      } catch (ReconfigurationException e) {
+        assertEquals("Could not change property " +
+            "dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
+            defaultBlocksPerLock + "' to '-1'", e.getMessage());
+      }
+
+      // try correct blocksPerLock.
+      nameNode.reconfigureProperty(
+          DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "10000");
+      assertEquals(datanodeManager.getDatanodeAdminManager().getBlocksPerLock(), 10000);
+    }
+  }
+
   @After
   public void shutDown() throws IOException {
     if (cluster != null) {
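Operationally, the same code path is exercised on a live cluster by editing hdfs-site.xml and running `hdfs dfsadmin -reconfig namenode <host:ipc_port> start`, with no NameNode restart. As a side note on test style, each try/fail/catch block above could be written more compactly with Hadoop's `LambdaTestUtils.intercept`, which asserts the exception type and that the message contains the given text (a sketch only, not part of the patch; it would sit inside the same test method):

```java
import org.apache.hadoop.test.LambdaTestUtils;

// Sketch: equivalent to one try/fail/catch block above, assuming the
// default of 1000 configured earlier in the test.
LambdaTestUtils.intercept(ReconfigurationException.class,
    "Could not change property "
        + "dfs.namenode.decommission.backoff.monitor.pending.limit"
        + " from '1000' to '-1'",
    () -> nameNode.reconfigureProperty(
        DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, "-1"));
```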
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 5b0646f3d1..00cd952b03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -29,6 +29,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
 
 import java.util.function.Supplier;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
@@ -429,7 +431,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(18, outs.size());
+    assertEquals(20, outs.size());
     assertTrue(outs.get(0).contains("Reconfigurable properties:"));
     assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
@@ -439,8 +441,10 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr
     assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(6));
     assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(7));
     assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(8));
-    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(9));
-    assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(10));
+    assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, outs.get(9));
+    assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, outs.get(10));
+    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(11));
+    assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(12));
     assertEquals(errs.size(), 0);
   }
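The index arithmetic in `testNameNodeGetReconfigurableProperties` follows from the listing being sorted, as the assertions imply: the two new keys slot in at positions 9 and 10 ("...pending.blocks.per.lock" sorts before "...pending.limit" because 'b' orders before 'l' after the shared prefix), pushing the heartbeat and slow-peer keys to 11 and 12 and the total count from 18 to 20. A quick check of that ordering assumption:

```java
import java.util.TreeSet;

// Demonstrates the lexicographic order the assertions above rely on.
public class ReconfKeyOrder {
  public static void main(String[] args) {
    TreeSet<String> keys = new TreeSet<>();
    keys.add("dfs.namenode.decommission.backoff.monitor.pending.limit");
    keys.add("dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock");
    keys.add("dfs.namenode.heartbeat.recheck-interval");
    // Prints pending.blocks.per.lock, then pending.limit, then the
    // heartbeat key: the same relative order as outs.get(9..11).
    keys.forEach(System.out::println);
  }
}
```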