HDFS-10912. Ozone:SCM: Add chill mode support to NodeManager. Contributed by Anu Engineer.
This commit is contained in:
parent
5520f73dee
commit
ed84388fca
@ -94,18 +94,51 @@ void registerNode(DatanodeID nodeReg)
|
||||
List<DatanodeID> getAllNodes();
|
||||
|
||||
/**
|
||||
* Get the minimum number of nodes to get out of safe mode.
|
||||
* Get the minimum number of nodes to get out of chill mode.
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
int getMinimumSafeModeNodes();
|
||||
int getMinimumChillModeNodes();
|
||||
|
||||
/**
|
||||
* Reports if we have exited out of safe mode by discovering enough nodes.
|
||||
* Reports if we have exited out of chill mode by discovering enough nodes.
|
||||
*
|
||||
* @return True if we are out of Node layer safe mode, false otherwise.
|
||||
* @return True if we are out of Node layer chill mode, false otherwise.
|
||||
*/
|
||||
boolean isOutOfNodeSafeMode();
|
||||
boolean isOutOfNodeChillMode();
|
||||
|
||||
/**
|
||||
* Chill mode is the period when node manager waits for a minimum
|
||||
* configured number of datanodes to report in. This is called chill mode
|
||||
* to indicate the period before node manager gets into action.
|
||||
*
|
||||
* Forcefully exits the chill mode, even if we have not met the minimum
|
||||
* criteria of the nodes reporting in.
|
||||
*/
|
||||
void forceExitChillMode();
|
||||
|
||||
/**
|
||||
* Forcefully enters chill mode, even if all minimum node conditions are met.
|
||||
*/
|
||||
void forceEnterChillMode();
|
||||
|
||||
/**
|
||||
* Clears the manual chill mode flag.
|
||||
*/
|
||||
void clearChillModeFlag();
|
||||
|
||||
/**
|
||||
* Returns a chill mode status string.
|
||||
* @return String
|
||||
*/
|
||||
String getChillModeStatus();
|
||||
|
||||
/**
|
||||
* Returns the status of manual chill mode flag.
|
||||
* @return true if forceEnterChillMode has been called,
|
||||
* false if forceExitChillMode or status is not set. eg. clearChillModeFlag.
|
||||
*/
|
||||
boolean isInManualChillMode();
|
||||
|
||||
/**
|
||||
* Enum that represents the Node State. This is used in calls to getNodeList
|
||||
|
@ -17,6 +17,7 @@
|
||||
package org.apache.hadoop.ozone.scm.node;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -35,7 +36,6 @@
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -101,8 +101,9 @@ public class SCMNodeManager implements NodeManager {
|
||||
private long lastHBcheckStart;
|
||||
private long lastHBcheckFinished = 0;
|
||||
private long lastHBProcessedCount;
|
||||
private int safeModeNodeCount;
|
||||
private int chillModeNodeCount;
|
||||
private final int maxHBToProcessPerLoop;
|
||||
private Optional<Boolean> inManualChillMode;
|
||||
|
||||
/**
|
||||
* Constructs SCM machine Manager.
|
||||
@ -120,7 +121,7 @@ public SCMNodeManager(Configuration conf) {
|
||||
totalNodes = new AtomicInteger(0);
|
||||
|
||||
// TODO: Support this value as a Percentage of known machines.
|
||||
safeModeNodeCount = 1;
|
||||
chillModeNodeCount = 1;
|
||||
|
||||
staleNodeIntervalMs = OzoneClientUtils.getStaleNodeInterval(conf);
|
||||
deadNodeIntervalMs = OzoneClientUtils.getDeadNodeInterval(conf);
|
||||
@ -132,6 +133,7 @@ public SCMNodeManager(Configuration conf) {
|
||||
executorService = HadoopExecutors.newScheduledThreadPool(1,
|
||||
new ThreadFactoryBuilder().setDaemon(true)
|
||||
.setNameFormat("SCM Heartbeat Processing Thread - %d").build());
|
||||
this.inManualChillMode = Optional.absent();
|
||||
|
||||
Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
|
||||
executorService.schedule(this, heartbeatCheckerIntervalMs,
|
||||
@ -243,36 +245,111 @@ public List<DatanodeID> getAllNodes() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the minimum number of nodes to get out of safe mode.
|
||||
* Get the minimum number of nodes to get out of Chill mode.
|
||||
*
|
||||
* @return int
|
||||
*/
|
||||
@Override
|
||||
public int getMinimumSafeModeNodes() {
|
||||
return safeModeNodeCount;
|
||||
public int getMinimumChillModeNodes() {
|
||||
return chillModeNodeCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Minimum SafeModeNode count, used only in testing.
|
||||
* Sets the Minimum chill mode nodes count, used only in testing.
|
||||
*
|
||||
* @param count - Number of nodes.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void setMinimumSafeModeNodes(int count) {
|
||||
safeModeNodeCount = count;
|
||||
public void setMinimumChillModeNodes(int count) {
|
||||
chillModeNodeCount = count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reports if we have exited out of safe mode.
|
||||
* Reports if we have exited out of chill mode.
|
||||
*
|
||||
* @return true if we are out of safe mode.
|
||||
* @return true if we are out of chill mode.
|
||||
*/
|
||||
@Override
|
||||
public boolean isOutOfNodeSafeMode() {
|
||||
LOG.trace("Node count : {}", totalNodes.get());
|
||||
public boolean isOutOfNodeChillMode() {
|
||||
if (inManualChillMode.isPresent()) {
|
||||
return !inManualChillMode.get();
|
||||
}
|
||||
|
||||
//TODO : Support a boolean to force getting out of Safe mode.
|
||||
return (totalNodes.get() >= getMinimumSafeModeNodes());
|
||||
return (totalNodes.get() >= getMinimumChillModeNodes());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the manual chill mode.
|
||||
*/
|
||||
@Override
|
||||
public void clearChillModeFlag() {
|
||||
this.inManualChillMode = Optional.absent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns chill mode Status string.
|
||||
* @return String
|
||||
*/
|
||||
@Override
|
||||
public String getChillModeStatus() {
|
||||
if (inManualChillMode.isPresent() && inManualChillMode.get()) {
|
||||
return "Manual chill mode is set to true." +
|
||||
getNodeStatus();
|
||||
}
|
||||
|
||||
if (inManualChillMode.isPresent() && !inManualChillMode.get()) {
|
||||
return "Manual chill mode is set to false." +
|
||||
getNodeStatus();
|
||||
}
|
||||
|
||||
if (isOutOfNodeChillMode()) {
|
||||
return "Out of chill mode." + getNodeStatus();
|
||||
} else {
|
||||
return "Still in chill mode. Waiting on nodes to report in."
|
||||
+ getNodeStatus();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a node status string.
|
||||
* @return - String
|
||||
*/
|
||||
private String getNodeStatus() {
|
||||
final String chillModeStatus = " %d of out of total "
|
||||
+ "%d nodes have reported in.";
|
||||
return String.format(chillModeStatus, totalNodes.get(),
|
||||
getMinimumChillModeNodes());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the status of Manual chill Mode flag.
|
||||
*
|
||||
* @return true if forceEnterChillMode has been called, false if
|
||||
* forceExitChillMode or status is not set. eg. clearChillModeFlag.
|
||||
*/
|
||||
@Override
|
||||
public boolean isInManualChillMode() {
|
||||
if(this.inManualChillMode.isPresent()) {
|
||||
return this.inManualChillMode.get();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Forcefully exits the chill mode even if we have not met the minimum
|
||||
* criteria of exiting the chill mode.
|
||||
*/
|
||||
@Override
|
||||
public void forceExitChillMode() {
|
||||
this.inManualChillMode = Optional.of(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Forcefully enters chill mode, even if all chill mode conditions are met.
|
||||
*/
|
||||
@Override
|
||||
public void forceEnterChillMode() {
|
||||
this.inManualChillMode = Optional.of(true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -23,6 +23,7 @@
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -107,14 +108,14 @@ DatanodeID getDatanodeID(String uuid) {
|
||||
|
||||
SCMNodeManager createNodeManager(Configuration config) throws IOException {
|
||||
SCMNodeManager nodeManager = new SCMNodeManager(config);
|
||||
assertFalse("Node manager should be in safe mode",
|
||||
nodeManager.isOutOfNodeSafeMode());
|
||||
assertFalse("Node manager should be in chill mode",
|
||||
nodeManager.isOutOfNodeChillMode());
|
||||
return nodeManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that Node manager handles heartbeats correctly, and comes out of Safe
|
||||
* Mode.
|
||||
* Tests that Node manager handles heartbeats correctly, and comes out of
|
||||
* chill Mode.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
@ -127,7 +128,7 @@ public void testScmHeartbeat() throws IOException,
|
||||
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
|
||||
|
||||
// Send some heartbeats from different nodes.
|
||||
for (int x = 0; x < nodeManager.getMinimumSafeModeNodes(); x++) {
|
||||
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
|
||||
nodeManager.updateHeartbeat(getDatanodeID());
|
||||
}
|
||||
|
||||
@ -136,13 +137,13 @@ public void testScmHeartbeat() throws IOException,
|
||||
4 * 1000);
|
||||
|
||||
assertTrue("Heartbeat thread should have picked up the scheduled " +
|
||||
"heartbeats and transitioned out of safe mode.",
|
||||
nodeManager.isOutOfNodeSafeMode());
|
||||
"heartbeats and transitioned out of chill mode.",
|
||||
nodeManager.isOutOfNodeChillMode());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* asserts that if we send no heartbeats node manager stays in safemode.
|
||||
* asserts that if we send no heartbeats node manager stays in chillmode.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
@ -155,13 +156,13 @@ public void testScmNoHeartbeats() throws IOException,
|
||||
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
|
||||
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
|
||||
4 * 1000);
|
||||
assertFalse("No heartbeats, Node manager should have been in safe mode.",
|
||||
nodeManager.isOutOfNodeSafeMode());
|
||||
assertFalse("No heartbeats, Node manager should have been in chill mode.",
|
||||
nodeManager.isOutOfNodeChillMode());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that if we don't get enough unique nodes we stay in safemode.
|
||||
* Asserts that if we don't get enough unique nodes we stay in chillmode.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
@ -172,13 +173,13 @@ public void testScmNotEnoughHeartbeats() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
|
||||
|
||||
// Need 100 nodes to come out of safe mode, only one node is sending HB.
|
||||
nodeManager.setMinimumSafeModeNodes(100);
|
||||
// Need 100 nodes to come out of chill mode, only one node is sending HB.
|
||||
nodeManager.setMinimumChillModeNodes(100);
|
||||
nodeManager.updateHeartbeat(getDatanodeID());
|
||||
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
|
||||
4 * 1000);
|
||||
assertFalse("Not enough heartbeat, Node manager should have been in " +
|
||||
"safemode.", nodeManager.isOutOfNodeSafeMode());
|
||||
"chillmode.", nodeManager.isOutOfNodeChillMode());
|
||||
}
|
||||
}
|
||||
|
||||
@ -195,10 +196,10 @@ public void testScmSameNodeHeartbeats() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
|
||||
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
|
||||
nodeManager.setMinimumSafeModeNodes(3);
|
||||
nodeManager.setMinimumChillModeNodes(3);
|
||||
DatanodeID datanodeID = getDatanodeID();
|
||||
|
||||
// Send 10 heartbeat from same node, and assert we never leave safe mode.
|
||||
// Send 10 heartbeat from same node, and assert we never leave chill mode.
|
||||
for (int x = 0; x < 10; x++) {
|
||||
nodeManager.updateHeartbeat(datanodeID);
|
||||
}
|
||||
@ -206,7 +207,7 @@ public void testScmSameNodeHeartbeats() throws IOException,
|
||||
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
|
||||
4 * 1000);
|
||||
assertFalse("Not enough nodes have send heartbeat to node manager.",
|
||||
nodeManager.isOutOfNodeSafeMode());
|
||||
nodeManager.isOutOfNodeChillMode());
|
||||
}
|
||||
}
|
||||
|
||||
@ -234,7 +235,7 @@ public void testScmShutdown() throws IOException, InterruptedException,
|
||||
Thread.sleep(2 * 1000);
|
||||
|
||||
assertFalse("Node manager executor service is shutdown, should never exit" +
|
||||
" safe mode", nodeManager.isOutOfNodeSafeMode());
|
||||
" chill mode", nodeManager.isOutOfNodeChillMode());
|
||||
|
||||
assertEquals("Assert new HBs were never processed", 0,
|
||||
nodeManager.getLastHBProcessedCount());
|
||||
@ -861,4 +862,59 @@ public void testScmLogsHeartbeatFlooding() throws IOException,
|
||||
"counts."));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScmEnterAndExistChillMode() throws IOException,
|
||||
InterruptedException {
|
||||
Configuration conf = getConf();
|
||||
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
|
||||
|
||||
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
||||
nodeManager.setMinimumChillModeNodes(10);
|
||||
nodeManager.updateHeartbeat(getDatanodeID());
|
||||
String status = nodeManager.getChillModeStatus();
|
||||
Assert.assertThat(status, CoreMatchers.containsString("Still in chill " +
|
||||
"mode. Waiting on nodes to report in."));
|
||||
|
||||
// Should not exist chill mode since 10 nodes have not heartbeat yet.
|
||||
assertFalse(nodeManager.isOutOfNodeChillMode());
|
||||
assertFalse((nodeManager.isInManualChillMode()));
|
||||
|
||||
// Force exit chill mode.
|
||||
nodeManager.forceExitChillMode();
|
||||
assertTrue(nodeManager.isOutOfNodeChillMode());
|
||||
status = nodeManager.getChillModeStatus();
|
||||
Assert.assertThat(status,
|
||||
CoreMatchers.containsString("Manual chill mode is set to false."));
|
||||
assertFalse((nodeManager.isInManualChillMode()));
|
||||
|
||||
|
||||
// Enter back to into chill mode.
|
||||
nodeManager.forceEnterChillMode();
|
||||
assertFalse(nodeManager.isOutOfNodeChillMode());
|
||||
status = nodeManager.getChillModeStatus();
|
||||
Assert.assertThat(status,
|
||||
CoreMatchers.containsString("Manual chill mode is set to true."));
|
||||
assertTrue((nodeManager.isInManualChillMode()));
|
||||
|
||||
|
||||
// Assert that node manager force enter cannot be overridden by nodes HBs.
|
||||
for(int x= 0; x < 20; x++) {
|
||||
nodeManager.updateHeartbeat(getDatanodeID());
|
||||
}
|
||||
|
||||
Thread.sleep(500);
|
||||
assertFalse(nodeManager.isOutOfNodeChillMode());
|
||||
|
||||
// Make sure that once we clear the manual chill mode flag, we fall back
|
||||
// to the number of nodes to get out chill mode.
|
||||
nodeManager.clearChillModeFlag();
|
||||
assertTrue(nodeManager.isOutOfNodeChillMode());
|
||||
status = nodeManager.getChillModeStatus();
|
||||
Assert.assertThat(status,
|
||||
CoreMatchers.containsString("Out of chill mode."));
|
||||
assertFalse(nodeManager.isInManualChillMode());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user