HADOOP-18288. Total requests and total requests per sec served by RPC servers (#4431)
Reviewed-by: Steve Loughran <stevel@apache.org> Signed-off-by: Tao Li <tomscut@apache.org>
This commit is contained in:
parent
62e4476102
commit
e38e13be03
@ -1054,5 +1054,13 @@ public class CommonConfigurationKeysPublic {
|
|||||||
public static final String HADOOP_HTTP_IDLE_TIMEOUT_MS_KEY =
|
public static final String HADOOP_HTTP_IDLE_TIMEOUT_MS_KEY =
|
||||||
"hadoop.http.idle_timeout.ms";
|
"hadoop.http.idle_timeout.ms";
|
||||||
public static final int HADOOP_HTTP_IDLE_TIMEOUT_MS_DEFAULT = 60000;
|
public static final int HADOOP_HTTP_IDLE_TIMEOUT_MS_DEFAULT = 60000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To configure scheduling of server metrics update thread. This config is used to indicate
|
||||||
|
* initial delay and delay between each execution of the metric update runnable thread.
|
||||||
|
*/
|
||||||
|
public static final String IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL =
|
||||||
|
"ipc.server.metrics.update.runner.interval";
|
||||||
|
public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -65,9 +65,12 @@
|
|||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import javax.security.sasl.Sasl;
|
import javax.security.sasl.Sasl;
|
||||||
@ -127,6 +130,8 @@
|
|||||||
import org.apache.hadoop.tracing.TraceUtils;
|
import org.apache.hadoop.tracing.TraceUtils;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
|
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.thirdparty.protobuf.ByteString;
|
import org.apache.hadoop.thirdparty.protobuf.ByteString;
|
||||||
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
|
import org.apache.hadoop.thirdparty.protobuf.CodedOutputStream;
|
||||||
import org.apache.hadoop.thirdparty.protobuf.Message;
|
import org.apache.hadoop.thirdparty.protobuf.Message;
|
||||||
@ -500,6 +505,11 @@ protected ResponseBuffer initialValue() {
|
|||||||
private Responder responder = null;
|
private Responder responder = null;
|
||||||
private Handler[] handlers = null;
|
private Handler[] handlers = null;
|
||||||
private final AtomicInteger numInProcessHandler = new AtomicInteger();
|
private final AtomicInteger numInProcessHandler = new AtomicInteger();
|
||||||
|
private final LongAdder totalRequests = new LongAdder();
|
||||||
|
private long lastSeenTotalRequests = 0;
|
||||||
|
private long totalRequestsPerSecond = 0;
|
||||||
|
private final long metricsUpdaterInterval;
|
||||||
|
private final ScheduledExecutorService scheduledExecutorService;
|
||||||
|
|
||||||
private boolean logSlowRPC = false;
|
private boolean logSlowRPC = false;
|
||||||
|
|
||||||
@ -515,6 +525,14 @@ public int getNumInProcessHandler() {
|
|||||||
return numInProcessHandler.get();
|
return numInProcessHandler.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getTotalRequests() {
|
||||||
|
return totalRequests.sum();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTotalRequestsPerSecond() {
|
||||||
|
return totalRequestsPerSecond;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets slow RPC flag.
|
* Sets slow RPC flag.
|
||||||
* @param logSlowRPCFlag input logSlowRPCFlag.
|
* @param logSlowRPCFlag input logSlowRPCFlag.
|
||||||
@ -578,6 +596,7 @@ void logSlowRpcCalls(String methodName, Call call,
|
|||||||
}
|
}
|
||||||
|
|
||||||
void updateMetrics(Call call, long startTime, boolean connDropped) {
|
void updateMetrics(Call call, long startTime, boolean connDropped) {
|
||||||
|
totalRequests.increment();
|
||||||
// delta = handler + processing + response
|
// delta = handler + processing + response
|
||||||
long deltaNanos = Time.monotonicNowNanos() - startTime;
|
long deltaNanos = Time.monotonicNowNanos() - startTime;
|
||||||
long timestampNanos = call.timestampNanos;
|
long timestampNanos = call.timestampNanos;
|
||||||
@ -3304,6 +3323,14 @@ protected Server(String bindAddress, int port,
|
|||||||
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
|
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
|
||||||
this.exceptionsHandler.addTerseLoggingExceptions(
|
this.exceptionsHandler.addTerseLoggingExceptions(
|
||||||
HealthCheckFailedException.class);
|
HealthCheckFailedException.class);
|
||||||
|
this.metricsUpdaterInterval =
|
||||||
|
conf.getLong(CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL,
|
||||||
|
CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT);
|
||||||
|
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
||||||
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Hadoop-Metrics-Updater-%d")
|
||||||
|
.build());
|
||||||
|
this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(),
|
||||||
|
metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void addAuxiliaryListener(int auxiliaryPort)
|
public synchronized void addAuxiliaryListener(int auxiliaryPort)
|
||||||
@ -3598,10 +3625,25 @@ public synchronized void stop() {
|
|||||||
}
|
}
|
||||||
responder.interrupt();
|
responder.interrupt();
|
||||||
notifyAll();
|
notifyAll();
|
||||||
|
shutdownMetricsUpdaterExecutor();
|
||||||
this.rpcMetrics.shutdown();
|
this.rpcMetrics.shutdown();
|
||||||
this.rpcDetailedMetrics.shutdown();
|
this.rpcDetailedMetrics.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void shutdownMetricsUpdaterExecutor() {
|
||||||
|
this.scheduledExecutorService.shutdown();
|
||||||
|
try {
|
||||||
|
boolean isExecutorShutdown =
|
||||||
|
this.scheduledExecutorService.awaitTermination(3, TimeUnit.SECONDS);
|
||||||
|
if (!isExecutorShutdown) {
|
||||||
|
LOG.info("Hadoop Metrics Updater executor could not be shutdown.");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
LOG.info("Hadoop Metrics Updater executor shutdown interrupted.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for the server to be stopped.
|
* Wait for the server to be stopped.
|
||||||
* Does not wait for all subthreads to finish.
|
* Does not wait for all subthreads to finish.
|
||||||
@ -4061,4 +4103,32 @@ protected int getMaxIdleTime() {
|
|||||||
public String getServerName() {
|
public String getServerName() {
|
||||||
return serverName;
|
return serverName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server metrics updater thread, used to update some metrics on a regular basis.
|
||||||
|
* For instance, requests per second.
|
||||||
|
*/
|
||||||
|
private class MetricsUpdateRunner implements Runnable {
|
||||||
|
|
||||||
|
private long lastExecuted = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void run() {
|
||||||
|
long currentTime = Time.monotonicNow();
|
||||||
|
if (lastExecuted == 0) {
|
||||||
|
lastExecuted = currentTime - metricsUpdaterInterval;
|
||||||
|
}
|
||||||
|
long currentTotalRequests = totalRequests.sum();
|
||||||
|
long totalRequestsDiff = currentTotalRequests - lastSeenTotalRequests;
|
||||||
|
lastSeenTotalRequests = currentTotalRequests;
|
||||||
|
if ((currentTime - lastExecuted) > 0) {
|
||||||
|
double totalRequestsPerSecInDouble =
|
||||||
|
(double) totalRequestsDiff / TimeUnit.MILLISECONDS.toSeconds(
|
||||||
|
currentTime - lastExecuted);
|
||||||
|
totalRequestsPerSecond = ((long) totalRequestsPerSecInDouble);
|
||||||
|
}
|
||||||
|
lastExecuted = currentTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -151,6 +151,16 @@ public String numOpenConnectionsPerUser() {
|
|||||||
return server.getNumDroppedConnections();
|
return server.getNumDroppedConnections();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Metric("Number of total requests")
|
||||||
|
public long getTotalRequests() {
|
||||||
|
return server.getTotalRequests();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Metric("Number of total requests per second")
|
||||||
|
public long getTotalRequestsPerSecond() {
|
||||||
|
return server.getTotalRequestsPerSecond();
|
||||||
|
}
|
||||||
|
|
||||||
public TimeUnit getMetricsTimeUnit() {
|
public TimeUnit getMetricsTimeUnit() {
|
||||||
return metricsTimeUnit;
|
return metricsTimeUnit;
|
||||||
}
|
}
|
||||||
|
@ -104,6 +104,8 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
|
|||||||
| `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcLockWaitTime`*num*`s90thPercentileLatency` | Shows the 90th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
| `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcLockWaitTime`*num*`s95thPercentileLatency` | Shows the 95th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
| `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
| `rpcLockWaitTime`*num*`s99thPercentileLatency` | Shows the 99th percentile of RPC lock wait time in milliseconds (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |
|
||||||
|
| `TotalRequests` | Total num of requests served by the RPC server. |
|
||||||
|
| `TotalRequestsPerSeconds` | Total num of requests per second served by the RPC server. |
|
||||||
|
|
||||||
RetryCache/NameNodeRetryCache
|
RetryCache/NameNodeRetryCache
|
||||||
-----------------------------
|
-----------------------------
|
||||||
|
@ -19,6 +19,8 @@
|
|||||||
package org.apache.hadoop.ipc;
|
package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||||
|
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
|
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -84,6 +86,7 @@
|
|||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
@ -1697,6 +1700,61 @@ public void testRpcMetricsInNanos() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNumTotalRequestsMetrics() throws Exception {
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.
|
||||||
|
createUserForTesting("userXyz", new String[0]);
|
||||||
|
|
||||||
|
final Server server = setupTestServer(conf, 1);
|
||||||
|
|
||||||
|
ExecutorService executorService = null;
|
||||||
|
try {
|
||||||
|
RpcMetrics rpcMetrics = server.getRpcMetrics();
|
||||||
|
assertEquals(0, rpcMetrics.getTotalRequests());
|
||||||
|
assertEquals(0, rpcMetrics.getTotalRequestsPerSecond());
|
||||||
|
|
||||||
|
List<ExternalCall<Void>> externalCallList = new ArrayList<>();
|
||||||
|
|
||||||
|
executorService = Executors.newSingleThreadExecutor(
|
||||||
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("testNumTotalRequestsMetrics")
|
||||||
|
.build());
|
||||||
|
AtomicInteger rps = new AtomicInteger(0);
|
||||||
|
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||||
|
executorService.submit(() -> {
|
||||||
|
while (true) {
|
||||||
|
int numRps = (int) rpcMetrics.getTotalRequestsPerSecond();
|
||||||
|
rps.getAndSet(numRps);
|
||||||
|
if (rps.get() > 0) {
|
||||||
|
countDownLatch.countDown();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (int i = 0; i < 100000; i++) {
|
||||||
|
externalCallList.add(newExtCall(ugi, () -> null));
|
||||||
|
}
|
||||||
|
for (ExternalCall<Void> externalCall : externalCallList) {
|
||||||
|
server.queueCall(externalCall);
|
||||||
|
}
|
||||||
|
for (ExternalCall<Void> externalCall : externalCallList) {
|
||||||
|
externalCall.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(100000, rpcMetrics.getTotalRequests());
|
||||||
|
if (countDownLatch.await(10, TimeUnit.SECONDS)) {
|
||||||
|
assertTrue(rps.get() > 10);
|
||||||
|
} else {
|
||||||
|
throw new AssertionError("total requests per seconds are still 0");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (executorService != null) {
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
new TestRPC().testCallsInternal(conf);
|
new TestRPC().testCallsInternal(conf);
|
||||||
|
@ -33,6 +33,8 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||||
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
|
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
|
||||||
.getLogFile;
|
.getLogFile;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Lists;
|
import org.apache.hadoop.util.Lists;
|
||||||
@ -106,6 +108,8 @@ public void testJournalNodeSync() throws Exception {
|
|||||||
File firstJournalDir = jCluster.getJournalDir(0, jid);
|
File firstJournalDir = jCluster.getJournalDir(0, jid);
|
||||||
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
|
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
|
||||||
.getCurrentDir();
|
.getCurrentDir();
|
||||||
|
assertThat(jCluster.getJournalNode(0).getRpcServer().getRpcServer().getRpcMetrics()
|
||||||
|
.getTotalRequests()).isGreaterThan(20);
|
||||||
|
|
||||||
// Generate some edit logs and delete one.
|
// Generate some edit logs and delete one.
|
||||||
long firstTxId = generateEditLog();
|
long firstTxId = generateEditLog();
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
@ -123,6 +124,7 @@ public void testRequeueCall() throws Exception {
|
|||||||
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
|
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
|
||||||
|
|
||||||
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
|
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
|
||||||
|
assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(0);
|
||||||
|
|
||||||
dfs.create(testPath, (short)1).close();
|
dfs.create(testPath, (short)1).close();
|
||||||
assertSentTo(0);
|
assertSentTo(0);
|
||||||
@ -132,6 +134,7 @@ public void testRequeueCall() throws Exception {
|
|||||||
// be triggered and client should retry active NN.
|
// be triggered and client should retry active NN.
|
||||||
dfs.getFileStatus(testPath);
|
dfs.getFileStatus(testPath);
|
||||||
assertSentTo(0);
|
assertSentTo(0);
|
||||||
|
assertThat(NameNodeAdapter.getRpcServer(nn).getTotalRequests()).isGreaterThan(1);
|
||||||
// reset the original call queue
|
// reset the original call queue
|
||||||
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
|
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user