HDDS-1454. GC other system pause events can trigger pipeline destroy for all the nodes in the cluster. Contributed by Supratim Deka (#852)

This commit is contained in:
supratimdeka 2019-06-19 20:11:16 +05:30 committed by Nanda kumar
parent d3ac516665
commit 9d6842501c
3 changed files with 258 additions and 30 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.node; package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -43,6 +44,7 @@
import java.io.Closeable; import java.io.Closeable;
import java.util.*; import java.util.*;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -116,6 +118,26 @@ private enum NodeLifeCycleEvent {
*/ */
private final long deadNodeIntervalMs; private final long deadNodeIntervalMs;
/**
* The future is used to pause/unpause the scheduled checks.
*/
private ScheduledFuture<?> healthCheckFuture;
/**
* Test utility - tracks if health check has been paused (unit tests).
*/
private boolean checkPaused;
/**
* timestamp of the latest heartbeat check process.
*/
private long lastHealthCheck;
/**
* number of times the heart beat check was skipped.
*/
private long skippedHealthChecks;
/** /**
* Constructs a NodeStateManager instance with the given configuration. * Constructs a NodeStateManager instance with the given configuration.
* *
@ -143,10 +165,11 @@ public NodeStateManager(Configuration conf, EventPublisher eventPublisher) {
executorService = HadoopExecutors.newScheduledThreadPool(1, executorService = HadoopExecutors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true) new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCM Heartbeat Processing Thread - %d").build()); .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
//BUG:BUG TODO: The return value is ignored, if an exception is thrown in
// the executing funtion, it will be ignored. skippedHealthChecks = 0;
executorService.schedule(this, heartbeatCheckerIntervalMs, checkPaused = false; // accessed only from test functions
TimeUnit.MILLISECONDS);
scheduleNextHealthCheck();
} }
/** /**
@ -464,6 +487,42 @@ public Set<ContainerID> getContainers(UUID uuid)
@Override @Override
public void run() { public void run() {
if (shouldSkipCheck()) {
skippedHealthChecks++;
LOG.info("Detected long delay in scheduling HB processing thread. "
+ "Skipping heartbeat checks for one iteration.");
} else {
checkNodesHealth();
}
// we purposefully make this non-deterministic. Instead of using a
// scheduleAtFixedFrequency we will just go to sleep
// and wake up at the next rendezvous point, which is currentTime +
// heartbeatCheckerIntervalMs. This leads to the issue that we are now
// heart beating not at a fixed cadence, but clock tick + time taken to
// work.
//
// This time taken to work can skew the heartbeat processor thread.
// The reason why we don't care is because of the following reasons.
//
// 1. checkerInterval is general many magnitudes faster than datanode HB
// frequency.
//
// 2. if we have too much nodes, the SCM would be doing only HB
// processing, this could lead to SCM's CPU starvation. With this
// approach we always guarantee that HB thread sleeps for a little while.
//
// 3. It is possible that we will never finish processing the HB's in the
// thread. But that means we have a mis-configured system. We will warn
// the users by logging that information.
//
// 4. And the most important reason, heartbeats are not blocked even if
// this thread does not run, they will go into the processing queue.
scheduleNextHealthCheck();
}
private void checkNodesHealth() {
/* /*
* *
* staleNodeDeadline healthyNodeDeadline * staleNodeDeadline healthyNodeDeadline
@ -558,41 +617,36 @@ public void run() {
heartbeatCheckerIntervalMs); heartbeatCheckerIntervalMs);
} }
// we purposefully make this non-deterministic. Instead of using a }
// scheduleAtFixedFrequency we will just go to sleep
// and wake up at the next rendezvous point, which is currentTime + private void scheduleNextHealthCheck() {
// heartbeatCheckerIntervalMs. This leads to the issue that we are now
// heart beating not at a fixed cadence, but clock tick + time taken to
// work.
//
// This time taken to work can skew the heartbeat processor thread.
// The reason why we don't care is because of the following reasons.
//
// 1. checkerInterval is general many magnitudes faster than datanode HB
// frequency.
//
// 2. if we have too much nodes, the SCM would be doing only HB
// processing, this could lead to SCM's CPU starvation. With this
// approach we always guarantee that HB thread sleeps for a little while.
//
// 3. It is possible that we will never finish processing the HB's in the
// thread. But that means we have a mis-configured system. We will warn
// the users by logging that information.
//
// 4. And the most important reason, heartbeats are not blocked even if
// this thread does not run, they will go into the processing queue.
if (!Thread.currentThread().isInterrupted() && if (!Thread.currentThread().isInterrupted() &&
!executorService.isShutdown()) { !executorService.isShutdown()) {
//BUGBUG: The return future needs to checked here to make sure the //BUGBUG: The return future needs to checked here to make sure the
// exceptions are handled correctly. // exceptions are handled correctly.
executorService.schedule(this, heartbeatCheckerIntervalMs, healthCheckFuture = executorService.schedule(this,
TimeUnit.MILLISECONDS); heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
} else { } else {
LOG.info("Current Thread is interrupted, shutting down HB processing " + LOG.warn("Current Thread is interrupted, shutting down HB processing " +
"thread for Node Manager."); "thread for Node Manager.");
} }
lastHealthCheck = Time.monotonicNow();
}
/**
* if the time since last check exceeds the stale|dead node interval, skip.
* such long delays might be caused by a JVM pause. SCM cannot make reliable
* conclusions about datanode health in such situations.
* @return : true indicates skip HB checks
*/
private boolean shouldSkipCheck() {
long currentTime = Time.monotonicNow();
long minInterval = Math.min(staleNodeIntervalMs, deadNodeIntervalMs);
return ((currentTime - lastHealthCheck) >= minInterval);
} }
/** /**
@ -640,4 +694,57 @@ public void close() {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
/**
* Test Utility : return number of times heartbeat check was skipped.
* @return : count of times HB process was skipped
*/
@VisibleForTesting
long getSkippedHealthChecks() {
return skippedHealthChecks;
}
/**
* Test Utility : Pause the periodic node hb check.
* @return ScheduledFuture for the scheduled check that got cancelled.
*/
@VisibleForTesting
ScheduledFuture pause() {
if (executorService.isShutdown() || checkPaused) {
return null;
}
checkPaused = healthCheckFuture.cancel(false);
return healthCheckFuture;
}
/**
* Test utility : unpause the periodic node hb check.
* @return ScheduledFuture for the next scheduled check
*/
@VisibleForTesting
ScheduledFuture unpause() {
if (executorService.isShutdown()) {
return null;
}
if (checkPaused) {
Preconditions.checkState(((healthCheckFuture == null)
|| healthCheckFuture.isCancelled()
|| healthCheckFuture.isDone()));
checkPaused = false;
/**
* We do not call scheduleNextHealthCheck because we are
* not updating the lastHealthCheck timestamp.
*/
healthCheckFuture = executorService.schedule(this,
heartbeatCheckerIntervalMs, TimeUnit.MILLISECONDS);
}
return healthCheckFuture;
}
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdds.scm.node; package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos; .StorageContainerDatanodeProtocolProtos;
@ -72,6 +73,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -580,4 +582,31 @@ private String nodeResolve(String hostname) {
return null; return null;
} }
} }
/**
* Test utility to stop heartbeat check process.
* @return ScheduledFuture of next scheduled check that got cancelled.
*/
@VisibleForTesting
ScheduledFuture pauseHealthCheck() {
return nodeStateManager.pause();
}
/**
* Test utility to resume the paused heartbeat check process.
* @return ScheduledFuture of the next scheduled check
*/
@VisibleForTesting
ScheduledFuture unpauseHealthCheck() {
return nodeStateManager.unpause();
}
/**
* Test utility to get the count of skipped heartbeat check iterations.
* @return count of skipped heartbeat check iterations
*/
@VisibleForTesting
long getSkippedHealthChecks() {
return nodeStateManager.getSkippedHealthChecks();
}
} }

