YARN-4311. Removing nodes from include and exclude lists will not remove them from decommissioned nodes list. Contributed by Kuhu Shukla

This commit is contained in:
Jason Lowe 2016-04-05 13:40:19 +00:00
parent 776b549e2a
commit 1cbcd4a491
12 changed files with 387 additions and 30 deletions

View File

@ -199,6 +199,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return null;
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
}
public static RMNode newNodeInfo(String rackName, String hostName,

View File

@ -188,4 +188,13 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return node.getNodeUtilization();
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
}

View File

@ -647,6 +647,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
"NONE";
/**
* Timeout(msec) for an untracked node to remain in shutdown or decommissioned
* state.
*/
public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
RM_PREFIX + "node-removal-untracked.timeout-ms";
public static final int
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
/**
* RM proxy users' prefix
*/

View File

@ -2722,4 +2722,17 @@
<name>yarn.timeline-service.webapp.rest-csrf.methods-to-ignore</name>
<value>GET,OPTIONS,HEAD</value>
</property>
<property>
<description>
The least amount of time(msec.) an inactive (decommissioned or shutdown) node can
stay in the nodes list of the resourcemanager after being declared untracked.
A node is marked untracked if and only if it is absent from both include and
exclude nodemanager lists on the RM. All inactive nodes are checked twice per
timeout interval or every 10 minutes, whichever is lesser, and marked appropriately.
The same is done when refreshNodes command (graceful or otherwise) is invoked.
</description>
<name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
<value>60000</value>
</property>
</configuration>

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService implements
private String excludesFile;
private Resolver resolver;
private Timer removalTimer;
private int nodeRemovalCheckInterval;
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
@ -105,9 +108,56 @@ protected void serviceInit(Configuration conf) throws Exception {
} catch (IOException ioe) {
disableHostsFileReader(ioe);
}
final int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
600000));
removalTimer = new Timer("Node Removal Timer");
removalTimer.schedule(new TimerTask() {
@Override
public void run() {
long now = Time.monotonicNow();
for (Map.Entry<NodeId, RMNode> entry :
rmContext.getInactiveRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(rmNode.getHostName())) {
if (rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
} else if (now - rmNode.getUntrackedTimeStamp() >
nodeRemovalTimeout) {
RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
if (result != null) {
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
if (rmNode.getState() == NodeState.SHUTDOWN) {
clusterMetrics.decrNumShutdownNMs();
} else {
clusterMetrics.decrDecommisionedNMs();
}
LOG.info("Removed "+result.getHostName() +
" from inactive nodes list");
}
}
} else {
rmNode.setUntrackedTimeStamp(0);
}
}
}
}, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
super.serviceInit(conf);
}
@Override
public void serviceStop() {
removalTimer.cancel();
}
private void printConfiguredHosts() {
if (!LOG.isDebugEnabled()) {
return;
@ -131,10 +181,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
new RMNodeEvent(nodeId, nodeEventType));
}
}
updateInactiveNodes();
}
private void refreshHostsReader(Configuration yarnConf) throws IOException,
@ -171,6 +224,16 @@ private void setDecomissionedNMs() {
}
}
@VisibleForTesting
public int getNodeRemovalCheckInterval() {
return nodeRemovalCheckInterval;
}
@VisibleForTesting
public void setNodeRemovalCheckInterval(int interval) {
this.nodeRemovalCheckInterval = interval;
}
@VisibleForTesting
public Resolver getResolver() {
return resolver;
@ -374,6 +437,33 @@ private HostsFileReader createHostsFileReader(String includesFile,
return hostsReader;
}
private void updateInactiveNodes() {
long now = Time.monotonicNow();
for(Entry<NodeId, RMNode> entry :
rmContext.getInactiveRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
RMNode rmNode = entry.getValue();
if (isUntrackedNode(nodeId.getHost()) &&
rmNode.getUntrackedTimeStamp() == 0) {
rmNode.setUntrackedTimeStamp(now);
}
}
}
public boolean isUntrackedNode(String hostName) {
boolean untracked;
String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
untracked = !hostsList.isEmpty() &&
!hostsList.contains(hostName) && !hostsList.contains(ip) &&
!excludeList.contains(hostName) && !excludeList.contains(ip);
}
return untracked;
}
/**
* Refresh the nodes gracefully
*
@ -384,11 +474,13 @@ private HostsFileReader createHostsFileReader(String includesFile,
public void refreshNodesGracefully(Configuration conf) throws IOException,
YarnException {
refreshHostsReader(conf);
for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
NodeId nodeId = entry.getKey();
if (!isValidNode(nodeId.getHost())) {
RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
new RMNodeEvent(nodeId, nodeEventType));
} else {
// Recommissioning the nodes
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@ -397,6 +489,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
}
}
}
updateInactiveNodes();
}
/**
@ -420,8 +513,11 @@ public Set<NodeId> checkForDecommissioningNodes() {
public void refreshNodesForcefully() {
for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
RMNodeEventType nodeEventType =
isUntrackedNode(entry.getKey().getHost()) ?
RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
new RMNodeEvent(entry.getKey(), nodeEventType));
}
}
}

View File

@ -87,7 +87,7 @@ public static List<RMNode> queryRMNodes(RMContext context,
acceptedStates.contains(NodeState.LOST) ||
acceptedStates.contains(NodeState.REBOOTED)) {
for (RMNode rmNode : context.getInactiveRMNodes().values()) {
if (acceptedStates.contains(rmNode.getState())) {
if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
results.add(rmNode);
}
}

View File

@ -320,7 +320,8 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Check if this node is a 'valid' node
if (!this.nodesListManager.isValidNode(host)) {
if (!this.nodesListManager.isValidNode(host) ||
this.nodesListManager.isUntrackedNode(host)) {
String message =
"Disallowed NodeManager from " + host
+ ", Sending SHUTDOWN signal to the NodeManager.";
@ -451,8 +452,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
// in decommissioning.
if (!this.nodesListManager.isValidNode(nodeId.getHost())
&& !isNodeInDecommissioning(nodeId)) {
if ((!this.nodesListManager.isValidNode(nodeId.getHost()) &&
!isNodeInDecommissioning(nodeId)) ||
this.nodesListManager.isUntrackedNode(nodeId.getHost())) {
String message =
"Disallowed NodeManager nodeId: " + nodeId + " hostname: "
+ nodeId.getHost();

View File

@ -168,4 +168,8 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers();
long getUntrackedTimeStamp();
void setUntrackedTimeStamp(long timer);
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -120,6 +121,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private long lastHealthReportTime;
private String nodeManagerVersion;
private long timeStamp;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
@ -259,6 +261,9 @@ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
// TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
@ -346,6 +351,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0;
this.latestNodeHeartBeatResponse.setResponseId(0);
@ -1011,7 +1017,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
/**
* Put a node in deactivated (decommissioned) status.
* Put a node in deactivated (decommissioned or shutdown) status.
* @param rmNode
* @param finalState
*/
@ -1028,6 +1034,10 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) {
LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
+ finalState);
rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
if (finalState == NodeState.SHUTDOWN &&
rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) {
rmNode.setUntrackedTimeStamp(Time.monotonicNow());
}
}
/**
@ -1383,4 +1393,14 @@ public List<Container> pullNewlyIncreasedContainers() {
public Resource getOriginalTotalCapability() {
return this.originalTotalCapability;
}
@Override
public long getUntrackedTimeStamp() {
return this.timeStamp;
}
@Override
public void setUntrackedTimeStamp(long ts) {
this.timeStamp = ts;
}
}

View File

@ -260,6 +260,15 @@ public ResourceUtilization getAggregatedContainersUtilization() {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
@Override
public long getUntrackedTimeStamp() {
return 0;
}
@Override
public void setUntrackedTimeStamp(long timeStamp) {
}
};
private static RMNode buildRMNode(int rack, final Resource perNode,

View File

@ -31,6 +31,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
@ -48,8 +50,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
@ -141,12 +141,12 @@ public void testDecommissionWithIncludeHosts() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++metricCount);
checkShutdownNMCount(rm, ++metricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert
.assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
.assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
@ -155,7 +155,8 @@ public void testDecommissionWithIncludeHosts() throws Exception {
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
.getNumDecommisionedNMs());
.getNumShutdownNMs());
rm.stop();
}
/**
@ -228,7 +229,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
MockNM nm2 = rm.registerNode("host2:5678", 10240);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null);
int initialMetricCount = metrics.getNumDecommisionedNMs();
int initialMetricCount = metrics.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
NodeAction.NORMAL,
@ -241,16 +242,16 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
checkDecommissionedNMCount(rm, ++initialMetricCount);
checkShutdownNMCount(rm, ++initialMetricCount);
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals(
"Node should not have been decomissioned.",
"Node should not have been shutdown.",
NodeAction.NORMAL,
nodeHeartbeat.getNodeAction());
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertEquals("Node should have been decomissioned but is in state" +
nodeHeartbeat.getNodeAction(),
NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
NodeState nodeState =
rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
Assert.assertEquals("Node should have been shutdown but is in state" +
nodeState, NodeState.SHUTDOWN, nodeState);
}
/**
@ -1123,8 +1124,6 @@ public void testInvalidNMUnregistration() throws Exception {
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
int shutdownNMsCount = ClusterMetrics.getMetrics()
.getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
@ -1149,10 +1148,12 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
@ -1168,8 +1169,9 @@ public void testInvalidNMUnregistration() throws Exception {
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
checkShutdownNMCount(rm, ++shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
rm.stop();
}
@Test(timeout = 30000)
@ -1304,6 +1306,186 @@ public void testIncorrectRecommission() throws Exception {
rm.stop();
}
/**
* Remove a node from all lists and check if its forgotten
*/
@Test
public void testNodeRemovalNormally() throws Exception {
testNodeRemovalUtil(false);
}
@Test
public void testNodeRemovalGracefully() throws Exception {
testNodeRemovalUtil(true);
}
public void refreshNodesOption(boolean doGraceful, Configuration conf)
throws Exception {
if (doGraceful) {
rm.getNodesListManager().refreshNodesGracefully(conf);
} else {
rm.getNodesListManager().refreshNodes(conf);
}
}
public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
Configuration conf = new Configuration();
int timeoutValue = 500;
File excludeHostFile = new File(TEMP_DIR + File.separator +
"excludeHostFile.txt");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
timeoutValue);
CountDownLatch latch = new CountDownLatch(1);
rm = new MockRM(conf);
rm.init(conf);
rm.start();
RMContext rmContext = rm.getRMContext();
refreshNodesOption(doGraceful, conf);
MockNM nm1 = rm.registerNode("host1:1234", 5120);
MockNM nm2 = rm.registerNode("host2:5678", 10240);
MockNM nm3 = rm.registerNode("localhost:4433", 1024);
ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert (metrics != null);
//check all 3 nodes joined in as NORMAL
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm3.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
rm.drainEvents();
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Remove nm2 from include list, should now be shutdown with timer test
String ip = NetUtils.normalizeHostName("localhost");
writeToHostsFile("host1", ip);
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue("Node should not be in active node list",
!rmContext.getRMNodes().containsKey(nm2.getNodeId()));
RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be in inactive node list",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
int nodeRemovalTimeout =
conf.getInt(
YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
YarnConfiguration.
DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
int nodeRemovalInterval =
rmContext.getNodesListManager().getNodeRemovalCheckInterval();
long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
//Check node removal and re-addition before timer expires
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
rm.drainEvents();
writeToHostsFile("host1", ip);
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should be shutdown",
rmNode.getState(), NodeState.SHUTDOWN);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
Assert.assertEquals("Shutdown nodes should be 1",
metrics.getNumShutdownNMs(), 1);
//add back the node before timer expires
latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
writeToHostsFile("host1", ip, "host2");
refreshNodesOption(doGraceful, conf);
nm2 = rm.registerNode("host2:5678", 10240);
nodeHeartbeat = nm2.nodeHeartbeat(true);
rm.drainEvents();
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("All 3 nodes should be active",
metrics.getNumActiveNMs(), 3);
//Decommission this node, check timer doesn't remove it
writeToHostsFile("host1", "host2", ip);
writeToHostsFile(excludeHostFile, "host2");
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
.getAbsolutePath());
refreshNodesOption(doGraceful, conf);
rm.drainEvents();
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
if (rmNode.getState() == NodeState.DECOMMISSIONED) {
Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
metrics.getNumDecommisionedNMs(), 1);
}
//Test decommed/ing node that transitions to untracked,timer should remove
writeToHostsFile("host1", ip, "host2");
writeToHostsFile(excludeHostFile, "host2");
refreshNodesOption(doGraceful, conf);
nm1.nodeHeartbeat(true);
//nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertNotEquals("Timer for this node was not canceled!",
rmNode, null);
Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
(rmNode.getState() == NodeState.DECOMMISSIONED) ||
(rmNode.getState() == NodeState.DECOMMISSIONING));
writeToHostsFile("host1", ip);
writeToHostsFile(excludeHostFile, "");
refreshNodesOption(doGraceful, conf);
latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
rmContext.getInactiveRMNodes().get(nm2.getNodeId());
Assert.assertEquals("Node should have been forgotten!",
rmNode, null);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumDecommisionedNMs(), 0);
Assert.assertEquals("Shutdown nodes should be 0 now",
metrics.getNumShutdownNMs(), 0);
Assert.assertEquals("Active nodes should be 2",
metrics.getNumActiveNMs(), 2);
rm.stop();
}
private void writeToHostsFile(String... hosts) throws IOException {
writeToHostsFile(hostFile, hosts);
}

View File

@ -272,8 +272,10 @@ public void testNodesQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
.toString(), info.getString("state"));
if (rmNode != null) {
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
}
}
@ -304,8 +306,10 @@ public void testSingleNodeQueryStateLost() throws JSONException, Exception {
RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
info.getString("nodeHTTPAddress"));
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
if (rmNode != null) {
WebServicesTestUtils.checkStringMatch("state",
rmNode.getState().toString(), info.getString("state"));
}
}
@Test