diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index cc9365d409..f2e9827b8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; @@ -34,10 +35,13 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** * Manage the heartbeats received from datanodes. * The datanode list and statistics are synchronized @@ -62,8 +66,8 @@ class HeartbeatManager implements DatanodeStatistics { private final long heartbeatRecheckInterval; /** Heartbeat monitor thread */ private final Daemon heartbeatThread = new Daemon(new Monitor()); + private final StopWatch heartbeatStopWatch = new StopWatch(); - final Namesystem namesystem; final BlockManager blockManager; @@ -260,7 +264,18 @@ synchronized void stopDecommission(final DatanodeDescriptor node) { stats.add(node); } } - + + @VisibleForTesting + void restartHeartbeatStopWatch() { + heartbeatStopWatch.reset().start(); + } + + @VisibleForTesting + boolean shouldAbortHeartbeatCheck(long offset) { + long elapsed = heartbeatStopWatch.now(TimeUnit.MILLISECONDS); + return elapsed + offset > heartbeatRecheckInterval; + } + /** * Check if there are any expired heartbeats, and if so, * whether any blocks have to be re-replicated. @@ -307,6 +322,10 @@ void heartbeatCheck() { int numOfStaleStorages = 0; synchronized(this) { for (DatanodeDescriptor d : datanodes) { + // check if an excessive GC pause has occurred + if (shouldAbortHeartbeatCheck(0)) { + return; + } if (dead == null && dm.isDatanodeDead(d)) { stats.incrExpiredHeartbeats(); dead = d; @@ -375,6 +394,7 @@ private class Monitor implements Runnable { @Override public void run() { while(namesystem.isRunning()) { + restartHeartbeatStopWatch(); try { final long now = Time.monotonicNow(); if (lastHeartbeatCheck + heartbeatRecheckInterval < now) { @@ -396,6 +416,12 @@ public void run() { Thread.sleep(5000); // 5 seconds } catch (InterruptedException ie) { } + // avoid declaring nodes dead for another cycle if a GC pause lasts + // longer than the node recheck interval + if (shouldAbortHeartbeatCheck(-5000)) { + LOG.warn("Skipping next heartbeat scan due to excessive pause"); + lastHeartbeatCheck = Time.monotonicNow(); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index 3e233c67cc..b77c413f06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; @@ -33,6 +35,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; @@ -40,6 +43,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.junit.Test; +import org.mockito.Mockito; /** * Test if FSNamesystem handles heartbeat right @@ -243,4 +247,27 @@ public void testHeartbeatBlockRecovery() throws Exception { cluster.shutdown(); } } + + @Test + public void testHeartbeatStopWatch() throws Exception { + Namesystem ns = Mockito.mock(Namesystem.class); + BlockManager bm = Mockito.mock(BlockManager.class); + Configuration conf = new Configuration(); + long recheck = 2000; + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, recheck); + HeartbeatManager monitor = new HeartbeatManager(ns, bm, conf); + monitor.restartHeartbeatStopWatch(); + assertFalse(monitor.shouldAbortHeartbeatCheck(0)); + // sleep shorter than recheck and verify shouldn't abort + Thread.sleep(100); + assertFalse(monitor.shouldAbortHeartbeatCheck(0)); + // sleep longer than recheck and verify should abort unless ignore delay + Thread.sleep(recheck); + assertTrue(monitor.shouldAbortHeartbeatCheck(0)); + assertFalse(monitor.shouldAbortHeartbeatCheck(-recheck*3)); + // ensure it resets properly + monitor.restartHeartbeatStopWatch(); + assertFalse(monitor.shouldAbortHeartbeatCheck(0)); + } }