diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 18c7b4eb7d..d2e81a50d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1106,12 +1106,16 @@ protected void nodeUpdate(RMNode nm) { } // Process new container information + // NOTICE: it is possible to not find the NodeID as a node can be + // decommissioned at the same time. Skip updates if node is null. SchedulerNode schedulerNode = getNode(nm.getNodeID()); List completedContainers = updateNewContainerInfo(nm, schedulerNode); // Notify Scheduler Node updated. - schedulerNode.notifyNodeUpdate(); + if (schedulerNode != null) { + schedulerNode.notifyNodeUpdate(); + } // Process completed containers Resource releasedResources = Resource.newInstance(0, 0); @@ -1121,9 +1125,7 @@ protected void nodeUpdate(RMNode nm) { // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to // schedule. 
- // TODO YARN-5128: Fix possible race-condition when request comes in before - // update is propagated - if (nm.getState() == NodeState.DECOMMISSIONING) { + if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) { this.rmContext .getDispatcher() .getEventHandler() @@ -1133,13 +1135,16 @@ protected void nodeUpdate(RMNode nm) { } updateSchedulerHealthInformation(releasedResources, releasedContainers); - updateNodeResourceUtilization(nm, schedulerNode); + if (schedulerNode != null) { + updateNodeResourceUtilization(nm, schedulerNode); + } // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug( - "Node being looked for scheduling " + nm + " availableResource: " - + schedulerNode.getUnallocatedResource()); + "Node being looked for scheduling " + nm + " availableResource: " + + (schedulerNode == null ? "unknown (decommissioned)" : + schedulerNode.getUnallocatedResource())); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 123f7110cf..557e684b67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1096,7 +1096,7 @@ void attemptScheduling(FSSchedulerNode node) { return; } - final NodeId nodeID = node.getNodeID(); + final NodeId nodeID = (node != null ? 
node.getNodeID() : null); if (!nodeTracker.exists(nodeID)) { // The node might have just been removed while this thread was waiting // on the synchronized lock before it entered this synchronized method diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 7ac9027a78..8396db54ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -966,8 +966,10 @@ protected synchronized void nodeUpdate(RMNode nm) { return; } - if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), - node.getUnallocatedResource(), minimumAllocation)) { + // A decommissioned node might be removed before we get here + if (node != null && + Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(), + node.getUnallocatedResource(), minimumAllocation)) { LOG.debug("Node heartbeat " + nm.getNodeID() + " available resource = " + node.getUnallocatedResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1d2aadcf2a..0b54010c27 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -258,14 +258,12 @@ public void testConfValidation() throws Exception { } } - private NodeManager - registerNode(String hostName, int containerManagerPort, int httpPort, - String rackName, Resource capability) + private NodeManager registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, + Resource capability) throws IOException, YarnException { - NodeManager nm = - new NodeManager( - hostName, containerManagerPort, httpPort, rackName, capability, - resourceManager); + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new NodeAddedSchedulerEvent(resourceManager.getRMContext() .getRMNodes().get(nm.getNodeId())); @@ -280,13 +278,13 @@ public void testCapacityScheduler() throws Exception { // Register node1 String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + NodeManager nm_0 = registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, Resources.createResource(4 * GB, 1)); // Register node2 String host_1 = "host_1"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 = + NodeManager nm_1 = registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, Resources.createResource(2 * GB, 1)); @@ -4038,6 +4036,29 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { Assert.fail("Cannot find RMContainer"); } } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + 
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Force remove the node to simulate race condition + ((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker(). + removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } @Test public void testResourceUpdateDecommissioningNode() throws Exception { @@ -4064,9 +4085,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2f6c2cf259..9120d3a6cc 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -4973,6 +4974,30 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName() .get(attId3.getApplicationId()).getQueue()); } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Force remove the node to simulate race condition + ((FairScheduler) resourceManager.getResourceScheduler()) + .getNodeTracker().removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new 
NodeUpdateSchedulerEvent(spyNode)); + } + @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode @@ -4998,9 +5023,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); RMNode node = resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); @@ -5038,13 +5062,12 @@ public void handle(Event event) { Assert.assertEquals(availableResource.getVirtualCores(), 0); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode( - String hostName, int containerManagerPort, int httpPort, String rackName, - Resource capability) throws IOException, YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, httpPort, rackName, capability, - resourceManager); + private NodeManager registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, + Resource capability) + throws IOException, YarnException { + NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort, + rackName, capability, resourceManager); // after YARN-5375, scheduler event is processed in rm main dispatcher, // wait it processed, or may lead dead lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 8814c0e542..ee66a49032 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -138,14 +139,12 @@ public void tearDown() throws Exception { resourceManager.stop(); } - private org.apache.hadoop.yarn.server.resourcemanager.NodeManager - registerNode(String hostName, int containerManagerPort, int nmHttpPort, - String rackName, Resource capability) throws IOException, - YarnException { - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = - new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, - containerManagerPort, nmHttpPort, rackName, capability, - resourceManager); + private NodeManager registerNode(String hostName, int containerManagerPort, + int nmHttpPort, String rackName, + Resource capability) + throws IOException, YarnException { + NodeManager nm = new NodeManager(hostName, containerManagerPort, + nmHttpPort, rackName, capability, resourceManager); NodeAddedSchedulerEvent nodeAddEvent1 = new 
NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() .get(nm.getNodeId())); @@ -1195,6 +1194,30 @@ public void testResourceOverCommit() throws Exception { rm.stop(); } + @Test + public void testRemovedNodeDecomissioningNode() throws Exception { + // Register nodemanager + NodeManager nm = registerNode("host_decom", 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Force remove the node to simulate race condition + ((FifoScheduler) resourceManager.getResourceScheduler()) + .getNodeTracker().removeNode(nm.getNodeId()); + // Kick off another heartbeat with the node state mocked to decommissioning + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + } + @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode @@ -1220,9 +1243,8 @@ public void handle(Event event) { ((AsyncDispatcher) mockDispatcher).start(); // Register node String host_0 = "host_0"; - org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = - registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, - Resources.createResource(8 * GB, 4)); + NodeManager nm_0 = registerNode(host_0, 1234, 2345, + NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); // ResourceRequest priorities Priority priority_0 = Priority.newInstance(0);