YARN-4311. Removing nodes from include and exclude lists will not remove them from decommissioned nodes list. Contributed by Kuhu Shukla

This commit is contained in:
Jason Lowe 2016-05-05 14:07:54 +00:00
parent 72b047715c
commit d0da13229c
11 changed files with 654 additions and 36 deletions

View File

@ -204,6 +204,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return null;
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
}
public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -193,4 +193,13 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
}

View File

@ -713,6 +713,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
"NONE";
/**
* Timeout(msec) for an untracked node to remain in shutdown or decommissioned
* state.
*/
public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
RM_PREFIX + "node-removal-untracked.timeout-ms";
public static final int
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
/**
* RM proxy users' prefix
*/

View File

@ -2752,4 +2752,17 @@
<name>yarn.timeline-service.webapp.rest-csrf.methods-to-ignore</name>
<value>GET,OPTIONS,HEAD</value>
</property>
<property>
<description>
The least amount of time(msec.) an inactive (decommissioned or shutdown) node can
stay in the nodes list of the resourcemanager after being declared untracked.
A node is marked untracked if and only if it is absent from both include and
exclude nodemanager lists on the RM. All inactive nodes are checked twice per
timeout interval or every 10 minutes, whichever is lesser, and marked appropriately.
The same is done when refreshNodes command (graceful or otherwise) is invoked.
</description>
<name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
<value>60000</value>
</property>
</configuration>

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService implements
private String excludesFile;
private Resolver resolver;
private Timer removalTimer;
private int nodeRemovalCheckInterval;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
@ -105,9 +108,72 @@ protected void serviceInit(Configuration conf) throws Exception {
} catch (IOException ioe) {
disableHostsFileReader(ioe);
}
final int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
600000));
removalTimer = new Timer("Node Removal Timer");
removalTimer.schedule(new TimerTask() {
@Override
public void run() {
long now = Time.monotonicNow();
for (Map.Entry<NodeId, RMNode> entry :
rmContext.getInactiveRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(rmNode.getHostName())) {
if (rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
} else
if (now - rmNode.getUntrackedTimeStamp() >
nodeRemovalTimeout) {
RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
if (result != null) {
decrInactiveNMMetrics(rmNode);
LOG.info("Removed " +result.getState().toString() + " node "
+ result.getHostName() + " from inactive nodes list");
}
}
} else {
rmNode.setUntrackedTimeStamp(0);
}
}
}
}, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
super.serviceInit(conf);
}
private void decrInactiveNMMetrics(RMNode rmNode) {
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
switch (rmNode.getState()) {
case SHUTDOWN:
clusterMetrics.decrNumShutdownNMs();
break;
case DECOMMISSIONED:
clusterMetrics.decrDecommisionedNMs();
break;
case LOST:
clusterMetrics.decrNumLostNMs();
break;
case REBOOTED:
clusterMetrics.decrNumRebootedNMs();
break;
default:
LOG.debug("Unexpected node state");
}
}
@Override
public void serviceStop() {
removalTimer.cancel();
}
private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) {
return;
@ -131,10 +197,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
new RMNodeEvent(nodeId, nodeEventType));
}
}
updateInactiveNodes();
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
@ -171,6 +240,16 @@ private void setDecomissionedNMs() {
}
}
@VisibleForTesting
public int getNodeRemovalCheckInterval() {
return nodeRemovalCheckInterval;
}
@VisibleForTesting
public void setNodeRemovalCheckInterval(int interval) {
this.nodeRemovalCheckInterval = interval;
}
@VisibleForTesting
public Resolver getResolver() {
return resolver;
@ -374,6 +453,33 @@ private HostsFileReader createHostsFileReader(String includesFile,
return hostsReader;
}
private void updateInactiveNodes() {
long now = Time.monotonicNow();
for(Entry<NodeId, RMNode> entry :
rmContext.getInactiveRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(nodeId.getHost()) &&
rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
}
}
}
public boolean isUntrackedNode(String hostName) {
boolean untracked;
String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
untracked = !hostsList.isEmpty() &&
!hostsList.contains(hostName) && !hostsList.contains(ip) &&
!excludeList.contains(hostName) && !excludeList.contains(ip);
}
return untracked;
}
/**
* Refresh the nodes gracefully
*
@ -384,11 +490,13 @@ private HostsFileReader createHostsFileReader(String includesFile,
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@ -397,6 +505,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
}
}
}
updateInactiveNodes();
}
/**
@ -420,8 +529,11 @@ public Set<NodeId> checkForDecommissioningNodes() {
public void refreshNodesForcefully() {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
RMNodeEventType nodeEventType =
isUntrackedNode(entry.getKey().getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
new RMNodeEvent(entry.getKey(), nodeEventType));
}
}
}

View File

@ -87,7 +87,7 @@ public static List<RMNode> queryRMNodes(RMContext context,
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {
if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}

View File

@ -172,4 +172,7 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
public QueuedContainersStatus getQueuedContainersStatus();
long getUntrackedTimeStamp();
void setUntrackedTimeStamp(long timeStamp);
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -121,6 +122,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private long lastHealthReportTime;
private String nodeManagerVersion;
private long timeStamp;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
@ -263,6 +265,9 @@ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@ -350,6 +355,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0;
this.latestNodeHeartBeatResponse.setResponseId(0);
@ -1015,7 +1021,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
/**
* Put a node in deactivated (decommissioned) status.
* Put a node in deactivated (decommissioned or shutdown) status.
* @param rmNode
* @param finalState
*/
@ -1032,6 +1038,9 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
if (rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
rmNode.setUntrackedTimeStamp(Time.monotonicNow());
}
}
/**
@ -1408,4 +1417,14 @@ public void setQueuedContainersStatus(QueuedContainersStatus
this.writeLock.unlock();
}
}
@Override
public long getUntrackedTimeStamp() {
return this.timeStamp;
}
@Override
public void setUntrackedTimeStamp(long ts) {
this.timeStamp = ts;
}
}

View File

@ -264,6 +264,15 @@ public ResourceUtilization getNodeUtilization() {
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,

View File

@ -31,6 +31,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
@ -48,8 +50,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -142,12 +142,12 @@ public void testDecommissionWithIncludeHosts() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++metricCount);
checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
.assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
.assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@ -156,7 +156,8 @@ public void testDecommissionWithIncludeHosts() throws Exception {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
.getNumDecommisionedNMs());
.getNumShutdownNMs());
rm.stop();
}
/**
@ -227,7 +228,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
int initialMetricCount = metrics.getNumDecommisionedNMs();
int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
NodeAction.NORMAL,
@ -240,16 +241,16 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++initialMetricCount);
checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
"Node should not have been decomissioned.",
"Node should not have been shutdown.",
NodeAction.NORMAL,
nodeHeartbeat.getNodeAction());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertEquals("Node should have been decomissioned but is in state" +
nodeHeartbeat.getNodeAction(),
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
NodeState nodeState =
rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
Assert.assertEquals("Node should have been shutdown but is in state" +
nodeState, NodeState.SHUTDOWN, nodeState);
}
/**
@ -510,7 +511,8 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
Assert
.assertFalse(
"Node Labels should not accepted by RM If its configured with Central configuration",
"Node Labels should not accepted by RM If its configured with " +
"Central configuration",
response.getAreNodeLabelsAcceptedByRM());
if (rm != null) {
rm.stop();
@ -892,15 +894,15 @@ public void testUnhealthyNodeStatus() throws Exception {
// node unhealthy
nm1.nodeHeartbeat(false);
checkUnealthyNMCount(rm, nm1, true, 1);
checkUnhealthyNMCount(rm, nm1, true, 1);
// node healthy again
nm1.nodeHeartbeat(true);
checkUnealthyNMCount(rm, nm1, false, 0);
checkUnhealthyNMCount(rm, nm1, false, 0);
}
private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health,
int count) throws Exception {
private void checkUnhealthyNMCount(MockRM rm, MockNM nm1, boolean health,
int count) throws Exception {
int waitCount = 0;
while((rm.getRMContext().getRMNodes().get(nm1.getNodeId())
@ -1002,7 +1004,7 @@ public void handle(SchedulerEvent event) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
rm.drainEvents();
checkUnealthyNMCount(rm, nm2, true, 1);
checkUnhealthyNMCount(rm, nm2, true, 1);
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
// TODO Metrics incorrect in case of the FifoScheduler
@ -1014,7 +1016,7 @@ public void handle(SchedulerEvent event) {
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
checkUnhealthyNMCount(rm, nm2, true, 1);
// reconnect of unhealthy node
nm2 = rm.registerNode("host2:5678", 5120);
@ -1022,7 +1024,7 @@ public void handle(SchedulerEvent event) {
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
checkUnealthyNMCount(rm, nm2, true, 1);
checkUnhealthyNMCount(rm, nm2, true, 1);
// unhealthy node changed back to healthy
nm2 = rm.registerNode("host2:5678", 5120);
@ -1104,7 +1106,7 @@ public void testUnhealthyNMUnregistration() throws Exception {
// node unhealthy
nm1.nodeHeartbeat(false);
checkUnealthyNMCount(rm, nm1, true, 1);
checkUnhealthyNMCount(rm, nm1, true, 1);
UnRegisterNodeManagerRequest request = Records
.newRecord(UnRegisterNodeManagerRequest.class);
request.setNodeId(nm1.getNodeId());
@ -1119,8 +1121,6 @@ public void testInvalidNMUnregistration() throws Exception {
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
int shutdownNMsCount = ClusterMetrics.getMetrics()
.getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
@ -1145,10 +1145,12 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
@ -1164,8 +1166,9 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
checkShutdownNMCount(rm, ++shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
rm.stop();
}
@Test(timeout = 30000)
@ -1300,6 +1303,434 @@ public void testIncorrectRecommission() throws Exception {
rm.stop();
}
/**
* Remove a node from all lists and check if its forgotten
*/
@Test
public void testNodeRemovalNormally() throws Exception {
testNodeRemovalUtil(false);
testNodeRemovalUtilLost(false);
testNodeRemovalUtilRebooted(false);
testNodeRemovalUtilUnhealthy(false);
}
@Test
public void testNodeRemovalGracefully() throws Exception {
testNodeRemovalUtil(true);
testNodeRemovalUtilLost(true);
testNodeRemovalUtilRebooted(true);
testNodeRemovalUtilUnhealthy(true);
}
public void refreshNodesOption(boolean doGraceful, Configuration conf)
throws Exception {
if (doGraceful) {
rm.getNodesListManager().refreshNodesGracefully(conf);
} else {
rm.getNodesListManager().refreshNodes(conf);
}
}
public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
CountDownLatch latch = new CountDownLatch(1);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert (metrics != null);
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Remove nm2 from include list, should now be shutdown with timer test
String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host1", ip);
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue("Node should not be in active node list",
!rmContext.getRMNodes().containsKey(nm2.getNodeId()));
RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be in inactive node list",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
//Check node removal and re-addition before timer expires
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
rm.drainEvents();
writeToHostsFile("host1", ip);
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be shutdown",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
//add back the node before timer expires
latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
nodeHeartbeat = nm2.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Decommission this node, check timer doesn't remove it
writeToHostsFile("host1", "host2", ip);
writeToHostsFile(excludeHostFile, "host2");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
//Test decommed/ing node that transitions to untracked,timer should remove
writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
//nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!",
rmNode, null);
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumDecommisionedNMs(), 0);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
rm.stop();
}
private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000);
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "localhost", "host2");
writeToHostsFile(excludeHostFile, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ClusterMetrics metrics = clusterMetrics;
assert (metrics != null);
rm.drainEvents();
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
int waitCount = 0;
while(waitCount ++<20){
synchronized (this) {
wait(200);
}
nm3.nodeHeartbeat(true);
nm1.nodeHeartbeat(true);
}
Assert.assertNotEquals("host2 should be a lost NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a lost NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.LOST);
Assert.assertEquals("There should be 1 Lost NM!",
clusterMetrics.getNumLostNMs(), 1);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
writeToHostsFile(hostFile, "host1", "localhost");
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
rm.drainEvents();
waitCount = 0;
while(rmContext.getInactiveRMNodes().get(
nm2.getNodeId()) != null && waitCount++ < 2){
synchronized (this) {
wait(maxThreadSleeptime);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
}
}
Assert.assertEquals("host2 should have been forgotten!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("There should be no Lost NMs!",
clusterMetrics.getNumLostNMs(), 0);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
rm.stop();
}
private void testNodeRemovalUtilRebooted(boolean doGraceful)
throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "localhost", "host2");
writeToHostsFile(excludeHostFile, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ClusterMetrics metrics = clusterMetrics;
assert (metrics != null);
NodeHeartbeatResponse nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
rm.drainEvents();
rm.drainEvents();
Assert.assertNotEquals("host2 should be a rebooted NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a rebooted NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.REBOOTED);
Assert.assertEquals("There should be 1 Rebooted NM!",
clusterMetrics.getNumRebootedNMs(), 1);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
writeToHostsFile(hostFile, "host1", "localhost");
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
rm.drainEvents();
int waitCount = 0;
while(rmContext.getInactiveRMNodes().get(
nm2.getNodeId()) != null && waitCount++ < 2){
synchronized (this) {
wait(maxThreadSleeptime);
}
}
Assert.assertEquals("host2 should have been forgotten!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("There should be no Rebooted NMs!",
clusterMetrics.getNumRebootedNMs(), 0);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
rm.stop();
}
private void testNodeRemovalUtilUnhealthy(boolean doGraceful)
throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeHostFile.getAbsolutePath());
writeToHostsFile(hostFile, "host1", "localhost", "host2");
writeToHostsFile(excludeHostFile, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
ClusterMetrics metrics = clusterMetrics;
assert (metrics != null);
rm.drainEvents();
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
// node healthy
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
nm3.nodeHeartbeat(true);
checkUnhealthyNMCount(rm, nm2, true, 1);
writeToHostsFile(hostFile, "host1", "localhost");
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
nm3.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertNotEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("host2 should be a shutdown NM!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
NodeState.SHUTDOWN);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
Assert.assertEquals("There should be 1 Shutdown NM!",
clusterMetrics.getNumShutdownNMs(), 1);
Assert.assertEquals("There should be 0 Unhealthy NM!",
clusterMetrics.getUnhealthyNMs(), 0);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
int waitCount = 0;
while(rmContext.getInactiveRMNodes().get(
nm2.getNodeId()) != null && waitCount++ < 2){
synchronized (this) {
wait(maxThreadSleeptime);
}
}
Assert.assertEquals("host2 should have been forgotten!",
rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
Assert.assertEquals("There should be no Shutdown NMs!",
clusterMetrics.getNumRebootedNMs(), 0);
Assert.assertEquals("There should be 2 Active NM!",
clusterMetrics.getNumActiveNMs(), 2);
rm.stop();
}
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
}

View File

@ -292,8 +292,10 @@ public void testNodesQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
.toString(), info.getString("state"));
if (rmNode != null) {
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
}
}
@ -319,8 +321,10 @@ public void testSingleNodeQueryStateLost() throws JSONException, Exception {
rm.getRMContext().getInactiveRMNodes().get(rmnode2.getNodeID());
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
if (rmNode != null) {
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
}
@Test