HDFS-15075. Remove process command timing from BPServiceActor. Contributed by Xiaoqiao He.

Inigo Goiri 2020-03-25 11:30:54 -07:00
parent 6ce189c621
commit cdcb77a2c5
6 changed files with 51 additions and 8 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -443,6 +443,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
true;
public static final String DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY =
"dfs.datanode.processcommands.threshold";
public static final long DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT =
TimeUnit.SECONDS.toMillis(2);
public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -702,15 +702,7 @@ private void offerService() throws Exception {
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands = monotonicNow();
commandProcessingThread.enqueue(resp.getCommands());
long endProcessCommands = monotonicNow();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands)
+ "ms to process " + resp.getCommands().length
+ " commands from NN");
}
}
}
if (!dn.areIBRDisabledForTests() &&
@@ -1353,6 +1345,7 @@ private void processQueue() {
*/
private boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
long startProcessCommands = monotonicNow();
for (DatanodeCommand cmd : cmds) {
try {
if (!bpos.processCommandFromActor(cmd, actor)) {
@@ -1371,6 +1364,14 @@ private boolean processCommand(DatanodeCommand[] cmds) {
LOG.warn("Error processing datanode Command", ioe);
}
}
long processCommandsMs = monotonicNow() - startProcessCommands;
if (cmds.length > 0) {
dn.getMetrics().addNumProcessedCommands(processCommandsMs);
}
if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
LOG.info("Took {} ms to process {} commands from NN",
processCommandsMs, cmds.length);
}
}
return true;
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -35,6 +35,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -119,6 +121,8 @@ public class DNConf {
final long xceiverStopTimeout;
final long restartReplicaExpiry;
private final long processCommandsThresholdMs;
final long maxLockedMemory;
private final String[] pmemDirs;
@@ -292,6 +296,12 @@ public DNConf(final Configurable dn) {
this.pmemCacheRecoveryEnabled = getConf().getBoolean(
DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY,
DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT);
this.processCommandsThresholdMs = getConf().getTimeDuration(
DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY,
DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT,
TimeUnit.MILLISECONDS
);
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -445,4 +455,8 @@ public String[] getPmemVolumes() {
public boolean getPmemCacheRecoveryEnabled() {
return pmemCacheRecoveryEnabled;
}
public long getProcessCommandsThresholdMs() {
return processCommandsThresholdMs;
}
}
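
Because DNConf reads the new key with Configuration.getTimeDuration, the threshold can be overridden either as a plain number of milliseconds or with a time suffix. A minimal sketch of that behavior follows; the class name and the "5s" value are illustrative only and not part of this change:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ProcessCommandsThresholdExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A suffixed duration such as "5s" is accepted; a bare number is read as milliseconds.
    conf.set(DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY, "5s");
    long thresholdMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY,
        DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT,
        TimeUnit.MILLISECONDS);
    System.out.println(thresholdMs); // prints 5000
  }
}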

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -164,6 +164,8 @@ public class DataNodeMetrics {
private MutableCounterLong sumOfActorCommandQueueLength;
@Metric("Num of processed commands of all BPServiceActors")
private MutableCounterLong numProcessedCommands;
@Metric("Rate of processed commands of all BPServiceActors")
private MutableRate processedCommandsOp;
final MetricsRegistry registry = new MetricsRegistry("datanode");
@Metric("Milliseconds spent on calling NN rpc")
@@ -564,4 +566,12 @@ public void incrActorCmdQueueLength(int delta) {
public void incrNumProcessedCommands() {
numProcessedCommands.incr();
}
/**
* Add a sample to the processedCommandsOp metric.
* @param latency milliseconds spent processing the commands
*/
public void addNumProcessedCommands(long latency) {
processedCommandsOp.add(latency);
}
}
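
Because processedCommandsOp is a MutableRate, it publishes both an operation count (ProcessedCommandsOpNumOps, asserted in the test below) and an average latency (ProcessedCommandsOpAvgTime). A small test-style sketch of reading both, assuming a running DataNode; the probe class itself is hypothetical and not part of this change:

import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;

public class ProcessedCommandsMetricsProbe {
  // Reads the two values published by the processedCommandsOp MutableRate.
  public static void print(DataNode dn) {
    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
    long numOps = getLongCounter("ProcessedCommandsOpNumOps", rb);
    double avgMs = getDoubleGauge("ProcessedCommandsOpAvgTime", rb);
    System.out.println(numOps + " commands processed, " + avgMs + " ms on average");
  }
}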

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3059,6 +3059,15 @@
</description>
</property>
<property>
<name>dfs.datanode.processcommands.threshold</name>
<value>2s</value>
<description>The threshold at which we log slow command processing in the
DataNode's BPServiceActor. Values without a time-unit suffix are read as
milliseconds. By default, this parameter is set to 2 seconds.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.enabled</name>
<value>false</value>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

@@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
@@ -1206,6 +1207,9 @@ public void testCommandProcessingThread() throws Exception {
assertTrue("Process command nums is not expected.",
getLongCounter("NumProcessedCommands", mrb) > 0);
assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
// Check the new processedCommandsOp metric.
// The one command sent back to the DataNode here is a FinalizeCommand.
assertCounter("ProcessedCommandsOpNumOps", 1L, mrb);
} finally {
if (cluster != null) {
cluster.shutdown();