From 1cbcd4a491e6a57d466c2897335614dc6770b475 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 5 Apr 2016 13:40:19 +0000 Subject: [PATCH] YARN-4311. Removing nodes from include and exclude lists will not remove them from decommissioned nodes list. Contributed by Kuhu Shukla --- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 9 + .../yarn/sls/scheduler/RMNodeWrapper.java | 9 + .../hadoop/yarn/conf/YarnConfiguration.java | 9 + .../src/main/resources/yarn-default.xml | 13 ++ .../resourcemanager/NodesListManager.java | 104 ++++++++- .../server/resourcemanager/RMServerUtils.java | 2 +- .../ResourceTrackerService.java | 8 +- .../server/resourcemanager/rmnode/RMNode.java | 4 + .../resourcemanager/rmnode/RMNodeImpl.java | 22 +- .../server/resourcemanager/MockNodes.java | 9 + .../TestResourceTrackerService.java | 216 ++++++++++++++++-- .../webapp/TestRMWebServicesNodes.java | 12 +- 12 files changed, 387 insertions(+), 30 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 92d586bfa3..951f5a850d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -199,6 +199,15 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return null; } + + @Override + public long getUntrackedTimeStamp() { + return 0; + } + + @Override + public void setUntrackedTimeStamp(long timeStamp) { + } } public static RMNode newNodeInfo(String rackName, String hostName, diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 2e9cccb277..e5013c43d7 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -188,4 +188,13 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return node.getNodeUtilization(); } + + @Override + public long getUntrackedTimeStamp() { + return 0; + } + + @Override + public void setUntrackedTimeStamp(long timeStamp) { + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8acee579ff..66b293f959 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -647,6 +647,15 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION = "NONE"; + /** + * Timeout(msec) for an untracked node to remain in shutdown or decommissioned + * state. 
+   */
+  public static final String RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC =
+      RM_PREFIX + "node-removal-untracked.timeout-ms";
+  public static final int
+      DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC = 60000;
+
   /**
    * RM proxy users' prefix
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 506cf3d9fc..9e8b5e9b20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2722,4 +2722,17 @@
     <name>yarn.timeline-service.webapp.rest-csrf.methods-to-ignore</name>
     <value>GET,OPTIONS,HEAD</value>
   </property>
+
+  <property>
+    <description>
+    The least amount of time(msec.) an inactive (decommissioned or shutdown) node can
+    stay in the nodes list of the resourcemanager after being declared untracked.
+    A node is marked untracked if and only if it is absent from both include and
+    exclude nodemanager lists on the RM. All inactive nodes are checked twice per
+    timeout interval or every 10 minutes, whichever is lesser, and marked appropriately.
+    The same is done when refreshNodes command (graceful or otherwise) is invoked.
+    </description>
+    <name>yarn.resourcemanager.node-removal-untracked.timeout-ms</name>
+    <value>60000</value>
+  </property>
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index ec2708ebb3..65a9d9498f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -68,6 +69,8 @@ public class NodesListManager extends CompositeService implements
 
   private String excludesFile;
   private Resolver resolver;
+  private Timer removalTimer;
+  private int nodeRemovalCheckInterval;
 
   public NodesListManager(RMContext rmContext) {
     super(NodesListManager.class.getName());
@@ -105,9 +108,56 @@ protected void serviceInit(Configuration conf) throws Exception {
     } catch (IOException ioe) {
       disableHostsFileReader(ioe);
     }
+
+    final int nodeRemovalTimeout =
+        conf.getInt(
+            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
+            YarnConfiguration.
+                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
+    nodeRemovalCheckInterval = (Math.min(nodeRemovalTimeout/2,
+        600000));
+    removalTimer = new Timer("Node Removal Timer");
+
+    removalTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        long now = Time.monotonicNow();
+        for (Map.Entry<NodeId, RMNode> entry :
+            rmContext.getInactiveRMNodes().entrySet()) {
+          NodeId nodeId = entry.getKey();
+          RMNode rmNode = entry.getValue();
+          if (isUntrackedNode(rmNode.getHostName())) {
+            if (rmNode.getUntrackedTimeStamp() == 0) {
+              rmNode.setUntrackedTimeStamp(now);
+            } else if (now - rmNode.getUntrackedTimeStamp() >
+                nodeRemovalTimeout) {
+              RMNode result = rmContext.getInactiveRMNodes().remove(nodeId);
+              if (result != null) {
+                ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
+                if (rmNode.getState() == NodeState.SHUTDOWN) {
+                  clusterMetrics.decrNumShutdownNMs();
+                } else {
+                  clusterMetrics.decrDecommisionedNMs();
+                }
+                LOG.info("Removed "+result.getHostName() +
+                    " from inactive nodes list");
+              }
+            }
+          } else {
+            rmNode.setUntrackedTimeStamp(0);
+          }
+        }
+      }
+    }, nodeRemovalCheckInterval, nodeRemovalCheckInterval);
+
     super.serviceInit(conf);
   }
 
+  @Override
+  public void serviceStop() {
+    removalTimer.cancel();
+  }
+
   private void printConfiguredHosts() {
     if (!LOG.isDebugEnabled()) {
       return;
@@ -131,10 +181,13 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
 
     for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
       if (!isValidNode(nodeId.getHost())) {
+        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+            new RMNodeEvent(nodeId, nodeEventType));
       }
     }
+    updateInactiveNodes();
   }
 
   private void refreshHostsReader(Configuration yarnConf) throws IOException,
@@ -171,6 +224,16 @@ private void setDecomissionedNMs() {
     }
   }
 
+  @VisibleForTesting
+  public int getNodeRemovalCheckInterval() {
+    return nodeRemovalCheckInterval;
+  }
+
+  @VisibleForTesting
+  public void setNodeRemovalCheckInterval(int interval) {
+    this.nodeRemovalCheckInterval = interval;
+  }
+
   @VisibleForTesting
   public Resolver getResolver() {
     return resolver;
@@ -374,6 +437,33 @@ private HostsFileReader createHostsFileReader(String includesFile,
     return hostsReader;
   }
 
+  private void updateInactiveNodes() {
+    long now = Time.monotonicNow();
+    for(Entry<NodeId, RMNode> entry :
+        rmContext.getInactiveRMNodes().entrySet()) {
+      NodeId nodeId = entry.getKey();
+      RMNode rmNode = entry.getValue();
+      if (isUntrackedNode(nodeId.getHost()) &&
+          rmNode.getUntrackedTimeStamp() == 0) {
+        rmNode.setUntrackedTimeStamp(now);
+      }
+    }
+  }
+
+  public boolean isUntrackedNode(String hostName) {
+    boolean untracked;
+    String ip = resolver.resolve(hostName);
+
+    synchronized (hostsReader) {
+      Set<String> hostsList = hostsReader.getHosts();
+      Set<String> excludeList = hostsReader.getExcludedHosts();
+      untracked = !hostsList.isEmpty() &&
+          !hostsList.contains(hostName) && !hostsList.contains(ip) &&
+          !excludeList.contains(hostName) && !excludeList.contains(ip);
+    }
+    return untracked;
+  }
+
   /**
    * Refresh the nodes gracefully
    *
@@ -384,11 +474,13 @@ private HostsFileReader createHostsFileReader(String includesFile,
   public void refreshNodesGracefully(Configuration conf) throws IOException,
       YarnException {
     refreshHostsReader(conf);
-    for (Entry<NodeId, RMNode> entry:rmContext.getRMNodes().entrySet()) {
+    for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
       NodeId nodeId = entry.getKey();
       if (!isValidNode(nodeId.getHost())) {
+        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION));
+            new RMNodeEvent(nodeId, nodeEventType));
       } else {
         // Recommissioning the nodes
         if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
@@ -397,6 +489,7 @@ public void refreshNodesGracefully(Configuration conf) throws IOException,
         }
       }
     }
+    updateInactiveNodes();
   }
 
   /**
@@ -420,8 +513,11 @@ public Set<NodeId> checkForDecommissioningNodes() {
   public void refreshNodesForcefully() {
     for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
       if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
+        RMNodeEventType nodeEventType =
+            isUntrackedNode(entry.getKey().getHost()) ?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(entry.getKey(), RMNodeEventType.DECOMMISSION));
+            new RMNodeEvent(entry.getKey(), nodeEventType));
       }
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e19d55ee81..1318d5814b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -87,7 +87,7 @@ public static List<RMNode> queryRMNodes(RMContext context,
         acceptedStates.contains(NodeState.LOST) ||
         acceptedStates.contains(NodeState.REBOOTED)) {
       for (RMNode rmNode : context.getInactiveRMNodes().values()) {
-        if (acceptedStates.contains(rmNode.getState())) {
+        if ((rmNode != null) && acceptedStates.contains(rmNode.getState())) {
           results.add(rmNode);
         }
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index b0bc565e6c..238e5bcf1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -320,7 +320,8 @@ public RegisterNodeManagerResponse registerNodeManager(
     }
 
     // Check if this node is a 'valid' node
-    if (!this.nodesListManager.isValidNode(host)) {
+    if (!this.nodesListManager.isValidNode(host) ||
+        this.nodesListManager.isUntrackedNode(host)) {
       String message =
           "Disallowed NodeManager from " + host
               + ", Sending SHUTDOWN signal to the NodeManager.";
@@ -451,8 +452,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
 
     // 1. Check if it's a valid (i.e. not excluded) node, if not, see if it is
     // in decommissioning.
-    if (!this.nodesListManager.isValidNode(nodeId.getHost())
-        && !isNodeInDecommissioning(nodeId)) {
+    if ((!this.nodesListManager.isValidNode(nodeId.getHost()) &&
+        !isNodeInDecommissioning(nodeId)) ||
+        this.nodesListManager.isUntrackedNode(nodeId.getHost())) {
       String message =
           "Disallowed NodeManager nodeId: " + nodeId + " hostname: "
               + nodeId.getHost();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f16ef..e599576592 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -168,4 +168,8 @@ public void updateNodeHeartbeatResponseForContainersDecreasing(
       NodeHeartbeatResponse response);
 
   public List<Container> pullNewlyIncreasedContainers();
+
+  long getUntrackedTimeStamp();
+
+  void setUntrackedTimeStamp(long timer);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 5f8317e890..42608613ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -120,6 +121,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private long lastHealthReportTime;
   private String nodeManagerVersion;
 
+  private long timeStamp;
   /* Aggregated resource utilization for the containers. */
   private ResourceUtilization containersUtilization;
   /* Resource utilization for the node. */
@@ -259,6 +261,9 @@ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
+     .addTransition(NodeState.DECOMMISSIONING, NodeState.SHUTDOWN,
+         RMNodeEventType.SHUTDOWN,
+         new DeactivateNodeTransition(NodeState.SHUTDOWN))
 
      // TODO (in YARN-3223) update resource when container finished.
.addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING, @@ -346,6 +351,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.healthReport = "Healthy"; this.lastHealthReportTime = System.currentTimeMillis(); this.nodeManagerVersion = nodeManagerVersion; + this.timeStamp = 0; this.latestNodeHeartBeatResponse.setResponseId(0); @@ -1011,7 +1017,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } /** - * Put a node in deactivated (decommissioned) status. + * Put a node in deactivated (decommissioned or shutdown) status. * @param rmNode * @param finalState */ @@ -1028,6 +1034,10 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) { LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " + finalState); rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode); + if (finalState == NodeState.SHUTDOWN && + rmNode.context.getNodesListManager().isUntrackedNode(rmNode.hostName)) { + rmNode.setUntrackedTimeStamp(Time.monotonicNow()); + } } /** @@ -1383,4 +1393,14 @@ public List pullNewlyIncreasedContainers() { public Resource getOriginalTotalCapability() { return this.originalTotalCapability; } + + @Override + public long getUntrackedTimeStamp() { + return this.timeStamp; + } + + @Override + public void setUntrackedTimeStamp(long ts) { + this.timeStamp = ts; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29b83..921b18eee9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -260,6 +260,15 @@ public ResourceUtilization getAggregatedContainersUtilization() { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + @Override + public long getUntrackedTimeStamp() { + return 0; + } + + @Override + public void setUntrackedTimeStamp(long timeStamp) { + } }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 9ed79a31c2..dd37b67fdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -31,6 +31,8 @@ import java.util.HashMap; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; @@ -48,8 +50,6 @@ import org.apache.hadoop.yarn.api.records.Priority; 
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; @@ -141,12 +141,12 @@ public void testDecommissionWithIncludeHosts() throws Exception { rm.getNodesListManager().refreshNodes(conf); - checkDecommissionedNMCount(rm, ++metricCount); + checkShutdownNMCount(rm, ++metricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert - .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs()); + .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs()); nodeHeartbeat = nm2.nodeHeartbeat(true); Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN @@ -155,7 +155,8 @@ public void testDecommissionWithIncludeHosts() throws Exception { nodeHeartbeat = nm3.nodeHeartbeat(true); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertEquals(metricCount, ClusterMetrics.getMetrics() - .getNumDecommisionedNMs()); + .getNumShutdownNMs()); + rm.stop(); } /** @@ -228,7 +229,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception { MockNM nm2 = rm.registerNode("host2:5678", 10240); ClusterMetrics metrics = ClusterMetrics.getMetrics(); assert(metrics != null); - int initialMetricCount = metrics.getNumDecommisionedNMs(); + int initialMetricCount = metrics.getNumShutdownNMs(); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( NodeAction.NORMAL, @@ -241,16 +242,16 @@ public void testAddNewIncludePathToConfiguration() throws Exception { conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile .getAbsolutePath()); rm.getNodesListManager().refreshNodes(conf); - checkDecommissionedNMCount(rm, ++initialMetricCount); + checkShutdownNMCount(rm, ++initialMetricCount); nodeHeartbeat = nm1.nodeHeartbeat(true); Assert.assertEquals( - "Node should not have been decomissioned.", + "Node should not have been shutdown.", NodeAction.NORMAL, nodeHeartbeat.getNodeAction()); - nodeHeartbeat = nm2.nodeHeartbeat(true); - Assert.assertEquals("Node should have been decomissioned but is in state" + - nodeHeartbeat.getNodeAction(), - NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction()); + NodeState nodeState = + rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState(); + Assert.assertEquals("Node should have been shutdown but is in state" + + nodeState, NodeState.SHUTDOWN, nodeState); } /** @@ -1123,8 +1124,6 @@ public void testInvalidNMUnregistration() throws Exception { rm.start(); ResourceTrackerService resourceTrackerService = rm .getResourceTrackerService(); - int shutdownNMsCount = ClusterMetrics.getMetrics() - .getNumShutdownNMs(); int decommisionedNMsCount = ClusterMetrics.getMetrics() .getNumDecommisionedNMs(); @@ -1149,10 +1148,12 @@ public void testInvalidNMUnregistration() throws Exception { rm.getNodesListManager().refreshNodes(conf); NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true); Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction()); + int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); checkShutdownNMCount(rm, shutdownNMsCount); - checkDecommissionedNMCount(rm, ++decommisionedNMsCount); + checkDecommissionedNMCount(rm, decommisionedNMsCount); 
request.setNodeId(nm1.getNodeId()); resourceTrackerService.unRegisterNodeManager(request); + shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs(); checkShutdownNMCount(rm, shutdownNMsCount); checkDecommissionedNMCount(rm, decommisionedNMsCount); @@ -1168,8 +1169,9 @@ public void testInvalidNMUnregistration() throws Exception { rm.getNodesListManager().refreshNodes(conf); request.setNodeId(nm2.getNodeId()); resourceTrackerService.unRegisterNodeManager(request); - checkShutdownNMCount(rm, shutdownNMsCount); - checkDecommissionedNMCount(rm, ++decommisionedNMsCount); + checkShutdownNMCount(rm, ++shutdownNMsCount); + checkDecommissionedNMCount(rm, decommisionedNMsCount); + rm.stop(); } @Test(timeout = 30000) @@ -1304,6 +1306,186 @@ public void testIncorrectRecommission() throws Exception { rm.stop(); } + /** + * Remove a node from all lists and check if its forgotten + */ + @Test + public void testNodeRemovalNormally() throws Exception { + testNodeRemovalUtil(false); + } + + @Test + public void testNodeRemovalGracefully() throws Exception { + testNodeRemovalUtil(true); + } + + public void refreshNodesOption(boolean doGraceful, Configuration conf) + throws Exception { + if (doGraceful) { + rm.getNodesListManager().refreshNodesGracefully(conf); + } else { + rm.getNodesListManager().refreshNodes(conf); + } + } + + public void testNodeRemovalUtil(boolean doGraceful) throws Exception { + Configuration conf = new Configuration(); + int timeoutValue = 500; + File excludeHostFile = new File(TEMP_DIR + File.separator + + "excludeHostFile.txt"); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, ""); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, ""); + conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + timeoutValue); + CountDownLatch latch = new CountDownLatch(1); + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + RMContext rmContext = rm.getRMContext(); + refreshNodesOption(doGraceful, conf); + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 10240); + MockNM nm3 = rm.registerNode("localhost:4433", 1024); + ClusterMetrics metrics = ClusterMetrics.getMetrics(); + assert (metrics != null); + + //check all 3 nodes joined in as NORMAL + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm2.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + nodeHeartbeat = nm3.nodeHeartbeat(true); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + rm.drainEvents(); + Assert.assertEquals("All 3 nodes should be active", + metrics.getNumActiveNMs(), 3); + + //Remove nm2 from include list, should now be shutdown with timer test + String ip = NetUtils.normalizeHostName("localhost"); + writeToHostsFile("host1", ip); + conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + refreshNodesOption(doGraceful, conf); + nm1.nodeHeartbeat(true); + rm.drainEvents(); + Assert.assertTrue("Node should not be in active node list", + !rmContext.getRMNodes().containsKey(nm2.getNodeId())); + + RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should be in inactive node list", + rmNode.getState(), NodeState.SHUTDOWN); + Assert.assertEquals("Active nodes should be 2", + metrics.getNumActiveNMs(), 2); + Assert.assertEquals("Shutdown nodes should be 1", + 
metrics.getNumShutdownNMs(), 1); + + int nodeRemovalTimeout = + conf.getInt( + YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, + YarnConfiguration. + DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC); + int nodeRemovalInterval = + rmContext.getNodesListManager().getNodeRemovalCheckInterval(); + long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout; + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + + rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should have been forgotten!", + rmNode, null); + Assert.assertEquals("Shutdown nodes should be 0 now", + metrics.getNumShutdownNMs(), 0); + + //Check node removal and re-addition before timer expires + writeToHostsFile("host1", ip, "host2"); + refreshNodesOption(doGraceful, conf); + nm2 = rm.registerNode("host2:5678", 10240); + rm.drainEvents(); + writeToHostsFile("host1", ip); + refreshNodesOption(doGraceful, conf); + rm.drainEvents(); + rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should be shutdown", + rmNode.getState(), NodeState.SHUTDOWN); + Assert.assertEquals("Active nodes should be 2", + metrics.getNumActiveNMs(), 2); + Assert.assertEquals("Shutdown nodes should be 1", + metrics.getNumShutdownNMs(), 1); + + //add back the node before timer expires + latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS); + writeToHostsFile("host1", ip, "host2"); + refreshNodesOption(doGraceful, conf); + nm2 = rm.registerNode("host2:5678", 10240); + nodeHeartbeat = nm2.nodeHeartbeat(true); + rm.drainEvents(); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); + Assert.assertEquals("Shutdown nodes should be 0 now", + metrics.getNumShutdownNMs(), 0); + Assert.assertEquals("All 3 nodes should be active", + metrics.getNumActiveNMs(), 3); + + //Decommission this node, check timer doesn't remove it + writeToHostsFile("host1", "host2", ip); + writeToHostsFile(excludeHostFile, "host2"); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile + .getAbsolutePath()); + refreshNodesOption(doGraceful, conf); + rm.drainEvents(); + rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", + (rmNode.getState() == NodeState.DECOMMISSIONED) || + (rmNode.getState() == NodeState.DECOMMISSIONING)); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + Assert.assertEquals("Decommissioned/ing nodes should be 1 now", + metrics.getNumDecommisionedNMs(), 1); + } + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + + rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", + (rmNode.getState() == NodeState.DECOMMISSIONED) || + (rmNode.getState() == NodeState.DECOMMISSIONING)); + if (rmNode.getState() == NodeState.DECOMMISSIONED) { + Assert.assertEquals("Decommissioned/ing nodes should be 1 now", + metrics.getNumDecommisionedNMs(), 1); + } + + //Test decommed/ing node that transitions to untracked,timer should remove + writeToHostsFile("host1", ip, "host2"); + writeToHostsFile(excludeHostFile, "host2"); + refreshNodesOption(doGraceful, conf); + nm1.nodeHeartbeat(true); + //nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + rmNode = doGraceful ? 
rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertNotEquals("Timer for this node was not canceled!", + rmNode, null); + Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING", + (rmNode.getState() == NodeState.DECOMMISSIONED) || + (rmNode.getState() == NodeState.DECOMMISSIONING)); + + writeToHostsFile("host1", ip); + writeToHostsFile(excludeHostFile, ""); + refreshNodesOption(doGraceful, conf); + latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS); + rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) : + rmContext.getInactiveRMNodes().get(nm2.getNodeId()); + Assert.assertEquals("Node should have been forgotten!", + rmNode, null); + Assert.assertEquals("Shutdown nodes should be 0 now", + metrics.getNumDecommisionedNMs(), 0); + Assert.assertEquals("Shutdown nodes should be 0 now", + metrics.getNumShutdownNMs(), 0); + Assert.assertEquals("Active nodes should be 2", + metrics.getNumActiveNMs(), 2); + + rm.stop(); + } + private void writeToHostsFile(String... hosts) throws IOException { writeToHostsFile(hostFile, hosts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index 3fd1fd5f6f..4b6ca1244e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -272,8 +272,10 @@ public void testNodesQueryStateLost() throws JSONException, Exception { RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId); WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", info.getString("nodeHTTPAddress")); - WebServicesTestUtils.checkStringMatch("state", rmNode.getState() - .toString(), info.getString("state")); + if (rmNode != null) { + WebServicesTestUtils.checkStringMatch("state", + rmNode.getState().toString(), info.getString("state")); + } } } @@ -304,8 +306,10 @@ public void testSingleNodeQueryStateLost() throws JSONException, Exception { RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId); WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "", info.getString("nodeHTTPAddress")); - WebServicesTestUtils.checkStringMatch("state", - rmNode.getState().toString(), info.getString("state")); + if (rmNode != null) { + WebServicesTestUtils.checkStringMatch("state", + rmNode.getState().toString(), info.getString("state")); + } } @Test
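
[Reviewer note, not part of the patch] A minimal sketch of how the new knob introduced above is exercised, mirroring the testNodeRemovalUtil() setup. The class name, the 500 ms timeout and the use of the MockRM test helper are illustrative only; on a real cluster the property is set in yarn-site.xml as yarn.resourcemanager.node-removal-untracked.timeout-ms instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;

public class UntrackedNodeTimeoutSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // A node becomes "untracked" when it appears in neither the include nor
    // the exclude file; after this timeout the NodesListManager timer drops it
    // from the inactive (decommissioned/shutdown) nodes map.
    conf.setInt(
        YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, 500);
    MockRM rm = new MockRM(conf);
    rm.start();
    // ... register nodes, edit the include/exclude files and call
    // refreshNodes()/refreshNodesGracefully() as the tests above do ...
    rm.stop();
  }
}

Note that removal is not immediate when the timeout elapses: the untracked timestamp is set either by refreshNodes or on the next timer tick, and the node is dropped on the first tick after the timeout has passed, so with the check interval of min(timeout/2, 10 minutes) a node can remain in the inactive list for up to roughly twice the configured timeout.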