From a73c4804d882d9bc1a424080f20c4902cb9db832 Mon Sep 17 00:00:00 2001
From: ZanderXu <15040255127@163.com>
Date: Sat, 17 Sep 2022 01:09:01 +0800
Subject: [PATCH] HADOOP-18446. [SBN read] Add a re-queue metric to RpcMetrics
 to quantify the number of re-queued RPCs (#4871)

Signed-off-by: Erik Krogen
Co-authored-by: zengqiang.xu
---
 .../java/org/apache/hadoop/ipc/Server.java    |  1 +
 .../apache/hadoop/ipc/metrics/RpcMetrics.java | 18 +++++++
 .../server/namenode/ha/TestObserverNode.java  | 50 +++++++++++++++++++
 3 files changed, 69 insertions(+)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index db34af6ee6..17366eb956 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -3156,6 +3156,7 @@ private void requeueCall(Call call)
       throws IOException, InterruptedException {
     try {
       internalQueueCall(call, false);
+      rpcMetrics.incrRequeueCalls();
     } catch (RpcServerException rse) {
       call.doResponse(rse.getCause(), rse.getRpcStatusProto());
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
index f01cd5bcfd..282eca3cf8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
@@ -128,6 +128,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
   MutableCounterLong rpcClientBackoff;
   @Metric("Number of Slow RPC calls")
   MutableCounterLong rpcSlowCalls;
+  @Metric("Number of requeue calls")
+  MutableCounterLong rpcRequeueCalls;
 
   @Metric("Number of open connections") public int numOpenConnections() {
     return server.getNumOpenConnections();
@@ -304,6 +306,13 @@ public void incrSlowRpc() {
     rpcSlowCalls.incr();
   }
 
+  /**
+   * Increments the counter of RPC calls that were put back on the call queue.
+   */
+  public void incrRequeueCalls() {
+    rpcRequeueCalls.incr();
+  }
+
   /**
    * Returns a MutableRate Counter.
    * @return Mutable Rate
@@ -344,6 +353,15 @@ public long getRpcSlowCalls() {
     return rpcSlowCalls.value();
   }
 
+  /**
+   * Returns the number of requeue calls.
+   * @return the current value of the requeue calls counter
+   */
+  @VisibleForTesting
+  public long getRpcRequeueCalls() {
+    return rpcRequeueCalls.value();
+  }
+
   public MutableRate getDeferredRpcProcessingTime() {
     return deferredRpcProcessingTime;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index a910117194..60728284e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
 import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -36,7 +37,10 @@
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -61,9 +65,12 @@
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.After;
@@ -124,6 +131,49 @@ public static void shutDownCluster() throws IOException {
     }
   }
 
+  /**
+   * Verifies that the Observer's requeue counter grows while a read waits
+   * for edits that the stopped EditLogTailer has not applied yet.
+   */
+  @Test
+  public void testObserverRequeue() throws Exception {
+    ScheduledExecutorService interruptor =
+        Executors.newScheduledThreadPool(1);
+
+    FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
+    RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster
+        .getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics();
+    try {
+      // Stop the EditLogTailer of the Observer NameNode.
+      observerFsNS.getEditLogTailer().stop();
+      long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls();
+      ScheduledFuture<FileStatus> scheduledFuture = interruptor.schedule(
+          () -> {
+            Path tmpTestPath = new Path("/TestObserverRequeue");
+            dfs.create(tmpTestPath, (short)1).close();
+            assertSentTo(0);
+            // This operation blocks in the Observer NameNode until its
+            // EditLogTailer has tailed the edits from the JournalNode.
+            FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
+            assertSentTo(2);
+            return fileStatus;
+          }, 0, TimeUnit.SECONDS);
+
+      GenericTestUtils.waitFor(() -> obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum,
+          50, 10000);
+
+      observerFsNS.getEditLogTailer().doTailEdits();
+      FileStatus fileStatus = scheduledFuture.get(10000, TimeUnit.MILLISECONDS);
+      assertNotNull(fileStatus);
+    } finally {
+      // Release the scheduler thread so it does not outlive this test.
+      interruptor.shutdownNow();
+      EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
+      observerFsNS.setEditLogTailerForTests(editLogTailer);
+      editLogTailer.start();
+    }
+  }
+
   @Test
   public void testNoActiveToObserver() throws Exception {
     try {