diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 68d4923a0a..bfdfaf6c06 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2947,6 +2947,7 @@ public void run() { */ // Re-queue the call and continue requeueCall(call); + call = null; continue; } if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 854027a116..18f987db97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -28,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -48,6 +51,7 @@ import org.apache.hadoop.ipc.RpcScheduler; import org.apache.hadoop.ipc.Schedulable; import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.After; @@ -419,6 +423,56 @@ public void testMsyncFileContext() throws Exception { } } + @Test + public void testRpcQueueTimeNumOpsMetrics() throws Exception { + // 0 == not completed, 1 == succeeded, -1 == failed + AtomicInteger readStatus = new AtomicInteger(0); + + // Making an uncoordinated call, which initialize the proxy + // to Observer node. + dfs.getClient().getHAServiceState(); + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + Thread reader = new Thread(new Runnable() { + @Override + public void run() { + try { + // this read will block until roll and tail edits happen. + dfs.getFileStatus(testPath); + readStatus.set(1); + } catch (IOException e) { + e.printStackTrace(); + readStatus.set(-1); + } + } + }); + + reader.start(); + // the reader is still blocking, not succeeded yet. + assertEquals(0, readStatus.get()); + dfsCluster.rollEditLogAndTail(0); + // wait a while for all the change to be done + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return readStatus.get() != 0; + } + }, 100, 10000); + // the reader should have succeed. + assertEquals(1, readStatus.get()); + + final int observerIdx = 2; + NameNode observerNN = dfsCluster.getNameNode(observerIdx); + MetricsRecordBuilder rpcMetrics = + getMetrics("RpcActivityForPort" + + observerNN.getNameNodeAddress().getPort()); + long rpcQueueTimeNumOps = getLongCounter("RpcQueueTimeNumOps", rpcMetrics); + long rpcProcessingTimeNumOps = getLongCounter("RpcProcessingTimeNumOps", + rpcMetrics); + assertEquals(rpcQueueTimeNumOps, rpcProcessingTimeNumOps); + } + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));