diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index c54944bc35..08a68be2e0 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.node; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; @@ -43,6 +44,7 @@ import java.io.Closeable; import java.util.*; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -116,6 +118,26 @@ private enum NodeLifeCycleEvent { */ private final long deadNodeIntervalMs; + /** + * The future is used to pause/unpause the scheduled checks. + */ + private ScheduledFuture healthCheckFuture; + + /** + * Test utility - tracks if health check has been paused (unit tests). + */ + private boolean checkPaused; + + /** + * timestamp of the latest heartbeat check process. + */ + private long lastHealthCheck; + + /** + * number of times the heart beat check was skipped. + */ + private long skippedHealthChecks; + /** * Constructs a NodeStateManager instance with the given configuration. * @@ -143,10 +165,11 @@ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { executorService = HadoopExecutors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("SCM Heartbeat Processing Thread - %d").build()); - //BUG:BUG TODO: The return value is ignored, if an exception is thrown in - // the executing funtion, it will be ignored. - executorService.schedule(this, heartbeatCheckerIntervalMs, - TimeUnit.MILLISECONDS); + + skippedHealthChecks = 0; + checkPaused = false; // accessed only from test functions + + scheduleNextHealthCheck(); } /** @@ -464,6 +487,42 @@ public Set getContainers(UUID uuid) @Override public void run() { + if (shouldSkipCheck()) { + skippedHealthChecks++; + LOG.info("Detected long delay in scheduling HB processing thread. " + + "Skipping heartbeat checks for one iteration."); + } else { + checkNodesHealth(); + } + + // we purposefully make this non-deterministic. Instead of using a + // scheduleAtFixedFrequency we will just go to sleep + // and wake up at the next rendezvous point, which is currentTime + + // heartbeatCheckerIntervalMs. This leads to the issue that we are now + // heart beating not at a fixed cadence, but clock tick + time taken to + // work. + // + // This time taken to work can skew the heartbeat processor thread. + // The reason why we don't care is because of the following reasons. + // + // 1. checkerInterval is general many magnitudes faster than datanode HB + // frequency. + // + // 2. if we have too much nodes, the SCM would be doing only HB + // processing, this could lead to SCM's CPU starvation. With this + // approach we always guarantee that HB thread sleeps for a little while. + // + // 3. It is possible that we will never finish processing the HB's in the + // thread. But that means we have a mis-configured system. We will warn + // the users by logging that information. + // + // 4. And the most important reason, heartbeats are not blocked even if + // this thread does not run, they will go into the processing queue. + scheduleNextHealthCheck(); + } + + private void checkNodesHealth() { + /* * * staleNodeDeadline healthyNodeDeadline @@ -558,41 +617,36 @@ public void run() { heartbeatCheckerIntervalMs); } - // we purposefully make this non-deterministic. Instead of using a - // scheduleAtFixedFrequency we will just go to sleep - // and wake up at the next rendezvous point, which is currentTime + - // heartbeatCheckerIntervalMs. This leads to the issue that we are now - // heart beating not at a fixed cadence, but clock tick + time taken to - // work. - // - // This time taken to work can skew the heartbeat processor thread. - // The reason why we don't care is because of the following reasons. - // - // 1. checkerInterval is general many magnitudes faster than datanode HB - // frequency. - // - // 2. if we have too much nodes, the SCM would be doing only HB - // processing, this could lead to SCM's CPU starvation. With this - // approach we always guarantee that HB thread sleeps for a little while. - // - // 3. It is possible that we will never finish processing the HB's in the - // thread. But that means we have a mis-configured system. We will warn - // the users by logging that information. - // - // 4. And the most important reason, heartbeats are not blocked even if - // this thread does not run, they will go into the processing queue. + } + + private void scheduleNextHealthCheck() { if (!Thread.currentThread().isInterrupted() && !executorService.isShutdown()) { //BUGBUG: The return future needs to checked here to make sure the // exceptions are handled correctly. - executorService.schedule(this, heartbeatCheckerIntervalMs, - TimeUnit.MILLISECONDS); + healthCheckFuture = executorService.schedule(this, + heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS); } else { - LOG.info("Current Thread is interrupted, shutting down HB processing " + + LOG.warn("Current Thread is interrupted, shutting down HB processing " + "thread for Node Manager."); } + lastHealthCheck = Time.monotonicNow(); + } + + /** + * if the time since last check exceeds the stale|dead node interval, skip. + * such long delays might be caused by a JVM pause. SCM cannot make reliable + * conclusions about datanode health in such situations. + * @return : true indicates skip HB checks + */ + private boolean shouldSkipCheck() { + + long currentTime = Time.monotonicNow(); + long minInterval = Math.min(staleNodeIntervalMs, deadNodeIntervalMs); + + return ((currentTime - lastHealthCheck) >= minInterval); } /** @@ -640,4 +694,57 @@ public void close() { Thread.currentThread().interrupt(); } } + + /** + * Test Utility : return number of times heartbeat check was skipped. + * @return : count of times HB process was skipped + */ + @VisibleForTesting + long getSkippedHealthChecks() { + return skippedHealthChecks; + } + + /** + * Test Utility : Pause the periodic node hb check. + * @return ScheduledFuture for the scheduled check that got cancelled. + */ + @VisibleForTesting + ScheduledFuture pause() { + + if (executorService.isShutdown() || checkPaused) { + return null; + } + + checkPaused = healthCheckFuture.cancel(false); + + return healthCheckFuture; + } + + /** + * Test utility : unpause the periodic node hb check. + * @return ScheduledFuture for the next scheduled check + */ + @VisibleForTesting + ScheduledFuture unpause() { + + if (executorService.isShutdown()) { + return null; + } + + if (checkPaused) { + Preconditions.checkState(((healthCheckFuture == null) + || healthCheckFuture.isCancelled() + || healthCheckFuture.isDone())); + + checkPaused = false; + /** + * We do not call scheduleNextHealthCheck because we are + * not updating the lastHealthCheck timestamp. + */ + healthCheckFuture = executorService.schedule(this, + heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS); + } + + return healthCheckFuture; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index eaa2255cb0..a85271e270 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.node; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; @@ -72,6 +73,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; /** @@ -580,4 +582,31 @@ private String nodeResolve(String hostname) { return null; } } + + /** + * Test utility to stop heartbeat check process. + * @return ScheduledFuture of next scheduled check that got cancelled. + */ + @VisibleForTesting + ScheduledFuture pauseHealthCheck() { + return nodeStateManager.pause(); + } + + /** + * Test utility to resume the paused heartbeat check process. + * @return ScheduledFuture of the next scheduled check + */ + @VisibleForTesting + ScheduledFuture unpauseHealthCheck() { + return nodeStateManager.unpause(); + } + + /** + * Test utility to get the count of skipped heartbeat check iterations. + * @return count of skipped heartbeat check iterations + */ + @VisibleForTesting + long getSkippedHealthChecks() { + return nodeStateManager.getSkippedHealthChecks(); + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 60fc2045b0..ae810716da 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -53,6 +53,8 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -345,6 +347,96 @@ public void testScmDetectStaleAndDeadNode() } } + /** + * Simulate a JVM Pause by pausing the health check process + * Ensure that none of the nodes with heartbeats become Dead or Stale. + * @throws IOException + * @throws InterruptedException + * @throws AuthenticationException + */ + @Test + public void testScmHandleJvmPause() + throws IOException, InterruptedException, AuthenticationException { + final int healthCheckInterval = 200; // milliseconds + final int heartbeatInterval = 1; // seconds + final int staleNodeInterval = 3; // seconds + final int deadNodeInterval = 6; // seconds + ScheduledFuture schedFuture; + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, + healthCheckInterval, MILLISECONDS); + conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, + heartbeatInterval, SECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, + staleNodeInterval, SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, + deadNodeInterval, SECONDS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + DatanodeDetails node1 = + TestUtils.createRandomDatanodeAndRegister(nodeManager); + DatanodeDetails node2 = + TestUtils.createRandomDatanodeAndRegister(nodeManager); + + nodeManager.processHeartbeat(node1); + nodeManager.processHeartbeat(node2); + + // Sleep so that heartbeat processing thread gets to run. + Thread.sleep(1000); + + //Assert all nodes are healthy. + assertEquals(2, nodeManager.getAllNodes().size()); + assertEquals(2, nodeManager.getNodeCount(HEALTHY)); + + /** + * Simulate a JVM Pause and subsequent handling in following steps: + * Step 1 : stop heartbeat check process for stale node interval + * Step 2 : resume heartbeat check + * Step 3 : wait for 1 iteration of heartbeat check thread + * Step 4 : retrieve the state of all nodes - assert all are HEALTHY + * Step 5 : heartbeat for node1 + * [TODO : what if there is scheduling delay of test thread in Step 5?] + * Step 6 : wait for some time to allow iterations of check process + * Step 7 : retrieve the state of all nodes - assert node2 is STALE + * and node1 is HEALTHY + */ + + // Step 1 : stop health check process (simulate JVM pause) + nodeManager.pauseHealthCheck(); + Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS)); + + // Step 2 : resume health check + assertTrue("Unexpected, already skipped heartbeat checks", + (nodeManager.getSkippedHealthChecks() == 0)); + schedFuture = nodeManager.unpauseHealthCheck(); + + // Step 3 : wait for 1 iteration of health check + try { + schedFuture.get(); + assertTrue("We did not skip any heartbeat checks", + nodeManager.getSkippedHealthChecks() > 0); + } catch (ExecutionException e) { + assertEquals("Unexpected exception waiting for Scheduled Health Check", + 0, 1); + } + + // Step 4 : all nodes should still be HEALTHY + assertEquals(2, nodeManager.getAllNodes().size()); + assertEquals(2, nodeManager.getNodeCount(HEALTHY)); + + // Step 5 : heartbeat for node1 + nodeManager.processHeartbeat(node1); + + // Step 6 : wait for health check process to run + Thread.sleep(1000); + + // Step 7 : node2 should transition to STALE + assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + assertEquals(1, nodeManager.getNodeCount(STALE)); + } + } + /** * Check for NPE when datanodeDetails is passed null for sendHeartbeat. *