HADOOP-17995. Stale record should be remove when DataNodePeerMetrics#dumpSendPacketDownstreamAvgInfoAsJson (#3630)

This commit is contained in:
huhaiyang 2021-11-17 22:41:06 +08:00 committed by GitHub
parent 646c470e5d
commit 91af256a5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 2 deletions

View File

@ -179,8 +179,11 @@ public void snapshot(MetricsRecordBuilder builder, boolean all) {
long totalCount = 0; long totalCount = 0;
for (final SumAndCount sumAndCount : entry.getValue()) { for (final SumAndCount sumAndCount : entry.getValue()) {
totalCount += sumAndCount.getCount(); if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp()
totalSum += sumAndCount.getSum(); < recordValidityMs) {
totalCount += sumAndCount.getCount();
totalSum += sumAndCount.getSum();
}
} }
if (totalCount != 0) { if (totalCount != 0) {

View File

@ -122,10 +122,16 @@ public void testRemoveStaleRecord() throws Exception {
GenericTestUtils.waitFor( GenericTestUtils.waitFor(
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000); () -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000);
assertEquals(3, rollingAverages.getStats(numSamples).size()); assertEquals(3, rollingAverages.getStats(numSamples).size());
String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
for (String peerAddr : peerAddrList) {
assertThat(json, containsString(peerAddr));
}
/* wait for stale report to be removed */ /* wait for stale report to be removed */
GenericTestUtils.waitFor( GenericTestUtils.waitFor(
() -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000); () -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000);
assertEquals(0, rollingAverages.getStats(numSamples).size()); assertEquals(0, rollingAverages.getStats(numSamples).size());
json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
assertEquals("{}", json);
/* dn can report peer metrics normally when it added back to cluster */ /* dn can report peer metrics normally when it added back to cluster */
for (String peerAddr : peerAddrList) { for (String peerAddr : peerAddrList) {
@ -138,6 +144,10 @@ public void testRemoveStaleRecord() throws Exception {
GenericTestUtils.waitFor( GenericTestUtils.waitFor(
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000); () -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000);
assertEquals(3, rollingAverages.getStats(numSamples).size()); assertEquals(3, rollingAverages.getStats(numSamples).size());
json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
for (String peerAddr : peerAddrList) {
assertThat(json, containsString(peerAddr));
}
} }
/** /**