View File

@ -53,6 +53,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -345,6 +347,96 @@ public void testScmDetectStaleAndDeadNode()
} }
} }
/**
* Simulate a JVM Pause by pausing the health check process
* Ensure that none of the nodes with heartbeats become Dead or Stale.
* @throws IOException
* @throws InterruptedException
* @throws AuthenticationException
*/
@Test
public void testScmHandleJvmPause()
throws IOException, InterruptedException, AuthenticationException {
final int healthCheckInterval = 200; // milliseconds
final int heartbeatInterval = 1; // seconds
final int staleNodeInterval = 3; // seconds
final int deadNodeInterval = 6; // seconds
ScheduledFuture schedFuture;
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
healthCheckInterval, MILLISECONDS);
conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
heartbeatInterval, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL,
staleNodeInterval, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL,
deadNodeInterval, SECONDS);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
DatanodeDetails node1 =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
DatanodeDetails node2 =
TestUtils.createRandomDatanodeAndRegister(nodeManager);
nodeManager.processHeartbeat(node1);
nodeManager.processHeartbeat(node2);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(1000);
//Assert all nodes are healthy.
assertEquals(2, nodeManager.getAllNodes().size());
assertEquals(2, nodeManager.getNodeCount(HEALTHY));
/**
* Simulate a JVM Pause and subsequent handling in following steps:
* Step 1 : stop heartbeat check process for stale node interval
* Step 2 : resume heartbeat check
* Step 3 : wait for 1 iteration of heartbeat check thread
* Step 4 : retrieve the state of all nodes - assert all are HEALTHY
* Step 5 : heartbeat for node1
* [TODO : what if there is scheduling delay of test thread in Step 5?]
* Step 6 : wait for some time to allow iterations of check process
* Step 7 : retrieve the state of all nodes - assert node2 is STALE
* and node1 is HEALTHY
*/
// Step 1 : stop health check process (simulate JVM pause)
nodeManager.pauseHealthCheck();
Thread.sleep(MILLISECONDS.convert(staleNodeInterval, SECONDS));
// Step 2 : resume health check
assertTrue("Unexpected, already skipped heartbeat checks",
(nodeManager.getSkippedHealthChecks() == 0));
schedFuture = nodeManager.unpauseHealthCheck();
// Step 3 : wait for 1 iteration of health check
try {
schedFuture.get();
assertTrue("We did not skip any heartbeat checks",
nodeManager.getSkippedHealthChecks() > 0);
} catch (ExecutionException e) {
assertEquals("Unexpected exception waiting for Scheduled Health Check",
0, 1);
}
// Step 4 : all nodes should still be HEALTHY
assertEquals(2, nodeManager.getAllNodes().size());
assertEquals(2, nodeManager.getNodeCount(HEALTHY));
// Step 5 : heartbeat for node1
nodeManager.processHeartbeat(node1);
// Step 6 : wait for health check process to run
Thread.sleep(1000);
// Step 7 : node2 should transition to STALE
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
assertEquals(1, nodeManager.getNodeCount(STALE));
}
}
/** /**
* Check for NPE when datanodeDetails is passed null for sendHeartbeat. * Check for NPE when datanodeDetails is passed null for sendHeartbeat.
* *