From 9ed17f181d96b8719a0ef54a129081948781d57e Mon Sep 17 00:00:00 2001
From: Junping Du
Date: Tue, 23 Feb 2016 03:29:46 -0800
Subject: [PATCH] YARN-3223. Resource update during NM graceful decommission.
 Contributed by Brook Zhou.

---
 hadoop-yarn-project/CHANGES.txt                |   3 +
 .../resourcemanager/rmnode/RMNodeImpl.java     |  30 ++++-
 .../scheduler/capacity/CapacityScheduler.java  |  16 +++
 .../scheduler/fair/FairScheduler.java          |  15 +++
 .../scheduler/fifo/FifoScheduler.java          |  16 +++
 .../TestRMNodeTransitions.java                 |  34 ++++++
 .../capacity/TestCapacityScheduler.java        |  89 ++++++++++++++-
 .../scheduler/fair/TestFairScheduler.java      |  91 +++++++++++++++
 .../scheduler/fifo/TestFifoScheduler.java      | 107 +++++++++++++++++-
 9 files changed, 395 insertions(+), 6 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e2ffec0160..439c1bbc19 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -441,6 +441,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4411. RMAppAttemptImpl#createApplicationAttemptReport throws
     IllegalArgumentException. (Bibin A Chundatt, yarntime via devaraj)
 
+    YARN-3223. Resource update during NM graceful decommission. (Brook Zhou
+    via junping_du)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index f4e483b58b..ca9df38cb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -79,6 +79,7 @@
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -110,6 +111,8 @@ public class RMNodeImpl implements RMNode, EventHandler {
   private int httpPort;
   private final String nodeAddress; // The containerManager address
   private String httpAddress;
+  /* Snapshot of total resources before receiving decommissioning command */
+  private volatile Resource originalTotalCapability;
   private volatile Resource totalCapability;
   private final Node node;
@@ -236,6 +239,9 @@ RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
       .addTransition(NodeState.DECOMMISSIONING, NodeState.RUNNING,
           RMNodeEventType.RECOMMISSION,
           new RecommissionNodeTransition(NodeState.RUNNING))
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+          RMNodeEventType.RESOURCE_UPDATE,
+          new UpdateNodeResourceWhenRunningTransition())
       .addTransition(NodeState.DECOMMISSIONING, EnumSet.of(
           NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
           RMNodeEventType.STATUS_UPDATE,
@@ -1064,7 +1070,12 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
       // Update NM metrics during graceful decommissioning.
       rmNode.updateMetricsForGracefulDecommission(initState, finalState);
-      // TODO (in YARN-3223) Keep NM's available resource to be 0
+      if (rmNode.originalTotalCapability == null){
+        rmNode.originalTotalCapability =
+            Resources.clone(rmNode.totalCapability);
+        LOG.info("Preserve original total capability: "
+            + rmNode.originalTotalCapability);
+      }
     }
   }
 
@@ -1078,11 +1089,22 @@ public RecommissionNodeTransition(NodeState finalState) {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // Restore the original total capability
+      if (rmNode.originalTotalCapability != null) {
+        rmNode.totalCapability = rmNode.originalTotalCapability;
+        rmNode.originalTotalCapability = null;
+      }
       LOG.info("Node " + rmNode.nodeId + " in DECOMMISSIONING is "
           + "recommissioned back to RUNNING.");
       rmNode
           .updateMetricsForGracefulDecommission(rmNode.getState(), finalState);
-      // TODO handle NM resource resume in YARN-3223.
+      //update the scheduler with the restored original total capability
+      rmNode.context
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
+                  .newInstance(rmNode.totalCapability, 0)));
     }
   }
 
@@ -1353,4 +1375,8 @@ public List pullNewlyIncreasedContainers() {
       writeLock.unlock();
     }
   }
+
+  public Resource getOriginalTotalCapability() {
+    return this.originalTotalCapability;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ee3a3f99a1..7f844a0376 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -91,6 +92,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -1082,6 +1084,20 @@ private synchronized void nodeUpdate(RMNode nm) {
       }
     }
 
+    // If the node is decommissioning, send an update to have the total
+    // resource equal to the used resource, so no available resource to
+    // schedule.
+    // TODO: Fix possible race-condition when request comes in before
+    // update is propagated
+    if (nm.getState() == NodeState.DECOMMISSIONING) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+                  .newInstance(getSchedulerNode(nm.getNodeID())
+                      .getUsedResource(), 0)));
+    }
     schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
         releaseResources);
     schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 7e013e0ad9..1dbcda2ea0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -70,6 +71,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -1057,6 +1059,19 @@ private synchronized void nodeUpdate(RMNode nm) {
           completedContainer, RMContainerEventType.FINISHED);
     }
 
+    // If the node is decommissioning, send an update to have the total
+    // resource equal to the used resource, so no available resource to
+    // schedule.
+    if (nm.getState() == NodeState.DECOMMISSIONING) {
+      this.rmContext
+          .getDispatcher()
+          .getEventHandler()
+          .handle(
+              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
+                  .newInstance(getSchedulerNode(nm.getNodeID())
+                      .getUsedResource(), 0)));
+    }
+
     if (continuousSchedulingEnabled) {
       if (!completedContainers.isEmpty()) {
         attemptScheduling(node);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 5787ba6299..a0e14111ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -43,12 +43,14 @@
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -67,6 +69,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -747,6 +750,19 @@ private synchronized void nodeUpdate(RMNode rmNode) {
         rmNode.getAggregatedContainersUtilization());
     node.setNodeUtilization(rmNode.getNodeUtilization());
 
+    // If the node is decommissioning, send an update to have the total
+    // resource equal to the used resource, so no available resource to
+    // schedule.
+ if (rmNode.getState() == NodeState.DECOMMISSIONING) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption + .newInstance(getSchedulerNode(rmNode.getNodeID()) + .getUsedResource(), 0))); + } + if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 78aa139a00..701e51252a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -975,4 +975,38 @@ public void testContainerExpire() throws Exception { verify(mockExpirer).unregister(expirationInfo1); verify(mockExpirer).unregister(expirationInfo2); } + + @Test + public void testResourceUpdateOnDecommissioningNode() { + RMNodeImpl node = getDecommissioningNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); + assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); + node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), + ResourceOption.newInstance(Resource.newInstance(2048, 2), + ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + Resource originalCapacity = node.getOriginalTotalCapability(); + assertEquals("Memory resource is not match.", originalCapacity.getMemory(), oldCapacity.getMemory()); + assertEquals("CPU resource is not match.", originalCapacity.getVirtualCores(), oldCapacity.getVirtualCores()); + Resource newCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048); + assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2); + + Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } + + @Test + public void testResourceUpdateOnRecommissioningNode() { + RMNodeImpl node = getDecommissioningNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); + assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); + node.handle(new RMNodeEvent(node.getNodeID(), + RMNodeEventType.RECOMMISSION)); + Resource originalCapacity = node.getOriginalTotalCapability(); + assertEquals("Original total capability not null after recommission", null, originalCapacity); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e139df65f3..bd2c4fe4b1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -115,6 +117,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -150,11 +153,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import org.mockito.Mockito; public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); @@ -3360,4 +3363,88 @@ public void testNodemanagerReconnect() throws Exception { resourceManager.getResourceScheduler().getClusterResource()); privateResourceTrackerService.stop(); } + + @Test + public void testResourceUpdateDecommissioningNode() throws Exception { + // Mock the RMNodeResourceUpdate event handler to update SchedulerNode + // to have 0 available resource + RMContext spyContext = Mockito.spy(resourceManager.getRMContext()); + Dispatcher mockDispatcher = mock(AsyncDispatcher.class); + when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() { + @Override + public void handle(Event event) { + if (event instanceof RMNodeResourceUpdateEvent) { + RMNodeResourceUpdateEvent resourceEvent = + (RMNodeResourceUpdateEvent) event; + resourceManager + .getResourceScheduler() + .getSchedulerNode(resourceEvent.getNodeId()) + .setTotalResource(resourceEvent.getResourceOption().getResource()); + } + } + }); + Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher(); + ((CapacityScheduler) resourceManager.getResourceScheduler()) + .setRMContext(spyContext); + ((AsyncDispatcher) mockDispatcher).start(); + // Register node + String host_0 = "host_0"; + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(8 * GB, 4)); + // ResourceRequest priorities + 
Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + + // Submit an application + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_0); + + Task task_0_0 = + new Task(application_0, priority_0, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); + + nodeUpdate(nm_0); + // Kick off another heartbeat with the node state mocked to decommissioning + // This should update the schedulernodes to have 0 available resource + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm_0.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + + // Get allocations from the scheduler + application_0.schedule(); + + // Check the used resource is 1 GB 1 core + Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory()); + Resource usedResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getUsedResource(); + Assert.assertEquals(usedResource.getMemory(), 1 * GB); + Assert.assertEquals(usedResource.getVirtualCores(), 1); + // Check total resource of scheduler node is also changed to 1 GB 1 core + Resource totalResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + Assert.assertEquals(totalResource.getMemory(), 1 * GB); + Assert.assertEquals(totalResource.getVirtualCores(), 1); + // Check the available resource is 0/0 + Resource availableResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getAvailableResource(); + Assert.assertEquals(availableResource.getMemory(), 0); + Assert.assertEquals(availableResource.getVirtualCores(), 0); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 0575f335c3..a15e8d1575 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import 
org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -68,11 +70,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -84,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -99,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -106,12 +116,14 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.xml.sax.SAXException; import com.google.common.collect.Sets; @SuppressWarnings("unchecked") public class TestFairScheduler extends FairSchedulerTestBase { + private final int GB = 1024; private final static String ALLOC_FILE = new File(TEST_DIR, "test-queues").getAbsolutePath(); @@ -4372,4 +4384,83 @@ public void testFairSchedulerContinuousSchedulingInitTime() throws Exception { long initSchedulerTime = lastScheduledContainer.get(priority); assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime); } + + @Test + public void testResourceUpdateDecommissioningNode() throws Exception { + // Mock the RMNodeResourceUpdate event handler to update SchedulerNode + // to have 0 available resource + RMContext spyContext = Mockito.spy(resourceManager.getRMContext()); + Dispatcher mockDispatcher = mock(AsyncDispatcher.class); + when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() { + @Override + public void handle(Event event) { + if (event instanceof RMNodeResourceUpdateEvent) { + RMNodeResourceUpdateEvent 
resourceEvent = + (RMNodeResourceUpdateEvent) event; + resourceManager + .getResourceScheduler() + .getSchedulerNode(resourceEvent.getNodeId()) + .setTotalResource(resourceEvent.getResourceOption().getResource()); + } + } + }); + Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher(); + ((FairScheduler) resourceManager.getResourceScheduler()) + .setRMContext(spyContext); + ((AsyncDispatcher) mockDispatcher).start(); + // Register node + String host_0 = "host_0"; + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(8 * GB, 4)); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Kick off another heartbeat with the node state mocked to decommissioning + // This should update the schedulernodes to have 0 available resource + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm_0.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + + // Check the used resource is 0 GB 0 core + // Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory()); + Resource usedResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getUsedResource(); + Assert.assertEquals(usedResource.getMemory(), 0); + Assert.assertEquals(usedResource.getVirtualCores(), 0); + // Check total resource of scheduler node is also changed to 0 GB 0 core + Resource totalResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + Assert.assertEquals(totalResource.getMemory(), 0 * GB); + Assert.assertEquals(totalResource.getVirtualCores(), 0); + // Check the available resource is 0/0 + Resource availableResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getAvailableResource(); + Assert.assertEquals(availableResource.getMemory(), 0); + Assert.assertEquals(availableResource.getVirtualCores(), 0); + } + + private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode( + String hostName, int containerManagerPort, int httpPort, String rackName, + Resource capability) throws IOException, YarnException { + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = + new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, + containerManagerPort, httpPort, rackName, capability, + resourceManager); + NodeAddedSchedulerEvent nodeAddEvent1 = + new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + resourceManager.getResourceScheduler().handle(nodeAddEvent1); + return nm; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 8111e11026..86a017e270 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -52,6 +53,9 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -77,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -106,6 +111,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestFifoScheduler { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -133,9 +139,15 @@ public void tearDown() throws Exception { registerNode(String hostName, int containerManagerPort, int nmHttpPort, String rackName, Resource capability) throws IOException, YarnException { - return new org.apache.hadoop.yarn.server.resourcemanager.NodeManager( - hostName, containerManagerPort, nmHttpPort, rackName, capability, - resourceManager); + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm = + new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName, + containerManagerPort, nmHttpPort, rackName, capability, + resourceManager); + NodeAddedSchedulerEvent nodeAddEvent1 = + new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + resourceManager.getResourceScheduler().handle(nodeAddEvent1); + return nm; } private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { @@ -1163,6 +1175,95 @@ public void testResourceOverCommit() throws Exception { rm.stop(); } + @Test + public void testResourceUpdateDecommissioningNode() throws Exception { + // Mock the RMNodeResourceUpdate event handler to update SchedulerNode + // to have 0 available resource + RMContext spyContext = Mockito.spy(resourceManager.getRMContext()); + Dispatcher mockDispatcher = mock(AsyncDispatcher.class); + when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() { + @Override + public void handle(Event event) { + if (event instanceof RMNodeResourceUpdateEvent) { + 
RMNodeResourceUpdateEvent resourceEvent = + (RMNodeResourceUpdateEvent) event; + resourceManager + .getResourceScheduler() + .getSchedulerNode(resourceEvent.getNodeId()) + .setTotalResource(resourceEvent.getResourceOption().getResource()); + } + } + }); + Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher(); + ((FifoScheduler) resourceManager.getResourceScheduler()) + .setRMContext(spyContext); + ((AsyncDispatcher) mockDispatcher).start(); + // Register node + String host_0 = "host_0"; + org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(8 * GB, 4)); + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + + // Submit an application + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_0); + + Task task_0_0 = + new Task(application_0, priority_0, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); + + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + + // Kick off another heartbeat with the node state mocked to decommissioning + // This should update the schedulernodes to have 0 available resource + RMNode spyNode = + Mockito.spy(resourceManager.getRMContext().getRMNodes() + .get(nm_0.getNodeId())); + when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING); + resourceManager.getResourceScheduler().handle( + new NodeUpdateSchedulerEvent(spyNode)); + + // Get allocations from the scheduler + application_0.schedule(); + + // Check the used resource is 1 GB 1 core + // Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory()); + Resource usedResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getUsedResource(); + Assert.assertEquals(usedResource.getMemory(), 1 * GB); + Assert.assertEquals(usedResource.getVirtualCores(), 1); + // Check total resource of scheduler node is also changed to 1 GB 1 core + Resource totalResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getTotalResource(); + Assert.assertEquals(totalResource.getMemory(), 1 * GB); + Assert.assertEquals(totalResource.getVirtualCores(), 1); + // Check the available resource is 0/0 + Resource availableResource = + resourceManager.getResourceScheduler() + .getSchedulerNode(nm_0.getNodeId()).getAvailableResource(); + Assert.assertEquals(availableResource.getMemory(), 0); + Assert.assertEquals(availableResource.getVirtualCores(), 0); + } + private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemory());
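
The behavior this patch wires into RMNodeImpl and the three schedulers is easiest to follow as plain resource arithmetic. The sketch below is not part of the patch and does not use the YARN classes touched above; every name in it (Res, Node, startDecommissioning, onHeartbeat, recommission) is invented for illustration only. It models the idea: snapshot the node's capability when it enters DECOMMISSIONING, clamp the schedulable total to the currently used resource on each heartbeat so nothing new can be placed there, and restore the snapshot on RECOMMISSION.

// Standalone illustration only; invented names, not YARN code.
public class DecommissioningResourceSketch {

  enum NodeState { RUNNING, DECOMMISSIONING }

  /** Minimal stand-in for a YARN Resource: memory in MB plus virtual cores. */
  record Res(int memoryMB, int vcores) { }

  static class Node {
    NodeState state = NodeState.RUNNING;
    Res total;          // what the scheduler is allowed to hand out
    Res used;           // currently allocated to running containers
    Res originalTotal;  // snapshot taken when decommissioning starts

    Node(Res capability) {
      this.total = capability;
      this.used = new Res(0, 0);
    }

    Res available() {
      return new Res(total.memoryMB() - used.memoryMB(),
          total.vcores() - used.vcores());
    }

    // Analogous to the DECOMMISSIONING transition: snapshot the capability once.
    void startDecommissioning() {
      state = NodeState.DECOMMISSIONING;
      if (originalTotal == null) {
        originalTotal = total;
      }
    }

    // Analogous to the schedulers' nodeUpdate(): total == used, so available == 0.
    void onHeartbeat() {
      if (state == NodeState.DECOMMISSIONING) {
        total = used;
      }
    }

    // Analogous to RECOMMISSION: put the preserved capability back.
    void recommission() {
      state = NodeState.RUNNING;
      if (originalTotal != null) {
        total = originalTotal;
        originalTotal = null;
      }
    }
  }

  public static void main(String[] args) {
    Node node = new Node(new Res(8 * 1024, 4));
    node.used = new Res(1024, 1);           // one 1 GB / 1 vcore container

    node.startDecommissioning();
    node.onHeartbeat();
    System.out.println(node.available());   // Res[memoryMB=0, vcores=0]

    node.recommission();
    System.out.println(node.total);         // Res[memoryMB=8192, vcores=4]
  }
}

As the TODO in the CapacityScheduler hunk notes, there is still a window between the heartbeat that shrinks the total and the moment the update propagates, during which a request can be served against stale availability; the sketch above ignores that race.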