From e1ccc9622b2f1fbefea1862fa74d1fb56d8eb264 Mon Sep 17 00:00:00 2001 From: Zhihai Xu Date: Sun, 6 Mar 2016 19:46:09 -0800 Subject: [PATCH] YARN-4761. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations on fair scheduler. Contributed by Sangjin Lee --- .../scheduler/fair/FairScheduler.java | 4 +- .../scheduler/TestAbstractYarnScheduler.java | 132 ++++++++++++++++++ .../capacity/TestCapacityScheduler.java | 110 --------------- 3 files changed, 134 insertions(+), 112 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2801bee910..917fc8a834 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -889,7 +889,7 @@ private synchronized void addNode(List containerReports, } else { nodesPerRack.put(rackName, 1); } - Resources.addTo(clusterResource, node.getTotalCapability()); + Resources.addTo(clusterResource, schedulerNode.getTotalResource()); updateMaximumAllocation(schedulerNode, true); triggerUpdate(); @@ -909,7 +909,7 @@ private synchronized void removeNode(RMNode rmNode) { if (node == null) { return; } - Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); + Resources.subtractFrom(clusterResource, node.getTotalResource()); updateRootQueueMetrics(); triggerUpdate(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 8411a4d56d..e7ba58d9d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -38,11 +40,24 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -51,12 +66,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; @SuppressWarnings("unchecked") public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { @@ -615,4 +637,114 @@ private void verifyMaximumResourceCapability( Assert.assertEquals(expectedMaximumResource.getVirtualCores(), schedulerMaximumResourceCapability.getVirtualCores()); } + + private class SleepHandler implements EventHandler { + boolean sleepFlag = false; + int sleepTime = 20; + @Override + public void handle(SchedulerEvent event) { + try { + if (sleepFlag) { + Thread.sleep(sleepTime); + } + } catch(InterruptedException ie) { + } + } + } + + private ResourceTrackerService getPrivateResourceTrackerService( + Dispatcher privateDispatcher, ResourceManager rm, + SleepHandler sleepHandler) { + Configuration conf = getConf(); + + RMContext privateContext = + new RMContextImpl(privateDispatcher, null, null, null, null, null, null, + null, null, null); + privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class)); + + privateDispatcher.register(SchedulerEventType.class, sleepHandler); + privateDispatcher.register(SchedulerEventType.class, + rm.getResourceScheduler()); + privateDispatcher.register(RMNodeEventType.class, + new ResourceManager.NodeEventDispatcher(privateContext)); + ((Service) privateDispatcher).init(conf); + ((Service) privateDispatcher).start(); + NMLivelinessMonitor nmLivelinessMonitor = + new NMLivelinessMonitor(privateDispatcher); + nmLivelinessMonitor.init(conf); + nmLivelinessMonitor.start(); + NodesListManager nodesListManager = new NodesListManager(privateContext); + nodesListManager.init(conf); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.start(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + nmTokenSecretManager.start(); + ResourceTrackerService privateResourceTrackerService = + new ResourceTrackerService(privateContext, nodesListManager, + nmLivelinessMonitor, containerTokenSecretManager, + nmTokenSecretManager); + privateResourceTrackerService.init(conf); + privateResourceTrackerService.start(); + rm.getResourceScheduler().setRMContext(privateContext); + return privateResourceTrackerService; + } + + /** + * Test the behavior of the scheduler when a node reconnects + * with changed capabilities. This test is to catch any race conditions + * that might occur due to the use of the RMNode object. + * @throws Exception + */ + @Test(timeout = 60000) + public void testNodemanagerReconnect() throws Exception { + configureScheduler(); + Configuration conf = getConf(); + MockRM rm = new MockRM(conf); + try { + rm.start(); + + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false); + DrainDispatcher privateDispatcher = new DrainDispatcher(); + SleepHandler sleepHandler = new SleepHandler(); + ResourceTrackerService privateResourceTrackerService = + getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler); + + // Register node1 + String hostname1 = "localhost1"; + Resource capability = BuilderUtils.newResource(4096, 4); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + RegisterNodeManagerRequest request1 = + recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); + NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + request1.setNodeId(nodeId1); + request1.setHttpPort(0); + request1.setResource(capability); + privateResourceTrackerService.registerNodeManager(request1); + privateDispatcher.await(); + Resource clusterResource = + rm.getResourceScheduler().getClusterResource(); + Assert.assertEquals("Initial cluster resources don't match", capability, + clusterResource); + + Resource newCapability = BuilderUtils.newResource(1024, 1); + RegisterNodeManagerRequest request2 = + recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); + request2.setNodeId(nodeId1); + request2.setHttpPort(0); + request2.setResource(newCapability); + // hold up the disaptcher and register the same node with lower capability + sleepHandler.sleepFlag = true; + privateResourceTrackerService.registerNodeManager(request2); + privateDispatcher.await(); + Assert.assertEquals("Cluster resources don't match", newCapability, + rm.getResourceScheduler().getClusterResource()); + privateResourceTrackerService.stop(); + } finally { + rm.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index c8c97e9144..b6c005ba72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -46,7 +46,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -74,7 +73,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -82,7 +80,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.Application; @@ -90,13 +87,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; -import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; @@ -116,7 +110,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -136,7 +129,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -3285,108 +3277,6 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { } } - private class SleepHandler implements EventHandler { - boolean sleepFlag = false; - int sleepTime = 20; - @Override - public void handle(SchedulerEvent event) { - try { - if(sleepFlag) { - Thread.sleep(sleepTime); - } - } - catch(InterruptedException ie) { - } - } - } - - private ResourceTrackerService getPrivateResourceTrackerService( - Dispatcher privateDispatcher, SleepHandler sleepHandler) { - - Configuration conf = new Configuration(); - ResourceTrackerService privateResourceTrackerService; - - RMContext privateContext = - new RMContextImpl(privateDispatcher, null, null, null, null, null, null, - null, null, null); - privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class)); - - privateDispatcher.register(SchedulerEventType.class, sleepHandler); - privateDispatcher.register(SchedulerEventType.class, - resourceManager.getResourceScheduler()); - privateDispatcher.register(RMNodeEventType.class, - new ResourceManager.NodeEventDispatcher(privateContext)); - ((Service) privateDispatcher).init(conf); - ((Service) privateDispatcher).start(); - NMLivelinessMonitor nmLivelinessMonitor = - new NMLivelinessMonitor(privateDispatcher); - nmLivelinessMonitor.init(conf); - nmLivelinessMonitor.start(); - NodesListManager nodesListManager = new NodesListManager(privateContext); - nodesListManager.init(conf); - RMContainerTokenSecretManager containerTokenSecretManager = - new RMContainerTokenSecretManager(conf); - containerTokenSecretManager.start(); - NMTokenSecretManagerInRM nmTokenSecretManager = - new NMTokenSecretManagerInRM(conf); - nmTokenSecretManager.start(); - privateResourceTrackerService = - new ResourceTrackerService(privateContext, nodesListManager, - nmLivelinessMonitor, containerTokenSecretManager, - nmTokenSecretManager); - privateResourceTrackerService.init(conf); - privateResourceTrackerService.start(); - resourceManager.getResourceScheduler().setRMContext(privateContext); - return privateResourceTrackerService; - } - - /** - * Test the behaviour of the capacity scheduler when a node reconnects - * with changed capabilities. This test is to catch any race conditions - * that might occur due to the use of the RMNode object. - * @throws Exception - */ - @Test - public void testNodemanagerReconnect() throws Exception { - - DrainDispatcher privateDispatcher = new DrainDispatcher(); - SleepHandler sleepHandler = new SleepHandler(); - ResourceTrackerService privateResourceTrackerService = - getPrivateResourceTrackerService(privateDispatcher, - sleepHandler); - - // Register node1 - String hostname1 = "localhost1"; - Resource capability = BuilderUtils.newResource(4096, 4); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - - RegisterNodeManagerRequest request1 = - recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); - NodeId nodeId1 = NodeId.newInstance(hostname1, 0); - request1.setNodeId(nodeId1); - request1.setHttpPort(0); - request1.setResource(capability); - privateResourceTrackerService.registerNodeManager(request1); - privateDispatcher.await(); - Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource(); - Assert.assertEquals("Initial cluster resources don't match", capability, - clusterResource); - - Resource newCapability = BuilderUtils.newResource(1024, 1); - RegisterNodeManagerRequest request2 = - recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); - request2.setNodeId(nodeId1); - request2.setHttpPort(0); - request2.setResource(newCapability); - // hold up the disaptcher and register the same node with lower capability - sleepHandler.sleepFlag = true; - privateResourceTrackerService.registerNodeManager(request2); - privateDispatcher.await(); - Assert.assertEquals("Cluster resources don't match", newCapability, - resourceManager.getResourceScheduler().getClusterResource()); - privateResourceTrackerService.stop(); - } - @Test public void testResourceUpdateDecommissioningNode() throws Exception { // Mock the RMNodeResourceUpdate event handler to update SchedulerNode