From f18bbdd9d84cc1a23d33524f5cb61321cdb1b926 Mon Sep 17 00:00:00 2001
From: Inigo Goiri
Date: Fri, 1 Nov 2019 10:14:31 -0700
Subject: [PATCH] HDFS-14927. RBF: Add metrics for async callers thread pool. Contributed by Leon Gao.

---
 .../metrics/FederationRPCMBean.java           |  6 ++
 .../metrics/FederationRPCMetrics.java         |  5 ++
 .../federation/router/RouterRpcClient.java    | 14 ++++
 .../TestRouterClientRejectOverload.java       | 70 +++++++++++++++++++
 4 files changed, 95 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 76b3ca6404..f57e310d04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -93,4 +93,10 @@ public interface FederationRPCMBean {
    * @return JSON string representation.
    */
   String getRpcClientConnections();
+
+  /**
+   * Get the JSON representation of the async caller thread pool.
+   * @return JSON string representation of the async caller thread pool.
+   */
+  String getAsyncCallerPool();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 8e57c6b380..b16a6c0df5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -220,6 +220,11 @@ public String getRpcClientConnections() {
     return rpcServer.getRPCClient().getJSON();
   }
 
+  @Override
+  public String getAsyncCallerPool() {
+    return rpcServer.getRPCClient().getAsyncCallerPoolJson();
+  }
+
   /**
    * Add the time to proxy an operation from the moment the Router sends it to
    * the Namenode until it replied.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 563f3d0864..1c17c297ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -69,6 +69,7 @@
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.eclipse.jetty.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -263,6 +264,19 @@ public String getJSON() {
     return this.connectionManager.getJSON();
   }
 
+  /**
+   * JSON representation of the async caller thread pool.
+   *
+   * @return String representation of the JSON.
+   */
+  public String getAsyncCallerPoolJson() {
+    final Map<String, Integer> info = new LinkedHashMap<>();
+    info.put("active", executorService.getActiveCount());
+    info.put("total", executorService.getPoolSize());
+    info.put("max", executorService.getMaximumPoolSize());
+    return JSON.toString(info);
+  }
+
   /**
    * Get ClientProtocol proxy client for a NameNode. Each combination of user +
    * NN must use a unique proxy client. Previously created clients are cached
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
index a4611f22e8..cc7f5a61a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -31,6 +31,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -48,6 +49,8 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
@@ -356,4 +359,71 @@ public void testNoNamenodesAvailable() throws Exception{
     // Router 0 failures do not change
     assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
   }
+
+  @Test
+  public void testAsyncCallerPoolMetrics() throws Exception {
+    setupCluster(true, false);
+    simulateSlowNamenode(cluster.getCluster().getNameNode(0), 2);
+    final ObjectMapper objectMapper = new ObjectMapper();
+
+    // Set only one router to make test easier
+    cluster.getRouters().remove(1);
+    FederationRPCMetrics metrics = cluster.getRouters().get(0).getRouter()
+        .getRpcServer().getRPCMetrics();
+
+    // No active connection initially
+    Map<String, Integer> result = objectMapper
+        .readValue(metrics.getAsyncCallerPool(), Map.class);
+    assertEquals(0, result.get("active").intValue());
+    assertEquals(0, result.get("total").intValue());
+    assertEquals(4, result.get("max").intValue());
+
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+
+    try {
+      // Run a client request to create an active connection
+      exec.submit(() -> {
+        DFSClient routerClient = null;
+        try {
+          routerClient = new DFSClient(new URI("hdfs://fed"),
+              cluster.getRouterClientConf());
+          String clientName = routerClient.getClientName();
+          ClientProtocol routerProto = routerClient.getNamenode();
+          routerProto.renewLease(clientName);
+        } catch (Exception e) {
+          fail("Client request failed: " + e);
+        } finally {
+          if (routerClient != null) {
+            try {
+              routerClient.close();
+            } catch (IOException e) {
+              LOG.error("Cannot close the client");
+            }
+          }
+        }
+      });
+
+      // Wait for client request to be active
+      GenericTestUtils.waitFor(() -> {
+        try {
+          Map<String, Integer> newResult = objectMapper.readValue(
+              metrics.getAsyncCallerPool(), Map.class);
+          if (newResult.get("active") != 1) {
+            return false;
+          }
+          if (newResult.get("max") != 4) {
+            return false;
+          }
+          int total = newResult.get("total");
+          // "total" is dynamic
+          return total >= 1 && total <= 4;
+        } catch (Exception e) {
+          LOG.error("Not able to parse metrics result: " + e);
+        }
+        return false;
+      }, 100, 2000);
+    } finally {
+      exec.shutdown();
+    }
+  }
 }