diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 6070d02217..5e5ff2a95e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -37,6 +37,7 @@ import org.apache.commons.collections.keyvalue.DefaultMapEntry; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -1354,8 +1355,23 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { initialState.equals(NodeState.DECOMMISSIONING); if (isNodeDecommissioning) { List keepAliveApps = statusEvent.getKeepAliveAppIds(); + // hasScheduledAMContainers solves the following race condition - + // 1. launch AM container on a node with 0 containers. + // 2. gracefully decommission this node. + // 3. Node heartbeats to RM. In StatusUpdateWhenHealthyTransition, + // rmNode.runningApplications will be empty as it is updated after + // call to RMNodeImpl.deactivateNode. This will cause the node to be + // deactivated even though container is running on it and hence kill + // all containers running on it. + // In order to avoid such race conditions the ground truth is retrieved + // from the scheduler before deactivating a DECOMMISSIONING node. + // Only AM containers are considered as AM container reattempts can + // cause application failures if max attempts is set to 1. if (rmNode.runningApplications.isEmpty() && - (keepAliveApps == null || keepAliveApps.isEmpty())) { + (keepAliveApps == null || keepAliveApps.isEmpty()) && + !hasScheduledAMContainers(rmNode)) { + LOG.info("No containers running on " + rmNode.nodeId + ". " + + "Attempting to deactivate decommissioning node."); RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED); return NodeState.DECOMMISSIONED; } @@ -1401,6 +1417,17 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return initialState; } + + /** + * Checks if the scheduler has scheduled any AMs on the given node. + * @return true if node has any AM scheduled on it. + */ + private boolean hasScheduledAMContainers(RMNodeImpl rmNode) { + return rmNode.context.getScheduler() + .getSchedulerNode(rmNode.getNodeID()) + .getCopiedListOfRunningContainers() + .stream().anyMatch(RMContainer::isAMContainer); + } } public static class StatusUpdateWhenUnHealthyTransition implements diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index dad27839cf..3346b57d98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -81,6 +83,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -125,7 +128,7 @@ public void setUp() throws Exception { rmContext = new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class), null, null, mock(DelegationTokenRenewer.class), null, null, null, - null, null); + null, getMockResourceScheduler()); NodesListManager nodesListManager = mock(NodesListManager.class); HostsFileReader reader = mock(HostsFileReader.class); when(nodesListManager.getHostsReader()).thenReturn(reader); @@ -193,6 +196,16 @@ private RMNodeStatusEvent getMockRMNodeStatusEventWithRunningApps() { return event; } + private ResourceScheduler getMockResourceScheduler() { + ResourceScheduler resourceScheduler = mock(ResourceScheduler.class); + SchedulerNode schedulerNode = mock(SchedulerNode.class); + when(schedulerNode.getCopiedListOfRunningContainers()) + .thenReturn(Collections.emptyList()); + when(resourceScheduler.getSchedulerNode(ArgumentMatchers.any())) + .thenReturn(schedulerNode); + return resourceScheduler; + } + private List getAppIdList() { List appIdList = new ArrayList(); appIdList.add(BuilderUtils.newApplicationId(0, 0)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 6ec0d05354..9feb54c7d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -463,6 +463,64 @@ public void testGracefulDecommissionWithApp() throws Exception { rm.waitForState(id1, NodeState.DECOMMISSIONED); } + /** + * Test graceful decommission of node when an AM container is scheduled on a + * node just before it is gracefully decommissioned. + */ + @Test (timeout = 60000) + public void testGracefulDecommissionAfterAMContainerAlloc() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + + writeToHostsFile(""); + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 10240); + MockNM nm2 = rm.registerNode("host2:5678", 20480); + MockNM nm3 = rm.registerNode("host3:4433", 10240); + + NodeId id1 = nm1.getNodeId(); + NodeId id2 = nm2.getNodeId(); + NodeId id3 = nm3.getNodeId(); + + rm.waitForState(id1, NodeState.RUNNING); + rm.waitForState(id2, NodeState.RUNNING); + rm.waitForState(id3, NodeState.RUNNING); + + // Create an app and schedule AM on host1. + RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm); + MockAM am = MockRM.launchAM(app, rm, nm1); + + // Before sending heartbeat we gracefully decommission the node on which AM + // is scheduled to simulate race condition. + writeToHostsFile("host1", "host3"); + rm.getNodesListManager().refreshNodes(conf, true); + rm.waitForState(id1, NodeState.DECOMMISSIONING); + rm.waitForState(id3, NodeState.DECOMMISSIONING); + + // Heartbeat after the node is in DECOMMISSIONING state. This will be the + // first heartbeat containing information about the AM container since the + // application was submitted. + ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId(); + nm1.nodeHeartbeat(aaid, 1, ContainerState.RUNNING); + nm3.nodeHeartbeat(true); + + // host1 should stay in DECOMMISSIONING as it has container running on it. + rm.waitForState(id1, NodeState.DECOMMISSIONING); + rm.waitForState(id3, NodeState.DECOMMISSIONED); + + // Go through the normal application flow and wait for it to finish. + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + MockRM.finishAMAndVerifyAppState(app, rm, nm1, am); + nm1.nodeHeartbeat(aaid, 1, ContainerState.COMPLETE); + rm.waitForState(app.getApplicationId(), RMAppState.FINISHED); + rm.waitForState(id1, NodeState.DECOMMISSIONED); + } + + /** * Decommissioning using a post-configured include hosts file */