diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 6ec9d18e3c..f5cf1d5744 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1780,6 +1780,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
         }
         slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
       }
+      slowDiskTracker.checkAndUpdateReportIfNecessary();
     }
 
     if (!cmds.isEmpty()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java
index d0d1ee437e..894121eeee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowDiskTracker.java
@@ -135,10 +135,9 @@ public void addSlowDiskReport(String dataNodeID,
       diskIDLatencyMap.put(diskID, diskLatency);
     }
 
-    checkAndUpdateReportIfNecessary();
   }
 
-  private void checkAndUpdateReportIfNecessary() {
+  public void checkAndUpdateReportIfNecessary() {
     // Check if it is time for update
     long now = timer.monotonicNow();
     if (now - lastUpdateTime > reportGenerationIntervalMs) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java
index 78cf927727..63f37b99f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowDiskTracker.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -56,6 +57,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -401,6 +403,46 @@ public void testEmptyReport() throws Exception {
     assertTrue(tracker.getSlowDiskReportAsJsonString() == null);
   }
 
+  @Test
+  public void testRemoveInvalidReport() throws Exception {
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    try {
+      NameNode nn = cluster.getNameNode(0);
+
+      DatanodeManager datanodeManager =
+          nn.getNamesystem().getBlockManager().getDatanodeManager();
+      SlowDiskTracker slowDiskTracker = datanodeManager.getSlowDiskTracker();
+      slowDiskTracker.setReportValidityMs(OUTLIERS_REPORT_INTERVAL * 3);
+      assertTrue(slowDiskTracker.getSlowDisksReport().isEmpty());
+      slowDiskTracker.addSlowDiskReport(
+          "dn1",
+          generateSlowDiskReport("disk1",
+              Collections.singletonMap(DiskOp.WRITE, 1.3)));
+      slowDiskTracker.addSlowDiskReport(
+          "dn2",
+          generateSlowDiskReport("disk2",
+              Collections.singletonMap(DiskOp.WRITE, 1.1)));
+
+      // wait for slow disk report
+      GenericTestUtils.waitFor(() -> !slowDiskTracker.getSlowDisksReport()
+          .isEmpty(), 500, 5000);
+      Map<String, DiskLatency> slowDisksReport =
+          getSlowDisksReportForTesting(slowDiskTracker);
+      assertEquals(2, slowDisksReport.size());
+
+      // wait for invalid report to be removed
+      Thread.sleep(OUTLIERS_REPORT_INTERVAL * 3);
+      GenericTestUtils.waitFor(() -> slowDiskTracker.getSlowDisksReport()
+          .isEmpty(), 500, 5000);
+      slowDisksReport = getSlowDisksReportForTesting(slowDiskTracker);
+      assertEquals(0, slowDisksReport.size());
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private boolean isDiskInReports(ArrayList<DiskLatency> reports,
       String dataNodeID, String disk, DiskOp diskOp, double latency) {
     String diskID = SlowDiskTracker.getSlowDiskIDForReport(dataNodeID, disk);
@@ -430,6 +472,14 @@ private void addSlowDiskForTesting(String dnID, String disk,
     tracker.addSlowDiskReport(dnID, slowDiskReport);
   }
 
+  private SlowDiskReports generateSlowDiskReport(String disk,
+      Map<DiskOp, Double> latencies) {
+    Map<String, Map<DiskOp, Double>> slowDisk = Maps.newHashMap();
+    slowDisk.put(disk, latencies);
+    SlowDiskReports slowDiskReport = SlowDiskReports.create(slowDisk);
+    return slowDiskReport;
+  }
+
   Map<String, DiskLatency> getSlowDisksReportForTesting(
       SlowDiskTracker slowDiskTracker) {
     Map<String, DiskLatency> slowDisksMap = Maps.newHashMap();