HDFS-16040. RpcQueueTime metric counts requeued calls as unique events. Contributed by Simbarashe Dzinamarira.
(cherry picked from commit 8ce30f51f999c0a80db53a2a96b5be5505d4d151)
This commit is contained in:
parent
d8689f1a08
commit
c5535caf6e
@ -2947,6 +2947,7 @@ public abstract class Server {
|
||||
*/
|
||||
// Re-queue the call and continue
|
||||
requeueCall(call);
|
||||
call = null;
|
||||
continue;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -18,6 +18,8 @@
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
@ -28,6 +30,7 @@ import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
@ -48,6 +51,7 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RpcScheduler;
|
||||
import org.apache.hadoop.ipc.Schedulable;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.After;
|
||||
@ -419,6 +423,56 @@ public class TestConsistentReadsObserver {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcQueueTimeNumOpsMetrics() throws Exception {
|
||||
// 0 == not completed, 1 == succeeded, -1 == failed
|
||||
AtomicInteger readStatus = new AtomicInteger(0);
|
||||
|
||||
// Making an uncoordinated call, which initialize the proxy
|
||||
// to Observer node.
|
||||
dfs.getClient().getHAServiceState();
|
||||
dfs.mkdir(testPath, FsPermission.getDefault());
|
||||
assertSentTo(0);
|
||||
|
||||
Thread reader = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// this read will block until roll and tail edits happen.
|
||||
dfs.getFileStatus(testPath);
|
||||
readStatus.set(1);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
readStatus.set(-1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
reader.start();
|
||||
// the reader is still blocking, not succeeded yet.
|
||||
assertEquals(0, readStatus.get());
|
||||
dfsCluster.rollEditLogAndTail(0);
|
||||
// wait a while for all the change to be done
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return readStatus.get() != 0;
|
||||
}
|
||||
}, 100, 10000);
|
||||
// the reader should have succeed.
|
||||
assertEquals(1, readStatus.get());
|
||||
|
||||
final int observerIdx = 2;
|
||||
NameNode observerNN = dfsCluster.getNameNode(observerIdx);
|
||||
MetricsRecordBuilder rpcMetrics =
|
||||
getMetrics("RpcActivityForPort"
|
||||
+ observerNN.getNameNodeAddress().getPort());
|
||||
long rpcQueueTimeNumOps = getLongCounter("RpcQueueTimeNumOps", rpcMetrics);
|
||||
long rpcProcessingTimeNumOps = getLongCounter("RpcProcessingTimeNumOps",
|
||||
rpcMetrics);
|
||||
assertEquals(rpcQueueTimeNumOps, rpcProcessingTimeNumOps);
|
||||
}
|
||||
|
||||
private void assertSentTo(int nnIdx) throws IOException {
|
||||
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
||||
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
||||
|
Loading…
x
Reference in New Issue
Block a user