HDFS-17294. Reconfigure the scheduling cycle of the slowPeerCollectorDaemon thread. (#6366)

Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
huangzhaobo 2023-12-19 08:32:47 +08:00 committed by GitHub
parent 62cc673d00
commit 95ea31fafb
4 changed files with 83 additions and 5 deletions
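
In short: dfs.namenode.slowpeer.collect.interval becomes a runtime-reconfigurable NameNode property, so the scheduling cycle of the slow-peer collector daemon can be changed without a restart. Operationally this goes through the standard reconfiguration workflow: edit hdfs-site.xml, then run `hdfs dfsadmin -reconfig namenode <host:ipc_port> start` and poll with the `status` subcommand. The four diffs below add the restart hook (DatanodeManager), register and parse the property (NameNode), and extend the tests (TestNameNodeReconfigure, TestDFSAdmin).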

DatanodeManager.java

@@ -211,7 +211,7 @@ public class DatanodeManager {
   private SlowPeerTracker slowPeerTracker;
   private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
   private Daemon slowPeerCollectorDaemon;
-  private final long slowPeerCollectionInterval;
+  private volatile long slowPeerCollectionInterval;
   private volatile int maxSlowPeerReportNodes;
   @Nullable
@@ -408,7 +408,7 @@ public class DatanodeManager {
     LOG.info("Slow peers collection thread start.");
   }
 
-  public void stopSlowPeerCollector() {
+  private void stopSlowPeerCollector() {
     LOG.info("Slow peers collection thread shutdown");
     if (slowPeerCollectorDaemon == null) {
       return;
@@ -424,6 +424,17 @@ public class DatanodeManager {
     }
   }
 
+  public void restartSlowPeerCollector(long interval) {
+    Preconditions.checkNotNull(slowPeerCollectorDaemon,
+        "slowPeerCollectorDaemon thread is null, not support restart");
+    stopSlowPeerCollector();
+    Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
+    this.slowPeerCollectionInterval = interval;
+    if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
+      startSlowPeerCollector();
+    }
+  }
+
   private static long getStaleIntervalFromConf(Configuration conf,
       long heartbeatExpireInterval) {
     long staleInterval = conf.getLong(
@@ -2289,4 +2300,9 @@ public class DatanodeManager {
   public boolean isSlowPeerCollectorInitialized() {
     return slowPeerCollectorDaemon == null;
   }
+
+  @VisibleForTesting
+  public long getSlowPeerCollectionInterval() {
+    return slowPeerCollectionInterval;
+  }
 }
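
Two details in the DatanodeManager change are easy to miss. The field goes from final to volatile because it is now written by the reconfiguration handler thread and read from other threads without a lock; and restartSlowPeerCollector insists the daemon already exists, which is why reconfiguring the interval before enabling the tracker fails (the tests below rely on this). A minimal sketch of the visibility pattern, with hypothetical names rather than the real daemon code:

  // Hedged illustration only: a writer thread updates a volatile interval
  // while a long-running loop on another thread picks up the new value.
  class IntervalHolder {
    private volatile long intervalMs = 30 * 60 * 1000L; // mirrors the 30m default

    void reconfigure(long newMs) {   // called from the RPC handler thread
      this.intervalMs = newMs;       // volatile write publishes the value
    }

    void collectorLoop() throws InterruptedException {
      while (!Thread.currentThread().isInterrupted()) {
        // ... collect slow-peer reports here ...
        Thread.sleep(intervalMs);    // volatile read sees the latest write
      }
    }
  }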

NameNode.java

@@ -143,6 +143,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_ME
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT;
@ -380,7 +382,8 @@ public class NameNode extends ReconfigurableBase implements
IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY,
DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY));
DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2374,7 +2377,8 @@ public class NameNode extends ReconfigurableBase implements
         DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) || (property.equals(
         DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) || (property.equals(
         DFS_DATANODE_PEER_STATS_ENABLED_KEY)) || property.equals(
-        DFS_DATANODE_MAX_NODES_TO_REPORT_KEY)) {
+        DFS_DATANODE_MAX_NODES_TO_REPORT_KEY) || property.equals(
+        DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY)) {
       return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
     } else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
       return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
@@ -2673,6 +2677,24 @@ public class NameNode extends ReconfigurableBase implements
         datanodeManager.setMaxSlowPeersToReport(maxSlowPeersToReport);
         break;
       }
+      case DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY: {
+        if (newVal == null) {
+          // Reset: fall back to the value currently in the configuration, or the default.
+          long defaultInterval =
+              getConf().getTimeDuration(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
+                  DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+          datanodeManager.restartSlowPeerCollector(defaultInterval);
+          result = Long.toString(defaultInterval);
+        } else {
+          // Parse the supplied duration string (e.g. "10m") into milliseconds.
+          long newInterval =
+              getConf().getTimeDurationHelper(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
+                  newVal, TimeUnit.MILLISECONDS);
+          datanodeManager.restartSlowPeerCollector(newInterval);
+          result = newVal;
+        }
+        break;
+      }
       default: {
         throw new IllegalArgumentException(
             "Unexpected property " + property + " in reconfigureSlowNodesParameters");

TestNameNodeReconfigure.java

@@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REP
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LOCK_DETAILED_METRICS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY;
 import static org.junit.Assert.*;
@@ -822,6 +823,45 @@ public class TestNameNodeReconfigure {
     }
   }
 
+  @Test
+  public void testReconfigureSlowPeerCollectInterval() throws Exception {
+    final NameNode nameNode = cluster.getNameNode();
+    final DatanodeManager datanodeManager =
+        nameNode.namesystem.getBlockManager().getDatanodeManager();
+
+    assertFalse("SlowNode tracker is already enabled. It should be disabled by default",
+        datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
+    assertTrue(datanodeManager.isSlowPeerCollectorInitialized());
+
+    // The collector daemon does not exist yet, so a restart must fail.
+    try {
+      nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "10m");
+    } catch (NullPointerException e) {
+      assertEquals("slowPeerCollectorDaemon thread is null, not support restart", e.getMessage());
+    }
+
+    nameNode.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "True");
+    assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
+        datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
+    assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
+    assertEquals(1800000, datanodeManager.getSlowPeerCollectionInterval());
+
+    try {
+      nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "non-numeric");
+    } catch (ReconfigurationException e) {
+      assertEquals("Could not change property dfs.namenode.slowpeer.collect.interval from "
+          + "'30m' to 'non-numeric'", e.getMessage());
+    }
+
+    nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, "10m");
+    assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
+    assertEquals(600000, datanodeManager.getSlowPeerCollectionInterval());
+
+    nameNode.reconfigureProperty(DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY, null);
+    assertFalse(datanodeManager.isSlowPeerCollectorInitialized());
+    // Resetting to null falls back to the live configuration, which still holds "10m".
+    assertEquals(600000, datanodeManager.getSlowPeerCollectionInterval());
+  }
+
   @After
   public void shutDown() throws IOException {
     if (cluster != null) {
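
For reference when reading the assertions: the default interval is 30m, so enabling dfs.datanode.peer.stats.enabled starts the collector at 30 * 60 * 1000 = 1,800,000 ms, and "10m" parses to 600,000 ms. The final reconfigure to null still yields 600,000 ms because reconfigurePropertyImpl runs before ReconfigurableBase unsets the property, so the live Configuration still holds "10m" at that point.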

TestDFSAdmin.java

@@ -442,7 +442,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(28, outs.size());
+    assertEquals(29, outs.size());
     assertTrue(outs.get(0).contains("Reconfigurable properties:"));
     assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
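
The count moves from 28 to 29 because the new key now appears in the listing that getReconfigurableProperties captures; the same list is what `hdfs dfsadmin -reconfig namenode <host:ipc_port> properties` prints, so operators can use that subcommand to confirm the property is reconfigurable on a live NameNode.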