HADOOP-17995. Stale record should be remove when DataNodePeerMetrics#dumpSendPacketDownstreamAvgInfoAsJson (#3708)
This commit is contained in:
parent
98fe0d0fc3
commit
99b161dec7
@ -167,7 +167,7 @@ synchronized void replaceScheduledTask(int windows, long interval,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void snapshot(MetricsRecordBuilder builder, boolean all) {
|
public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
|
||||||
if (all || changed()) {
|
if (all || changed()) {
|
||||||
for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
|
for (final Entry<String, LinkedBlockingDeque<SumAndCount>> entry
|
||||||
: averages.entrySet()) {
|
: averages.entrySet()) {
|
||||||
@ -179,8 +179,11 @@ public void snapshot(MetricsRecordBuilder builder, boolean all) {
|
|||||||
long totalCount = 0;
|
long totalCount = 0;
|
||||||
|
|
||||||
for (final SumAndCount sumAndCount : entry.getValue()) {
|
for (final SumAndCount sumAndCount : entry.getValue()) {
|
||||||
totalCount += sumAndCount.getCount();
|
if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp()
|
||||||
totalSum += sumAndCount.getSum();
|
< recordValidityMs) {
|
||||||
|
totalCount += sumAndCount.getCount();
|
||||||
|
totalSum += sumAndCount.getSum();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalCount != 0) {
|
if (totalCount != 0) {
|
||||||
|
@ -122,10 +122,16 @@ public void testRemoveStaleRecord() throws Exception {
|
|||||||
GenericTestUtils.waitFor(
|
GenericTestUtils.waitFor(
|
||||||
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000);
|
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000);
|
||||||
assertEquals(3, rollingAverages.getStats(numSamples).size());
|
assertEquals(3, rollingAverages.getStats(numSamples).size());
|
||||||
|
String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
|
||||||
|
for (String peerAddr : peerAddrList) {
|
||||||
|
assertThat(json, containsString(peerAddr));
|
||||||
|
}
|
||||||
/* wait for stale report to be removed */
|
/* wait for stale report to be removed */
|
||||||
GenericTestUtils.waitFor(
|
GenericTestUtils.waitFor(
|
||||||
() -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000);
|
() -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000);
|
||||||
assertEquals(0, rollingAverages.getStats(numSamples).size());
|
assertEquals(0, rollingAverages.getStats(numSamples).size());
|
||||||
|
json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
|
||||||
|
assertEquals("{}", json);
|
||||||
|
|
||||||
/* dn can report peer metrics normally when it added back to cluster */
|
/* dn can report peer metrics normally when it added back to cluster */
|
||||||
for (String peerAddr : peerAddrList) {
|
for (String peerAddr : peerAddrList) {
|
||||||
@ -138,6 +144,10 @@ public void testRemoveStaleRecord() throws Exception {
|
|||||||
GenericTestUtils.waitFor(
|
GenericTestUtils.waitFor(
|
||||||
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000);
|
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000);
|
||||||
assertEquals(3, rollingAverages.getStats(numSamples).size());
|
assertEquals(3, rollingAverages.getStats(numSamples).size());
|
||||||
|
json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson();
|
||||||
|
for (String peerAddr : peerAddrList) {
|
||||||
|
assertThat(json, containsString(peerAddr));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user