HDFS-17023. RBF: Record proxy time when call invokeConcurrent method. (#5683). Contributed by farmmamba.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
7a45ef4164
commit
35158db711
@ -73,6 +73,7 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
|
|||||||
/** Thread pool for logging stats. */
|
/** Thread pool for logging stats. */
|
||||||
private ExecutorService executor;
|
private ExecutorService executor;
|
||||||
|
|
||||||
|
public static final String CONCURRENT = "concurrent";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Configuration configuration, RouterRpcServer rpcServer,
|
public void init(Configuration configuration, RouterRpcServer rpcServer,
|
||||||
@ -85,10 +86,13 @@ public void init(Configuration configuration, RouterRpcServer rpcServer,
|
|||||||
// Create metrics
|
// Create metrics
|
||||||
this.metrics = FederationRPCMetrics.create(conf, server);
|
this.metrics = FederationRPCMetrics.create(conf, server);
|
||||||
for (String nameservice : FederationUtil.getAllConfiguredNS(conf)) {
|
for (String nameservice : FederationUtil.getAllConfiguredNS(conf)) {
|
||||||
LOG.info("Create Nameservice RPC Metrics for " + nameservice);
|
LOG.info("Create Nameservice RPC Metrics for {}", nameservice);
|
||||||
this.nameserviceRPCMetricsMap.computeIfAbsent(nameservice,
|
this.nameserviceRPCMetricsMap.computeIfAbsent(nameservice,
|
||||||
k -> NameserviceRPCMetrics.create(conf, k));
|
k -> NameserviceRPCMetrics.create(conf, k));
|
||||||
}
|
}
|
||||||
|
LOG.info("Create Nameservice RPC Metrics for {}", CONCURRENT);
|
||||||
|
this.nameserviceRPCMetricsMap.computeIfAbsent(CONCURRENT,
|
||||||
|
k -> NameserviceRPCMetrics.create(conf, k));
|
||||||
|
|
||||||
// Create thread pool
|
// Create thread pool
|
||||||
ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||||
@ -153,7 +157,7 @@ public void proxyOpComplete(boolean success, String nsId,
|
|||||||
if (success) {
|
if (success) {
|
||||||
long proxyTime = getProxyTime();
|
long proxyTime = getProxyTime();
|
||||||
if (proxyTime >= 0) {
|
if (proxyTime >= 0) {
|
||||||
if (metrics != null) {
|
if (metrics != null && !CONCURRENT.equals(nsId)) {
|
||||||
metrics.addProxyTime(proxyTime, state);
|
metrics.addProxyTime(proxyTime, state);
|
||||||
}
|
}
|
||||||
if (nameserviceRPCMetricsMap != null &&
|
if (nameserviceRPCMetricsMap != null &&
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor.CONCURRENT;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
@ -1572,7 +1573,9 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
|
|||||||
results.add(new RemoteResult<>(location, ioe));
|
results.add(new RemoteResult<>(location, ioe));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (rpcMonitor != null) {
|
||||||
|
rpcMonitor.proxyOpComplete(true, CONCURRENT, null);
|
||||||
|
}
|
||||||
return results;
|
return results;
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
if (rpcMonitor != null) {
|
if (rpcMonitor != null) {
|
||||||
|
@ -33,8 +33,10 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor.CONCURRENT;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.metrics.NameserviceRPCMetrics.NAMESERVICE_RPC_METRICS_PREFIX;
|
import static org.apache.hadoop.hdfs.server.federation.metrics.NameserviceRPCMetrics.NAMESERVICE_RPC_METRICS_PREFIX;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -134,5 +136,26 @@ public void testProxyOp() throws IOException {
|
|||||||
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1"));
|
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProxyOpCompleteConcurrent() throws IOException {
|
||||||
|
|
||||||
|
long ns0ProxyOpBefore = getLongCounter("ProxyOp",
|
||||||
|
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0"));
|
||||||
|
long ns1ProxyOpBefore = getLongCounter("ProxyOp",
|
||||||
|
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1"));
|
||||||
|
long concurrentProxyOpBefore = getLongCounter("ProxyOp",
|
||||||
|
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "concurrent"));
|
||||||
|
// RPC which uses invokeConcurrent.
|
||||||
|
router.getRpcServer().setBalancerBandwidth(1024 * 1024L);
|
||||||
|
|
||||||
|
assertCounter("ProxyOp", ns0ProxyOpBefore + 1,
|
||||||
|
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns0"));
|
||||||
|
assertCounter("ProxyOp", ns1ProxyOpBefore + 1,
|
||||||
|
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + "ns1"));
|
||||||
|
assertCounter("ProxyOp", concurrentProxyOpBefore + 1,
|
||||||
|
getMetrics(NAMESERVICE_RPC_METRICS_PREFIX + CONCURRENT));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user