diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 85096ba956..bd737bd7ac 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -204,6 +204,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return null;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
}
public static RMNode newNodeInfo(String rackName, String hostName,
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index ab82e66229..5048978ef7 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -193,4 +193,13 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a4213cef44..965b6c5d83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -713,6 +713,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
"NONE";
+ /**
+ * Timeout (in msec) for an untracked node to remain in shutdown or decommissioned
+ * state.
+ */
+ public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
+ RM_PREFIX + "node-removal-untracked.timeout-ms";
+ public static final int
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
+
/**
* RM proxy users' prefix
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2be402ab6f..a38d0d828d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2752,4 +2752,17 @@
yarn.timeline-service.webapp.rest-csrf.methods-to-ignore
GET,OPTIONS,HEAD
+
+
+
+ The least amount of time (in msec) an inactive (decommissioned or shutdown) node can
+ stay in the nodes list of the resourcemanager after being declared untracked.
+ A node is marked untracked if and only if it is absent from both include and
+ exclude nodemanager lists on the RM. All inactive nodes are checked twice per
+ timeout interval or every 10 minutes, whichever is shorter, and marked appropriately.
+ The same is done when refreshNodes command (graceful or otherwise) is invoked.
+
+ yarn.resourcemanager.node-removal-untracked.timeout-ms
+ 60000
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 121c418fc3..bb00e60a47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService implements
private String excludesFile;
private Resolver resolver;
+ private Timer removalTimer;
+ private int nodeRemovalCheckInterval;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
@@ -105,9 +108,72 @@ protected void serviceInit(Configuration conf) throws Exception {
} catch (IOException ioe) {
disableHostsFileReader(ioe);
}
+
+ final int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
+ 600000));
+ removalTimer = new Timer("Node Removal Timer");
+
+ removalTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ long now = Time.monotonicNow();
+ for (Map.Entry entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (isUntrackedNode(rmNode.getHostName())) {
+ if (rmNode.getUntrackedTimeStamp() == 0) {
+ rmNode.setUntrackedTimeStamp(now);
+ } else
+ if (now - rmNode.getUntrackedTimeStamp() >
+ nodeRemovalTimeout) {
+ RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
+ if (result != null) {
+ decrInactiveNMMetrics(rmNode);
+ LOG.info("Removed " +result.getState().toString() + " node "
+ + result.getHostName() + " from inactive nodes list");
+ }
+ }
+ } else {
+ rmNode.setUntrackedTimeStamp(0);
+ }
+ }
+ }
+ }, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
+
super.serviceInit(conf);
}
+ private void decrInactiveNMMetrics(RMNode rmNode) {
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+ switch (rmNode.getState()) {
+ case SHUTDOWN:
+ clusterMetrics.decrNumShutdownNMs();
+ break;
+ case DECOMMISSIONED:
+ clusterMetrics.decrDecommisionedNMs();
+ break;
+ case LOST:
+ clusterMetrics.decrNumLostNMs();
+ break;
+ case REBOOTED:
+ clusterMetrics.decrNumRebootedNMs();
+ break;
+ default:
+ LOG.debug("Unexpected node state");
+ }
+ }
+
+ @Override
+ public void serviceStop() {
+ removalTimer.cancel();
+ }
+
private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) {
return;
@@ -131,10 +197,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
}
}
+ updateInactiveNodes();
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
@@ -171,6 +240,16 @@ private void setDecomissionedNMs() {
}
}
+ @VisibleForTesting
+ public int getNodeRemovalCheckInterval() {
+ return nodeRemovalCheckInterval;
+ }
+
+ @VisibleForTesting
+ public void setNodeRemovalCheckInterval(int interval) {
+ this.nodeRemovalCheckInterval = interval;
+ }
+
@VisibleForTesting
public Resolver getResolver() {
return resolver;
@@ -374,6 +453,33 @@ private HostsFileReader createHostsFileReader(String includesFile,
return hostsReader;
}
+ private void updateInactiveNodes() {
+ long now = Time.monotonicNow();
+ for(Entry entry :
+ rmContext.getInactiveRMNodes().entrySet()) {
+ NodeId nodeId = entry.getKey();
+ RMNode rmNode = entry.getValue();
+ if (isUntrackedNode(nodeId.getHost()) &&
+ rmNode.getUntrackedTimeStamp() == 0) {
+ rmNode.setUntrackedTimeStamp(now);
+ }
+ }
+ }
+
+ public boolean isUntrackedNode(String hostName) {
+ boolean untracked;
+ String ip = resolver.resolve(hostName);
+
+ synchronized (hostsReader) {
+ Set hostsList = hostsReader.getHosts();
+ Set excludeList = hostsReader.getExcludedHosts();
+ untracked = !hostsList.isEmpty() &&
+ !hostsList.contains(hostName) && !hostsList.contains(ip) &&
+ !excludeList.contains(hostName) && !excludeList.contains(ip);
+ }
+ return untracked;
+ }
+
/**
* Refresh the nodes gracefully
*
@@ -384,11 +490,13 @@ private HostsFileReader createHostsFileReader(String includesFile,
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
- for (Entry entry:rmContext.getRMNodes().entrySet()) {
+ for (Entry entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
+ RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
+ new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@@ -397,6 +505,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
}
}
}
+ updateInactiveNodes();
}
/**
@@ -420,8 +529,11 @@ public Set checkForDecommissioningNodes() {
public void refreshNodesForcefully() {
for (Entry entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
+ RMNodeEventType nodeEventType =
+ isUntrackedNode(entry.getKey().getHost()) ?
+ RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
+ new RMNodeEvent(entry.getKey(), nodeEventType));
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e19d55ee81..1318d5814b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -87,7 +87,7 @@ public static List queryRMNodes(RMContext context,
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
- if (acceptedStates.contains(rmNode.getState())) {
+ if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 3bf95386de..0e281d84bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -172,4 +172,7 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
public QueuedContainersStatus getQueuedContainersStatus();
+ long getUntrackedTimeStamp();
+
+ void setUntrackedTimeStamp(long timeStamp);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3179169b6b..2e3d10f23d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -121,6 +122,7 @@ public class RMNodeImpl implements RMNode, EventHandler {
private long lastHealthReportTime;
private String nodeManagerVersion;
+ private long timeStamp;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
@@ -263,6 +265,9 @@ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+ .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
+ RMNodeEventType.SHUTDOWN,
+ new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@@ -350,6 +355,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
+ this.timeStamp = 0;
this.latestNodeHeartBeatResponse.setResponseId(0);
@@ -1015,7 +1021,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
/**
- * Put a node in deactivated (decommissioned) status.
+ * Put a node in deactivated (decommissioned or shutdown) status.
* @param rmNode
* @param finalState
*/
@@ -1032,6 +1038,9 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
+ if (rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
+ rmNode.setUntrackedTimeStamp(Time.monotonicNow());
+ }
}
/**
@@ -1408,4 +1417,14 @@ public void setQueuedContainersStatus(QueuedContainersStatus
this.writeLock.unlock();
}
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return this.timeStamp;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long ts) {
+ this.timeStamp = ts;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index f5b61a3bb3..2b4d2fc0aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -264,6 +264,15 @@ public ResourceUtilization getNodeUtilization() {
public QueuedContainersStatus getQueuedContainersStatus() {
return null;
}
+
+ @Override
+ public long getUntrackedTimeStamp() {
+ return 0;
+ }
+
+ @Override
+ public void setUntrackedTimeStamp(long timeStamp) {
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index f2f71ce7c8..cac4511287 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -31,6 +31,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
@@ -48,8 +50,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -142,12 +142,12 @@ public void testDecommissionWithIncludeHosts() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++metricCount);
+ checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
- .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+ .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@@ -156,7 +156,8 @@ public void testDecommissionWithIncludeHosts() throws Exception {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
- .getNumDecommisionedNMs());
+ .getNumShutdownNMs());
+ rm.stop();
}
/**
@@ -227,7 +228,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
- int initialMetricCount = metrics.getNumDecommisionedNMs();
+ int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
NodeAction.NORMAL,
@@ -240,16 +241,16 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
- checkDecommissionedNMCount(rm, ++initialMetricCount);
+ checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
- "Node should not have been decomissioned.",
+ "Node should not have been shutdown.",
NodeAction.NORMAL,
nodeHeartbeat.getNodeAction());
- nodeHeartbeat = nm2.nodeHeartbeat(true);
- Assert.assertEquals("Node should have been decomissioned but is in state" +
- nodeHeartbeat.getNodeAction(),
- NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
+ NodeState nodeState =
+ rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
+ Assert.assertEquals("Node should have been shutdown but is in state" +
+ nodeState, NodeState.SHUTDOWN, nodeState);
}
/**
@@ -510,7 +511,8 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
Assert
.assertFalse(
- "Node Labels should not accepted by RM If its configured with Central configuration",
+ "Node Labels should not accepted by RM If its configured with " +
+ "Central configuration",
response.getAreNodeLabelsAcceptedByRM());
if (rm != null) {
rm.stop();
@@ -892,15 +894,15 @@ public void testUnhealthyNodeStatus() throws Exception {
// node unhealthy
nm1.nodeHeartbeat(false);
- checkUnealthyNMCount(rm, nm1, true, 1);
+ checkUnhealthyNMCount(rm, nm1, true, 1);
// node healthy again
nm1.nodeHeartbeat(true);
- checkUnealthyNMCount(rm, nm1, false, 0);
+ checkUnhealthyNMCount(rm, nm1, false, 0);
}
- private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health,
- int count) throws Exception {
+ private void checkUnhealthyNMCount(MockRM rm, MockNM nm1, boolean health,
+ int count) throws Exception {
int waitCount = 0;
while((rm.getRMContext().getRMNodes().get(nm1.getNodeId())
@@ -1002,7 +1004,7 @@ public void handle(SchedulerEvent event) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(false);
rm.drainEvents();
- checkUnealthyNMCount(rm, nm2, true, 1);
+ checkUnhealthyNMCount(rm, nm2, true, 1);
final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
// TODO Metrics incorrect in case of the FifoScheduler
@@ -1014,7 +1016,7 @@ public void handle(SchedulerEvent event) {
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
- checkUnealthyNMCount(rm, nm2, true, 1);
+ checkUnhealthyNMCount(rm, nm2, true, 1);
// reconnect of unhealthy node
nm2 = rm.registerNode("host2:5678", 5120);
@@ -1022,7 +1024,7 @@ public void handle(SchedulerEvent event) {
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
rm.drainEvents();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
- checkUnealthyNMCount(rm, nm2, true, 1);
+ checkUnhealthyNMCount(rm, nm2, true, 1);
// unhealthy node changed back to healthy
nm2 = rm.registerNode("host2:5678", 5120);
@@ -1104,7 +1106,7 @@ public void testUnhealthyNMUnregistration() throws Exception {
// node unhealthy
nm1.nodeHeartbeat(false);
- checkUnealthyNMCount(rm, nm1, true, 1);
+ checkUnhealthyNMCount(rm, nm1, true, 1);
UnRegisterNodeManagerRequest request = Records
.newRecord(UnRegisterNodeManagerRequest.class);
request.setNodeId(nm1.getNodeId());
@@ -1119,8 +1121,6 @@ public void testInvalidNMUnregistration() throws Exception {
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
- int shutdownNMsCount = ClusterMetrics.getMetrics()
- .getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
@@ -1145,10 +1145,12 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
+ int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
+ shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
@@ -1164,8 +1166,9 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
- checkShutdownNMCount(rm, shutdownNMsCount);
- checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
+ checkShutdownNMCount(rm, ++shutdownNMsCount);
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
+ rm.stop();
}
@Test(timeout = 30000)
@@ -1300,6 +1303,434 @@ public void testIncorrectRecommission() throws Exception {
rm.stop();
}
+ /**
+ * Remove a node from all lists and check if it's forgotten.
+ */
+ @Test
+ public void testNodeRemovalNormally() throws Exception {
+ testNodeRemovalUtil(false);
+ testNodeRemovalUtilLost(false);
+ testNodeRemovalUtilRebooted(false);
+ testNodeRemovalUtilUnhealthy(false);
+ }
+
+ @Test
+ public void testNodeRemovalGracefully() throws Exception {
+ testNodeRemovalUtil(true);
+ testNodeRemovalUtilLost(true);
+ testNodeRemovalUtilRebooted(true);
+ testNodeRemovalUtilUnhealthy(true);
+ }
+
+ public void refreshNodesOption(boolean doGraceful, Configuration conf)
+ throws Exception {
+ if (doGraceful) {
+ rm.getNodesListManager().refreshNodesGracefully(conf);
+ } else {
+ rm.getNodesListManager().refreshNodes(conf);
+ }
+ }
+
+ public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
+ Configuration conf = new Configuration();
+ int timeoutValue = 500;
+ File excludeHostFile = new File(TEMP_DIR + File.separator +
+ "excludeHostFile.txt");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
+ conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ timeoutValue);
+ CountDownLatch latch = new CountDownLatch(1);
+ rm = new MockRM(conf);
+ rm.init(conf);
+ rm.start();
+ RMContext rmContext = rm.getRMContext();
+ refreshNodesOption(doGraceful, conf);
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+ ClusterMetrics metrics = ClusterMetrics.getMetrics();
+ assert (metrics != null);
+
+ //check all 3 nodes joined in as NORMAL
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm3.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ rm.drainEvents();
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+
+ //Remove nm2 from include list, should now be shutdown with timer test
+ String ip = NetUtils.normalizeHostName("localhost");
+ writeToHostsFile("host1", ip);
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ rm.drainEvents();
+ Assert.assertTrue("Node should not be in active node list",
+ !rmContext.getRMNodes().containsKey(nm2.getNodeId()));
+
+ RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be in inactive node list",
+ rmNode.getState(), NodeState.SHUTDOWN);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+ Assert.assertEquals("Shutdown nodes should be 1",
+ metrics.getNumShutdownNMs(), 1);
+
+ int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ int nodeRemovalInterval =
+ rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+ long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+ rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should have been forgotten!",
+ rmNode, null);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+
+ //Check node removal and re-addition before timer expires
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ rm.drainEvents();
+ writeToHostsFile("host1", ip);
+ refreshNodesOption(doGraceful, conf);
+ rm.drainEvents();
+ rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should be shutdown",
+ rmNode.getState(), NodeState.SHUTDOWN);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+ Assert.assertEquals("Shutdown nodes should be 1",
+ metrics.getNumShutdownNMs(), 1);
+
+ //add back the node before timer expires
+ latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
+ writeToHostsFile("host1", ip, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm2 = rm.registerNode("host2:5678", 10240);
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ rm.drainEvents();
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+
+ //Decommission this node, check timer doesn't remove it
+ writeToHostsFile("host1", "host2", ip);
+ writeToHostsFile(excludeHostFile, "host2");
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
+ .getAbsolutePath());
+ refreshNodesOption(doGraceful, conf);
+ rm.drainEvents();
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ metrics.getNumDecommisionedNMs(), 1);
+ }
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+ if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+ Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
+ metrics.getNumDecommisionedNMs(), 1);
+ }
+
+ //Test decommed/ing node that transitions to untracked,timer should remove
+ writeToHostsFile("host1", ip, "host2");
+ writeToHostsFile(excludeHostFile, "host2");
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ //nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertNotEquals("Timer for this node was not canceled!",
+ rmNode, null);
+ Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
+ (rmNode.getState() == NodeState.DECOMMISSIONED) ||
+ (rmNode.getState() == NodeState.DECOMMISSIONING));
+
+ writeToHostsFile("host1", ip);
+ writeToHostsFile(excludeHostFile, "");
+ refreshNodesOption(doGraceful, conf);
+ latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
+ rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId());
+ Assert.assertEquals("Node should have been forgotten!",
+ rmNode, null);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumDecommisionedNMs(), 0);
+ Assert.assertEquals("Shutdown nodes should be 0 now",
+ metrics.getNumShutdownNMs(), 0);
+ Assert.assertEquals("Active nodes should be 2",
+ metrics.getNumActiveNMs(), 2);
+
+ rm.stop();
+ }
+
+ //Verifies that a node that becomes LOST (misses its liveness expiry) and
+ //is then dropped from the include file is eventually forgotten by the
+ //untracked-node removal timer, and that the lost/active ClusterMetrics
+ //counters are updated. doGraceful selects graceful vs. normal refresh.
+ private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
+ Configuration conf = new Configuration();
+ //Short NM expiry so host2 transitions to LOST within the test's wait loop.
+ conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000);
+ int timeoutValue = 500;
+ File excludeHostFile = new File(TEMP_DIR + File.separator +
+ "excludeHostFile.txt");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ hostFile.getAbsolutePath());
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ excludeHostFile.getAbsolutePath());
+ writeToHostsFile(hostFile, "host1", "localhost", "host2");
+ writeToHostsFile(excludeHostFile, "");
+ //Untracked nodes are removed from the inactive map after this timeout.
+ conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ timeoutValue);
+
+ rm = new MockRM(conf);
+ rm.init(conf);
+ rm.start();
+ RMContext rmContext = rm.getRMContext();
+ refreshNodesOption(doGraceful, conf);
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+ ClusterMetrics metrics = clusterMetrics;
+ assert (metrics != null);
+ rm.drainEvents();
+ //check all 3 nodes joined in as NORMAL
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm3.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ rm.drainEvents();
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+ //Heartbeat only nm1 and nm3 for ~4s (20 x 200ms) so nm2 exceeds the 2s
+ //expiry interval and is marked LOST.
+ int waitCount = 0;
+ while(waitCount ++<20){
+ synchronized (this) {
+ wait(200);
+ }
+ nm3.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(true);
+ }
+ Assert.assertNotEquals("host2 should be a lost NM!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+ Assert.assertEquals("host2 should be a lost NM!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
+ NodeState.LOST);
+ Assert.assertEquals("There should be 1 Lost NM!",
+ clusterMetrics.getNumLostNMs(), 1);
+ Assert.assertEquals("There should be 2 Active NM!",
+ clusterMetrics.getNumActiveNMs(), 2);
+ //Upper bound on how long the removal thread can take to notice the node:
+ //one check interval plus the untracked timeout.
+ int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ int nodeRemovalInterval =
+ rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+ long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+ //Drop host2 from the include file so the LOST node becomes untracked.
+ writeToHostsFile(hostFile, "host1", "localhost");
+ writeToHostsFile(excludeHostFile, "");
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ rm.drainEvents();
+ //Wait (up to 2 removal cycles) for the timer to purge host2 from the
+ //inactive map. NOTE(review): nm2 heartbeating here presumably re-registers
+ //as an unknown node once forgotten — confirm this is intended.
+ waitCount = 0;
+ while(rmContext.getInactiveRMNodes().get(
+ nm2.getNodeId()) != null && waitCount++ < 2){
+ synchronized (this) {
+ wait(maxThreadSleeptime);
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ }
+ }
+ Assert.assertEquals("host2 should have been forgotten!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+ Assert.assertEquals("There should be no Lost NMs!",
+ clusterMetrics.getNumLostNMs(), 0);
+ Assert.assertEquals("There should be 2 Active NM!",
+ clusterMetrics.getNumActiveNMs(), 2);
+ rm.stop();
+ }
+
+ //Verifies that a REBOOTED node that is subsequently dropped from the
+ //include file is forgotten by the untracked-node removal timer, and that
+ //the rebooted/active ClusterMetrics counters are updated.
+ //doGraceful selects graceful vs. normal refreshNodes.
+ private void testNodeRemovalUtilRebooted(boolean doGraceful)
+ throws Exception {
+ Configuration conf = new Configuration();
+ int timeoutValue = 500;
+ File excludeHostFile = new File(TEMP_DIR + File.separator +
+ "excludeHostFile.txt");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ hostFile.getAbsolutePath());
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ excludeHostFile.getAbsolutePath());
+ writeToHostsFile(hostFile, "host1", "localhost", "host2");
+ writeToHostsFile(excludeHostFile, "");
+ //Untracked nodes are removed from the inactive map after this timeout.
+ conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ timeoutValue);
+
+ rm = new MockRM(conf);
+ rm.init(conf);
+ rm.start();
+ RMContext rmContext = rm.getRMContext();
+ refreshNodesOption(doGraceful, conf);
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+ ClusterMetrics metrics = clusterMetrics;
+ assert (metrics != null);
+ //A heartbeat with an out-of-sync responseId (-100) forces nm2 into the
+ //REBOOTED state. Fixed: the empty container-status map argument was
+ //garbled as "new HashMap>()", which does not compile.
+ NodeHeartbeatResponse nodeHeartbeat = nm2.nodeHeartbeat(
+ new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
+ rm.drainEvents();
+ rm.drainEvents();
+
+ Assert.assertNotEquals("host2 should be a rebooted NM!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+ Assert.assertEquals("host2 should be a rebooted NM!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
+ NodeState.REBOOTED);
+ Assert.assertEquals("There should be 1 Rebooted NM!",
+ clusterMetrics.getNumRebootedNMs(), 1);
+ Assert.assertEquals("There should be 2 Active NM!",
+ clusterMetrics.getNumActiveNMs(), 2);
+
+ //Upper bound on how long the removal thread can take to notice the node:
+ //one check interval plus the untracked timeout.
+ int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ int nodeRemovalInterval =
+ rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+ long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+ //Drop host2 from the include file so the REBOOTED node becomes untracked.
+ writeToHostsFile(hostFile, "host1", "localhost");
+ writeToHostsFile(excludeHostFile, "");
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ rm.drainEvents();
+ //Wait (up to 2 removal cycles) for the timer to purge host2.
+ int waitCount = 0;
+ while(rmContext.getInactiveRMNodes().get(
+ nm2.getNodeId()) != null && waitCount++ < 2){
+ synchronized (this) {
+ wait(maxThreadSleeptime);
+ }
+ }
+ Assert.assertEquals("host2 should have been forgotten!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+ Assert.assertEquals("There should be no Rebooted NMs!",
+ clusterMetrics.getNumRebootedNMs(), 0);
+ Assert.assertEquals("There should be 2 Active NM!",
+ clusterMetrics.getNumActiveNMs(), 2);
+ rm.stop();
+ }
+
+ //Verifies that an UNHEALTHY node that is dropped from the include file
+ //transitions to SHUTDOWN and is then forgotten by the untracked-node
+ //removal timer, with the shutdown/unhealthy/active metrics updated.
+ //doGraceful selects graceful vs. normal refreshNodes.
+ private void testNodeRemovalUtilUnhealthy(boolean doGraceful)
+ throws Exception {
+ Configuration conf = new Configuration();
+ int timeoutValue = 500;
+ File excludeHostFile = new File(TEMP_DIR + File.separator +
+ "excludeHostFile.txt");
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+ hostFile.getAbsolutePath());
+ conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+ excludeHostFile.getAbsolutePath());
+ writeToHostsFile(hostFile, "host1", "localhost", "host2");
+ writeToHostsFile(excludeHostFile, "");
+ //Untracked nodes are removed from the inactive map after this timeout.
+ conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ timeoutValue);
+
+ rm = new MockRM(conf);
+ rm.init(conf);
+ rm.start();
+ RMContext rmContext = rm.getRMContext();
+ refreshNodesOption(doGraceful, conf);
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
+ MockNM nm2 = rm.registerNode("host2:5678", 10240);
+ MockNM nm3 = rm.registerNode("localhost:4433", 1024);
+ ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+ ClusterMetrics metrics = clusterMetrics;
+ assert (metrics != null);
+ rm.drainEvents();
+ //check all 3 nodes joined in as NORMAL
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ nodeHeartbeat = nm3.nodeHeartbeat(true);
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
+ rm.drainEvents();
+ Assert.assertEquals("All 3 nodes should be active",
+ metrics.getNumActiveNMs(), 3);
+ //nm2 reports unhealthy; the others stay healthy.
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(false);
+ nm3.nodeHeartbeat(true);
+ checkUnhealthyNMCount(rm, nm2, true, 1);
+ //Drop host2 from the include file; the unhealthy node moves to SHUTDOWN.
+ writeToHostsFile(hostFile, "host1", "localhost");
+ writeToHostsFile(excludeHostFile, "");
+ refreshNodesOption(doGraceful, conf);
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(false);
+ nm3.nodeHeartbeat(true);
+ rm.drainEvents();
+ Assert.assertNotEquals("host2 should be a shutdown NM!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+ Assert.assertEquals("host2 should be a shutdown NM!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
+ NodeState.SHUTDOWN);
+ Assert.assertEquals("There should be 2 Active NM!",
+ clusterMetrics.getNumActiveNMs(), 2);
+ Assert.assertEquals("There should be 1 Shutdown NM!",
+ clusterMetrics.getNumShutdownNMs(), 1);
+ Assert.assertEquals("There should be 0 Unhealthy NM!",
+ clusterMetrics.getUnhealthyNMs(), 0);
+ //Upper bound on how long the removal thread can take to notice the node:
+ //one check interval plus the untracked timeout.
+ int nodeRemovalTimeout =
+ conf.getInt(
+ YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+ YarnConfiguration.
+ DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+ int nodeRemovalInterval =
+ rmContext.getNodesListManager().getNodeRemovalCheckInterval();
+ long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
+ //Wait (up to 2 removal cycles) for the timer to purge host2.
+ int waitCount = 0;
+ while(rmContext.getInactiveRMNodes().get(
+ nm2.getNodeId()) != null && waitCount++ < 2){
+ synchronized (this) {
+ wait(maxThreadSleeptime);
+ }
+ }
+ Assert.assertEquals("host2 should have been forgotten!",
+ rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+ //Fixed: the node was SHUTDOWN, so assert the shutdown counter is back to
+ //zero; the original checked getNumRebootedNMs() against a "Shutdown"
+ //message, which never exercised the intended metric.
+ Assert.assertEquals("There should be no Shutdown NMs!",
+ clusterMetrics.getNumShutdownNMs(), 0);
+ Assert.assertEquals("There should be 2 Active NM!",
+ clusterMetrics.getNumActiveNMs(), 2);
+ rm.stop();
+ }
+
   //Convenience overload: writes the given host names to the default
   //include file (hostFile).
   private void writeToHostsFile(String... hosts) throws IOException {
     writeToHostsFile(hostFile, hosts);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 94bd2534af..50d4e0460e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -292,8 +292,10 @@ public void testNodesQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
- .toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
}
@@ -319,8 +321,10 @@ public void testSingleNodeQueryStateLost() throws JSONException, Exception {
rm.getRMContext().getInactiveRMNodes().get(rmnode2.getNodeID());
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
- WebServicesTestUtils.checkStringMatch("state",
- rmNode.getState().toString(), info.getString("state"));
+ if (rmNode != null) {
+ WebServicesTestUtils.checkStringMatch("state",
+ rmNode.getState().toString(), info.getString("state"));
+ }
}
@Test