From cdcb77a2c5ca99502d2ac2fbf803f22463eb1343 Mon Sep 17 00:00:00 2001
From: Inigo Goiri
Date: Wed, 25 Mar 2020 11:30:54 -0700
Subject: [PATCH] HDFS-15075. Remove process command timing from
 BPServiceActor. Contributed by Xiaoqiao He.

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  5 +++++
 .../hdfs/server/datanode/BPServiceActor.java  | 17 +++++++++--------
 .../hadoop/hdfs/server/datanode/DNConf.java   | 14 ++++++++++++++
 .../datanode/metrics/DataNodeMetrics.java     | 10 ++++++++++
 .../src/main/resources/hdfs-default.xml       |  9 +++++++++
 .../server/datanode/TestBPOfferService.java   |  4 ++++
 6 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 73cddeec12..b2f8ad2a5a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -443,6 +443,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT =
       true;
 
+  public static final String DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY =
+      "dfs.datanode.processcommands.threshold";
+  public static final long DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT =
+      TimeUnit.SECONDS.toMillis(2);
+
   public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
   public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 222ee49b3e..a436c94dc2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -702,15 +702,7 @@ private void offerService() throws Exception {
           if (state == HAServiceState.ACTIVE) {
             handleRollingUpgradeStatus(resp);
           }
-
-          long startProcessCommands = monotonicNow();
           commandProcessingThread.enqueue(resp.getCommands());
-          long endProcessCommands = monotonicNow();
-          if (endProcessCommands - startProcessCommands > 2000) {
-            LOG.info("Took " + (endProcessCommands - startProcessCommands)
-                + "ms to process " + resp.getCommands().length
-                + " commands from NN");
-          }
         }
       }
       if (!dn.areIBRDisabledForTests() &&
@@ -1353,6 +1345,7 @@ private void processQueue() {
      */
     private boolean processCommand(DatanodeCommand[] cmds) {
       if (cmds != null) {
+        long startProcessCommands = monotonicNow();
         for (DatanodeCommand cmd : cmds) {
           try {
             if (!bpos.processCommandFromActor(cmd, actor)) {
@@ -1371,6 +1364,14 @@ private boolean processCommand(DatanodeCommand[] cmds) {
             LOG.warn("Error processing datanode Command", ioe);
           }
         }
+        long processCommandsMs = monotonicNow() - startProcessCommands;
+        if (cmds.length > 0) {
+          dn.getMetrics().addNumProcessedCommands(processCommandsMs);
+        }
+        if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
+          LOG.info("Took {} ms to process {} commands from NN",
+              processCommandsMs, cmds.length);
+        }
       }
       return true;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 487c97d9f0..b56dd4ec22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -35,6 +35,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_DIRS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -119,6 +121,8 @@ public class DNConf {
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
 
+  private final long processCommandsThresholdMs;
+
   final long maxLockedMemory;
   private final String[] pmemDirs;
 
@@ -292,6 +296,12 @@ public DNConf(final Configurable dn) {
     this.pmemCacheRecoveryEnabled = getConf().getBoolean(
         DFS_DATANODE_PMEM_CACHE_RECOVERY_KEY,
         DFS_DATANODE_PMEM_CACHE_RECOVERY_DEFAULT);
+
+    this.processCommandsThresholdMs = getConf().getTimeDuration(
+        DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_KEY,
+        DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT,
+        TimeUnit.MILLISECONDS
+    );
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -445,4 +455,8 @@ public String[] getPmemVolumes() {
   public boolean getPmemCacheRecoveryEnabled() {
     return pmemCacheRecoveryEnabled;
   }
+
+  public long getProcessCommandsThresholdMs() {
+    return processCommandsThresholdMs;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 68eaf72228..cc802375f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -164,6 +164,8 @@ public class DataNodeMetrics {
   private MutableCounterLong sumOfActorCommandQueueLength;
   @Metric("Num of processed commands of all BPServiceActors")
   private MutableCounterLong numProcessedCommands;
+  @Metric("Rate of processed commands of all BPServiceActors")
+  private MutableRate processedCommandsOp;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   @Metric("Milliseconds spent on calling NN rpc")
@@ -564,4 +566,12 @@ public void incrActorCmdQueueLength(int delta) {
   public void incrNumProcessedCommands() {
     numProcessedCommands.incr();
   }
+
+  /**
+   * Add to the processedCommandsOp metrics.
+   * @param latency time in milliseconds taken to process the commands
+   */
+  public void addNumProcessedCommands(long latency) {
+    processedCommandsOp.add(latency);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0ce0be6e3e..913e47b75f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3059,6 +3059,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.processcommands.threshold</name>
+  <value>2s</value>
+  <description>The threshold at which we log slow command processing in
+    BPServiceActor. Values without a time unit are read as milliseconds.
+    By default, this parameter is set to 2 seconds.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.deadnode.detection.enabled</name>
   <value>false</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 163baf5ecb..a305b81b6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -1206,6 +1207,9 @@ public void testCommandProcessingThread() throws Exception {
       assertTrue("Process command nums is not expected.",
           getLongCounter("NumProcessedCommands", mrb) > 0);
       assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
+      // Check the new processedCommandsOp metric; the one command sent back
+      // to the DataNode here is a FinalizeCommand.
+      assertCounter("ProcessedCommandsOpNumOps", 1L, mrb);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
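Usage note: because DNConf reads dfs.datanode.processcommands.threshold with Configuration.getTimeDuration(..., TimeUnit.MILLISECONDS), the value accepts Hadoop's time-unit suffixes (ns, us, ms, s, m, h, d), and a bare number is interpreted as milliseconds. A minimal hdfs-site.xml override might look like the snippet below; the 500ms threshold is only an illustrative value, not a recommendation:

  <property>
    <name>dfs.datanode.processcommands.threshold</name>
    <value>500ms</value>
  </property>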
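Because processedCommandsOp is a metrics2 MutableRate, the framework derives two published metrics from the capitalized field name: ProcessedCommandsOpNumOps (the number of samples added) and ProcessedCommandsOpAvgTime (the mean of the added latencies). The standalone sketch below illustrates that behavior; the registry name "demo", the class name, and the sample values are assumptions for illustration only:

  import org.apache.hadoop.metrics2.lib.MetricsRegistry;
  import org.apache.hadoop.metrics2.lib.MutableRate;

  public class ProcessedCommandsRateDemo {
    public static void main(String[] args) {
      // Stand-in for the "datanode" registry inside DataNodeMetrics.
      MetricsRegistry registry = new MetricsRegistry("demo");
      // Registers a MutableRate much like the @Metric annotation does.
      MutableRate processedCommandsOp =
          registry.newRate("processedCommandsOp", "Rate of processed commands");
      // Each add() contributes one sample: NumOps increments by 1 and AvgTime
      // tracks the mean, so batches of 12 ms and 30 ms yield NumOps=2, AvgTime=21.0.
      processedCommandsOp.add(12);
      processedCommandsOp.add(30);
    }
  }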
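On the test side, MetricsAsserts resolves metrics by that derived name, which is why the assertion targets ProcessedCommandsOpNumOps rather than the field name. A sketch of the pattern, where assertOneProcessedCommandBatch is a hypothetical helper and dn is assumed to come from a MiniDFSCluster as in the surrounding test:

  import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
  import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

  import org.apache.hadoop.hdfs.server.datanode.DataNode;
  import org.apache.hadoop.metrics2.MetricsRecordBuilder;

  public class ProcessedCommandsAssertExample {
    // Hypothetical helper: verify a DataNode has processed exactly one command batch.
    static void assertOneProcessedCommandBatch(DataNode dn) {
      // Snapshot this DataNode's metrics record by its registered source name.
      MetricsRecordBuilder mrb = getMetrics(dn.getMetrics().name());
      // MutableRate "processedCommandsOp" publishes <Name>NumOps and <Name>AvgTime.
      assertCounter("ProcessedCommandsOpNumOps", 1L, mrb);
    }
  }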