From d36b6e045f317c94e97cb41a163aa974d161a404 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 23 Nov 2015 20:30:26 +0000 Subject: [PATCH] YARN-4344. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations. Contributed by Varun Vasudev --- hadoop-yarn-project/CHANGES.txt | 6 + .../scheduler/capacity/CapacityScheduler.java | 6 +- .../scheduler/fifo/FifoScheduler.java | 4 +- .../capacity/TestCapacityScheduler.java | 114 +++++++++++++++++- 4 files changed, 124 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 29eca0c6df..7a59387cc6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1082,6 +1082,9 @@ Release 2.7.3 - UNRELEASED YARN-3769. Consider user limit when calculating total pending resource for preemption policy in Capacity Scheduler. (Eric Payne via wangda) + YARN-4344. NMs reconnecting with changed capabilities can lead to wrong + cluster resource calculations (Varun Vasudev via jlowe) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES @@ -1936,6 +1939,9 @@ Release 2.6.3 - UNRELEASED YARN-2859. ApplicationHistoryServer binds to default port 8188 in MiniYARNCluster. (Vinod Kumar Vavilapalli via xgong) + YARN-4344. 
NMs reconnecting with changed capabilities can lead to wrong + cluster resource calculations (Varun Vasudev via jlowe) + Release 2.6.2 - 2015-10-28 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1075ee03ef..00d3ab470a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1411,12 +1411,12 @@ private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName, nodeManager.getNodeLabels()); this.nodes.put(nodeManager.getNodeID(), schedulerNode); - Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + Resources.addTo(clusterResource, schedulerNode.getTotalResource()); // update this node to node label manager if (labelManager != null) { labelManager.activateNode(nodeManager.getNodeID(), - nodeManager.getTotalCapability()); + schedulerNode.getTotalResource()); } root.updateClusterResource(clusterResource, new ResourceLimits( @@ -1442,7 +1442,7 @@ private synchronized void removeNode(RMNode nodeInfo) { if (node == null) { return; } - Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); + Resources.subtractFrom(clusterResource, node.getTotalResource()); root.updateClusterResource(clusterResource, new ResourceLimits( clusterResource)); int numNodes = 
numNodeManagers.decrementAndGet(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 2ec2311dbd..5999eb70b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -939,7 +939,7 @@ private synchronized void removeNode(RMNode nodeInfo) { updateMaximumAllocation(node, false); // Update cluster metrics - Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability()); + Resources.subtractFrom(clusterResource, node.getTotalResource()); } @Override @@ -962,7 +962,7 @@ private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName); this.nodes.put(nodeManager.getNodeID(), schedulerNode); - Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + Resources.addTo(clusterResource, schedulerNode.getTotalResource()); updateMaximumAllocation(schedulerNode, true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index f0a1d03275..7c95cdca1b 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -46,6 +46,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -71,11 +72,15 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.Application; @@ -83,10 +88,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import 
org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; @@ -106,6 +114,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -124,6 +133,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -144,7 +154,7 @@ import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; - +import org.mockito.Mockito; public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); @@ -3248,4 +3258,106 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { Assert.fail("Cannot find RMContainer"); } } + + private class SleepHandler implements EventHandler { + boolean sleepFlag = false; + int sleepTime = 20; + @Override + public void handle(SchedulerEvent event) { + try { + if(sleepFlag) { + Thread.sleep(sleepTime); + } + } + catch(InterruptedException ie) { + } + } + } + + private ResourceTrackerService getPrivateResourceTrackerService( + Dispatcher privateDispatcher, SleepHandler sleepHandler) { + + Configuration conf = new Configuration(); + ResourceTrackerService privateResourceTrackerService; + + RMContext privateContext = + new RMContextImpl(privateDispatcher, null, null, null, null, null, null, + null, null, null); + privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class)); + + privateDispatcher.register(SchedulerEventType.class, sleepHandler); + privateDispatcher.register(SchedulerEventType.class, + resourceManager.getResourceScheduler()); + privateDispatcher.register(RMNodeEventType.class, + new ResourceManager.NodeEventDispatcher(privateContext)); + ((Service) privateDispatcher).init(conf); + ((Service) privateDispatcher).start(); + NMLivelinessMonitor nmLivelinessMonitor = + new NMLivelinessMonitor(privateDispatcher); + nmLivelinessMonitor.init(conf); + nmLivelinessMonitor.start(); + NodesListManager nodesListManager = new NodesListManager(privateContext); + nodesListManager.init(conf); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.start(); + NMTokenSecretManagerInRM nmTokenSecretManager = + new NMTokenSecretManagerInRM(conf); + 
nmTokenSecretManager.start(); + privateResourceTrackerService = + new ResourceTrackerService(privateContext, nodesListManager, + nmLivelinessMonitor, containerTokenSecretManager, + nmTokenSecretManager); + privateResourceTrackerService.init(conf); + privateResourceTrackerService.start(); + resourceManager.getResourceScheduler().setRMContext(privateContext); + return privateResourceTrackerService; + } + + /** + * Test the behaviour of the capacity scheduler when a node reconnects + * with changed capabilities. This test is to catch any race conditions + * that might occur due to the use of the RMNode object. + * @throws Exception + */ + @Test + public void testNodemanagerReconnect() throws Exception { + + DrainDispatcher privateDispatcher = new DrainDispatcher(); + SleepHandler sleepHandler = new SleepHandler(); + ResourceTrackerService privateResourceTrackerService = + getPrivateResourceTrackerService(privateDispatcher, + sleepHandler); + + // Register node1 + String hostname1 = "localhost1"; + Resource capability = BuilderUtils.newResource(4096, 4); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + RegisterNodeManagerRequest request1 = + recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); + NodeId nodeId1 = NodeId.newInstance(hostname1, 0); + request1.setNodeId(nodeId1); + request1.setHttpPort(0); + request1.setResource(capability); + privateResourceTrackerService.registerNodeManager(request1); + privateDispatcher.await(); + Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource(); + Assert.assertEquals("Initial cluster resources don't match", capability, + clusterResource); + + Resource newCapability = BuilderUtils.newResource(1024, 1); + RegisterNodeManagerRequest request2 = + recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); + request2.setNodeId(nodeId1); + request2.setHttpPort(0); + request2.setResource(newCapability); + // hold up the dispatcher and register the same 
node with lower capability + sleepHandler.sleepFlag = true; + privateResourceTrackerService.registerNodeManager(request2); + privateDispatcher.await(); + Assert.assertEquals("Cluster resources don't match", newCapability, + resourceManager.getResourceScheduler().getClusterResource()); + privateResourceTrackerService.stop(); + } }