diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 137d4c542e..ce5ab9f501 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -1028,5 +1028,13 @@ public class CommonConfigurationKeysPublic { public static final String HADOOP_HTTP_IDLE_TIMEOUT_MS_KEY = "hadoop.http.idle_timeout.ms"; public static final int HADOOP_HTTP_IDLE_TIMEOUT_MS_DEFAULT = 60000; + + /** + * To configure scheduling of server metrics update thread. This config is used to indicate + * initial delay and delay between each execution of the metric update runnable thread. + */ + public static final String IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL = + "ipc.server.metrics.update.runner.interval"; + public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index d5d82c25ef..c6143d29eb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -65,9 +65,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import javax.security.sasl.Sasl; @@ -127,6 +130,7 @@ import org.apache.hadoop.tracing.TraceUtils; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.protobuf.ByteString; import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream; import org.apache.hadoop.thirdparty.protobuf.Message; @@ -499,6 +503,11 @@ protected ResponseBuffer initialValue() { private Map auxiliaryListenerMap; private Responder responder = null; private Handler[] handlers = null; + private final LongAdder totalRequests = new LongAdder(); + private long lastSeenTotalRequests = 0; + private long totalRequestsPerSecond = 0; + private final long metricsUpdaterInterval; + private final ScheduledExecutorService scheduledExecutorService; private boolean logSlowRPC = false; @@ -510,6 +519,14 @@ protected boolean isLogSlowRPC() { return logSlowRPC; } + public long getTotalRequests() { + return totalRequests.sum(); + } + + public long getTotalRequestsPerSecond() { + return totalRequestsPerSecond; + } + /** * Sets slow RPC flag. * @param logSlowRPCFlag input logSlowRPCFlag. @@ -573,6 +590,7 @@ void logSlowRpcCalls(String methodName, Call call, } void updateMetrics(Call call, long startTime, boolean connDropped) { + totalRequests.increment(); // delta = handler + processing + response long deltaNanos = Time.monotonicNowNanos() - startTime; long timestampNanos = call.timestampNanos; @@ -3193,6 +3211,14 @@ protected Server(String bindAddress, int port, this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class); this.exceptionsHandler.addTerseLoggingExceptions( HealthCheckFailedException.class); + this.metricsUpdaterInterval = + conf.getLong(CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL, + CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Hadoop-Metrics-Updater-%d") + .build()); + this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(), + metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS); } public synchronized void addAuxiliaryListener(int auxiliaryPort) @@ -3487,10 +3513,25 @@ public synchronized void stop() { } responder.interrupt(); notifyAll(); + shutdownMetricsUpdaterExecutor(); this.rpcMetrics.shutdown(); this.rpcDetailedMetrics.shutdown(); } + private void shutdownMetricsUpdaterExecutor() { + this.scheduledExecutorService.shutdown(); + try { + boolean isExecutorShutdown = + this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS); + if (!isExecutorShutdown) { + LOG.info("Hadoop Metrics Updater executor could not be shutdown."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Hadoop Metrics Updater executor shutdown interrupted.", e); + } + } + /** * Wait for the server to be stopped. * Does not wait for all subthreads to finish. @@ -3950,4 +3991,32 @@ protected int getMaxIdleTime() { public String getServerName() { return serverName; } + + /** + * Server metrics updater thread, used to update some metrics on a regular basis. + * For instance, requests per second. + */ + private class MetricsUpdateRunner implements Runnable { + + private long lastExecuted = 0; + + @Override + public synchronized void run() { + long currentTime = Time.monotonicNow(); + if (lastExecuted == 0) { + lastExecuted = currentTime - metricsUpdaterInterval; + } + long currentTotalRequests = totalRequests.sum(); + long totalRequestsDiff = currentTotalRequests - lastSeenTotalRequests; + lastSeenTotalRequests = currentTotalRequests; + if ((currentTime - lastExecuted) > 0) { + double totalRequestsPerSecInDouble = + (double) totalRequestsDiff / TimeUnit.MILLISECONDS.toSeconds( + currentTime - lastExecuted); + totalRequestsPerSecond = ((long) totalRequestsPerSecInDouble); + } + lastExecuted = currentTime; + } + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index a0351ee357..07c0b66555 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -146,6 +146,16 @@ public String numOpenConnectionsPerUser() { return server.getNumDroppedConnections(); } + @Metric("Number of total requests") + public long getTotalRequests() { + return server.getTotalRequests(); + } + + @Metric("Number of total requests per second") + public long getTotalRequestsPerSecond() { + return server.getTotalRequestsPerSecond(); + } + public TimeUnit getMetricsTimeUnit() { return metricsTimeUnit; } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 129e3b1dea..9ad1cbd723 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -103,6 +103,8 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc | `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. | | `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. | | `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. | +| `TotalRequests` | Total num of requests served by the RPC server. | +| `TotalRequestsPerSeconds` | Total num of requests per second served by the RPC server. | RetryCache/NameNodeRetryCache ----------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index b78900b609..cdfd380e1f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ipc; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.protobuf.ServiceException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; @@ -29,6 +30,7 @@ import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Connection; +import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.TestProtos; @@ -83,6 +85,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; @@ -1665,6 +1668,60 @@ public void testRpcMetricsInNanos() throws Exception { } } + @Test + public void testNumTotalRequestsMetrics() throws Exception { + UserGroupInformation ugi = UserGroupInformation. + createUserForTesting("userXyz", new String[0]); + + final Server server = setupTestServer(conf, 1); + + ExecutorService executorService = null; + try { + RpcMetrics rpcMetrics = server.getRpcMetrics(); + assertEquals(0, rpcMetrics.getTotalRequests()); + assertEquals(0, rpcMetrics.getTotalRequestsPerSecond()); + + List> externalCallList = new ArrayList<>(); + + executorService = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("testNumTotalRequestsMetrics") + .build()); + AtomicInteger rps = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(1); + executorService.submit(() -> { + while (true) { + int numRps = (int) rpcMetrics.getTotalRequestsPerSecond(); + rps.getAndSet(numRps); + if (rps.get() > 0) { + countDownLatch.countDown(); + break; + } + } + }); + + for (int i = 0; i < 100000; i++) { + externalCallList.add(newExtCall(ugi, () -> null)); + } + for (ExternalCall externalCall : externalCallList) { + server.queueCall(externalCall); + } + for (ExternalCall externalCall : externalCallList) { + externalCall.get(); + } + + assertEquals(100000, rpcMetrics.getTotalRequests()); + if (countDownLatch.await(10, TimeUnit.SECONDS)) { + assertTrue(rps.get() > 10); + } else { + throw new AssertionError("total requests per seconds are still 0"); + } + } finally { + if (executorService != null) { + executorService.shutdown(); + } + server.stop(); + } + } public static void main(String[] args) throws Exception { new TestRPC().testCallsInternal(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index ffe20679e3..8d9aee77ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager .getLogFile; +import static org.assertj.core.api.Assertions.assertThat; + import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; @@ -106,6 +108,8 @@ public void testJournalNodeSync() throws Exception { File firstJournalDir = jCluster.getJournalDir(0, jid); File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) .getCurrentDir(); + assertThat(jCluster.getJournalNode(0).getRpcServer().getRpcServer().getRpcMetrics() + .getTotalRequests()).isGreaterThan(20); // Generate some edit logs and delete one. long firstTxId = generateEditLog(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index decf85c06a..ff6c2288b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -123,6 +124,7 @@ public void testRequeueCall() throws Exception { + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true); NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration); + assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0); dfs.create(testPath, (short)1).close(); assertSentTo(0); @@ -132,6 +134,7 @@ public void testRequeueCall() throws Exception { // be triggered and client should retry active NN. dfs.getFileStatus(testPath); assertSentTo(0); + assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1); // reset the original call queue NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf); }