diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index b290d64f6c..3a0fa2016d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -73,6 +73,7 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor { /** Thread pool for logging stats. */ private ExecutorService executor; + public static final String CONCURRENT = "concurrent"; @Override public void init(Configuration configuration, RouterRpcServer rpcServer, @@ -85,10 +86,13 @@ public void init(Configuration configuration, RouterRpcServer rpcServer, // Create metrics this.metrics = FederationRPCMetrics.create(conf, server); for (String nameservice : FederationUtil.getAllConfiguredNS(conf)) { - LOG.info("Create Nameservice RPC Metrics for " + nameservice); + LOG.info("Create Nameservice RPC Metrics for {}", nameservice); this.nameserviceRPCMetricsMap.computeIfAbsent(nameservice, k -> NameserviceRPCMetrics.create(conf, k)); } + LOG.info("Create Nameservice RPC Metrics for {}", CONCURRENT); + this.nameserviceRPCMetricsMap.computeIfAbsent(CONCURRENT, + k -> NameserviceRPCMetrics.create(conf, k)); // Create thread pool ThreadFactory threadFactory = new ThreadFactoryBuilder() @@ -153,7 +157,7 @@ public void proxyOpComplete(boolean success, String nsId, if (success) { long proxyTime = getProxyTime(); if (proxyTime >= 0) { - if (metrics != null) { + if (metrics != null && !CONCURRENT.equals(nsId)) { metrics.addProxyTime(proxyTime, state); } if (nameserviceRPCMetricsMap != null && diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c658a047f8..f3a00f07f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor.CONCURRENT; import java.io.EOFException; import java.io.FileNotFoundException; @@ -1572,7 +1573,9 @@ public Map invokeConcurrent( results.add(new RemoteResult<>(location, ioe)); } } - + if (rpcMonitor != null) { + rpcMonitor.proxyOpComplete(true, CONCURRENT, null); + } return results; } catch (RejectedExecutionException e) { if (rpcMonitor != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestNameserviceRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestNameserviceRPCMetrics.java index 7b6bcb2143..5c8c3564bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestNameserviceRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestNameserviceRPCMetrics.java @@ -33,8 +33,10 @@ import java.io.IOException; +import static org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor.CONCURRENT; import static org.apache.hadoop.hdfs.server.federation.metrics.NameserviceRPCMetrics.NAMESERVICE_RPC_METRICS_PREFIX; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; /** @@ -134,5 +136,26 @@ public void testProxyOp() throws IOException { getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1")); } + @Test + public void testProxyOpCompleteConcurrent() throws IOException { + + long ns0ProxyOpBefore = getLongCounter("ProxyOp", + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0")); + long ns1ProxyOpBefore = getLongCounter("ProxyOp", + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1")); + long concurrentProxyOpBefore = getLongCounter("ProxyOp", + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "concurrent")); + // RPC which uses invokeConcurrent. + router.getRpcServer().setBalancerBandwidth(1024 * 1024L); + + assertCounter("ProxyOp", ns0ProxyOpBefore + 1, + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0")); + assertCounter("ProxyOp", ns1ProxyOpBefore + 1, + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1")); + assertCounter("ProxyOp", concurrentProxyOpBefore + 1, + getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + CONCURRENT)); + + } + }