HDFS-16327. Make DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY reconfigurable (#3716)

This commit is contained in:
litao 2021-12-14 10:19:06 +08:00 committed by GitHub
parent a5bcf4c792
commit c56a07f36b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 8 deletions

View File

@ -214,7 +214,7 @@ public class DatanodeManager {
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet(); private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
private Daemon slowPeerCollectorDaemon; private Daemon slowPeerCollectorDaemon;
private final long slowPeerCollectionInterval; private final long slowPeerCollectionInterval;
private final int maxSlowPeerReportNodes; private volatile int maxSlowPeerReportNodes;
@Nullable @Nullable
private final SlowDiskTracker slowDiskTracker; private final SlowDiskTracker slowDiskTracker;
@ -515,6 +515,15 @@ public boolean getEnableAvoidSlowDataNodesForRead() {
return this.avoidSlowDataNodesForRead; return this.avoidSlowDataNodesForRead;
} }
public void setMaxSlowpeerCollectNodes(int maxNodes) {
this.maxSlowPeerReportNodes = maxNodes;
}
@VisibleForTesting
public int getMaxSlowpeerCollectNodes() {
return this.maxSlowPeerReportNodes;
}
/** /**
* Sort the non-striped located blocks by the distance to the target host. * Sort the non-striped located blocks by the distance to the target host.
* *

View File

@ -191,6 +191,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
import static org.apache.hadoop.util.ExitUtil.terminate; import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt; import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
@ -334,7 +336,8 @@ public enum OperationCategory {
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
DFS_IMAGE_PARALLEL_LOAD_KEY, DFS_IMAGE_PARALLEL_LOAD_KEY,
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)); DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY));
private static final String USAGE = "Usage: hdfs namenode [" private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t[" + StartupOption.BACKUP.getName() + "] | \n\t["
@ -2204,7 +2207,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) { } else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
return reconfigureParallelLoad(newVal); return reconfigureParallelLoad(newVal);
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY) } else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)
|| (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))) { || (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))
|| (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY))) {
return reconfigureSlowNodesParameters(datanodeManager, property, newVal); return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
} else { } else {
throw new ReconfigurationException(property, newVal, getConf().get( throw new ReconfigurationException(property, newVal, getConf().get(
@ -2396,24 +2400,32 @@ String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
final String property, final String newVal) throws ReconfigurationException { final String property, final String newVal) throws ReconfigurationException {
BlockManager bm = namesystem.getBlockManager(); BlockManager bm = namesystem.getBlockManager();
namesystem.writeLock(); namesystem.writeLock();
boolean enable; String result;
try { try {
if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) { if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) {
enable = (newVal == null ? DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT : boolean enable = (newVal == null ? DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT :
Boolean.parseBoolean(newVal)); Boolean.parseBoolean(newVal));
result = Boolean.toString(enable);
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable); datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
} else if (property.equals( } else if (property.equals(
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) { DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) {
enable = (newVal == null ? boolean enable = (newVal == null ?
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT : DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT :
Boolean.parseBoolean(newVal)); Boolean.parseBoolean(newVal));
result = Boolean.toString(enable);
bm.setExcludeSlowNodesEnabled(enable); bm.setExcludeSlowNodesEnabled(enable);
} else if (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) {
int maxSlowpeerCollectNodes = (newVal == null ?
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT :
Integer.parseInt(newVal));
result = Integer.toString(maxSlowpeerCollectNodes);
datanodeManager.setMaxSlowpeerCollectNodes(maxSlowpeerCollectNodes);
} else { } else {
throw new IllegalArgumentException("Unexpected property " + throw new IllegalArgumentException("Unexpected property " +
property + " in reconfigureSlowNodesParameters"); property + " in reconfigureSlowNodesParameters");
} }
LOG.info("RECONFIGURE* changed {} to {}", property, newVal); LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return Boolean.toString(enable); return result;
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get( throw new ReconfigurationException(property, newVal, getConf().get(
property), e); property), e);

View File

@ -55,6 +55,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
public class TestNameNodeReconfigure { public class TestNameNodeReconfigure {
@ -430,6 +431,24 @@ public void testEnableSlowNodesParametersAfterReconfigured()
getExcludeSlowNodesEnabled(BlockType.STRIPED)); getExcludeSlowNodesEnabled(BlockType.STRIPED));
} }
@Test
public void testReconfigureMaxSlowpeerCollectNodes()
throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();
// By default, DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY is 5.
assertEquals(5, datanodeManager.getMaxSlowpeerCollectNodes());
// Reconfigure.
nameNode.reconfigureProperty(
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, Integer.toString(10));
// Assert DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY is 10.
assertEquals(10, datanodeManager.getMaxSlowpeerCollectNodes());
}
@After @After
public void shutDown() throws IOException { public void shutDown() throws IOException {
if (cluster != null) { if (cluster != null) {

View File

@ -90,6 +90,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION; import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION;
import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.anyOf;
@ -432,7 +433,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList(); final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList(); final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs); getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(15, outs.size()); assertEquals(16, outs.size());
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1)); assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2)); assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3)); assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
@ -440,6 +441,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5)); assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6)); assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7)); assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7));
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(8));
assertEquals(errs.size(), 0); assertEquals(errs.size(), 0);
} }