From 7d2d8d25ba0cb10a3c6192d4123f27ede5ef2ba6 Mon Sep 17 00:00:00 2001
From: Varun Saxena
Date: Tue, 1 Nov 2016 15:32:04 +0530
Subject: [PATCH] YARN-5788. Apps not activated and AM limit resource in UI and
 REST not updated after -replaceLabelsOnNode (Bibin A Chundatt via Varun
 Saxena)

---
 .../scheduler/capacity/CapacityScheduler.java | 115 ++++++++++--------
 .../TestCapacitySchedulerNodeLabelUpdate.java |  74 ++++++++++-
 2 files changed, 133 insertions(+), 56 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cfdcb10dec..d759d470ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1126,57 +1126,52 @@ public class CapacityScheduler extends
       writeLock.unlock();
     }
   }
-  
+
   /**
    * Process node labels update on a node.
    */
   private void updateLabelsOnNode(NodeId nodeId,
       Set<String> newLabels) {
-    try {
-      writeLock.lock();
-      FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
-      if (null == node) {
-        return;
-      }
-
-      // Get new partition, we have only one partition per node
-      String newPartition;
-      if (newLabels.isEmpty()) {
-        newPartition = RMNodeLabelsManager.NO_LABEL;
-      } else{
-        newPartition = newLabels.iterator().next();
-      }
-
-      // old partition as well
-      String oldPartition = node.getPartition();
-
-      // Update resources of these containers
-      for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
-        FiCaSchedulerApp application = getApplicationAttempt(
-            rmContainer.getApplicationAttemptId());
-        if (null != application) {
-          application.nodePartitionUpdated(rmContainer, oldPartition,
-              newPartition);
-        } else{
-          LOG.warn("There's something wrong, some RMContainers running on"
-              + " a node, but we cannot find SchedulerApplicationAttempt "
-              + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
-              + rmContainer.getApplicationAttemptId());
-          continue;
-        }
-      }
-
-      // Unreserve container on this node
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (null != reservedContainer) {
-        killReservedContainer(reservedContainer);
-      }
-
-      // Update node labels after we've done this
-      node.updateLabels(newLabels);
-    } finally {
-      writeLock.unlock();
+    FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
+    if (null == node) {
+      return;
     }
+
+    // Get new partition, we have only one partition per node
+    String newPartition;
+    if (newLabels.isEmpty()) {
+      newPartition = RMNodeLabelsManager.NO_LABEL;
+    } else{
+      newPartition = newLabels.iterator().next();
+    }
+
+    // old partition as well
+    String oldPartition = node.getPartition();
+
+    // Update resources of these containers
+    for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
+      FiCaSchedulerApp application = getApplicationAttempt(
+          rmContainer.getApplicationAttemptId());
+      if (null != application) {
+        application.nodePartitionUpdated(rmContainer, oldPartition,
+            newPartition);
+      } else{
+        LOG.warn("There's something wrong, some RMContainers running on"
+            + " a node, but we cannot find SchedulerApplicationAttempt "
+            + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
+            + rmContainer.getApplicationAttemptId());
+        continue;
+      }
+    }
+
+    // Unreserve container on this node
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (null != reservedContainer) {
+      killReservedContainer(reservedContainer);
+    }
+
+    // Update node labels after we've done this
+    node.updateLabels(newLabels);
   }
 
   private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
@@ -1371,13 +1366,8 @@ public class CapacityScheduler extends
     {
       NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
           (NodeLabelsUpdateSchedulerEvent) event;
-      
-      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
-          .getUpdatedNodeToLabels().entrySet()) {
-        NodeId id = entry.getKey();
-        Set<String> labels = entry.getValue();
-        updateLabelsOnNode(id, labels);
-      }
+
+      updateNodeLabelsAndQueueResource(labelUpdateEvent);
     }
     break;
   case NODE_UPDATE:
@@ -1482,6 +1472,27 @@ public class CapacityScheduler extends
     }
   }
 
+  /**
+   * Process node labels update.
+   */
+  private void updateNodeLabelsAndQueueResource(
+      NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
+    try {
+      writeLock.lock();
+      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
+          .getUpdatedNodeToLabels().entrySet()) {
+        NodeId id = entry.getKey();
+        Set<String> labels = entry.getValue();
+        updateLabelsOnNode(id, labels);
+      }
+      Resource clusterResource = getClusterResource();
+      root.updateClusterResource(clusterResource,
+          new ResourceLimits(clusterResource));
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   private void addNode(RMNode nodeManager) {
     try {
       writeLock.lock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 0ae77f2439..439e9dfed7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -413,7 +413,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
-  @Test(timeout = 3000000)
+  @Test(timeout = 300000)
   public void testMoveApplicationWithLabel() throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(
@@ -589,7 +589,49 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
-  @Test (timeout = 60000)
+  @Test
+  public void testAMResourceLimitNodeUpdatePartition() throws Exception {
+    conf.setInt("yarn.scheduler.minimum-allocation-mb", 64);
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    rm.registerNode("h1:1234", 6400);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+        ImmutableSet.of("x", "y", "z"));
+
+    // .1 percentage of 6400 will be for am
+    checkAMResourceLimit(rm, "a", 640, "");
+    checkAMResourceLimit(rm, "a", 0, "x");
+    checkAMResourceLimit(rm, "a", 0, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+
+    mgr.replaceLabelsOnNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    rm.drainEvents();
+
+    checkAMResourceLimit(rm, "a", 640, "x");
+    checkAMResourceLimit(rm, "a", 0, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+    checkAMResourceLimit(rm, "a", 0, "");
+
+    // Switch
+    mgr.replaceLabelsOnNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+    rm.drainEvents();
+
+    checkAMResourceLimit(rm, "a", 0, "x");
+    checkAMResourceLimit(rm, "a", 640, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+    checkAMResourceLimit(rm, "a", 0, "");
+  }
+
+  @Test(timeout = 60000)
   public void testAMResourceUsageWhenNodeUpdatesPartition()
       throws Exception {
     // set node -> label
@@ -638,8 +680,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     FiCaSchedulerApp app =
         cs.getApplicationAttempt(am1.getApplicationAttemptId());
     // change h1's label to z
-    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
-        toSet("z"))));
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(
+        ImmutableMap.of(nm1.getNodeId(), toSet("z"))));
 
     // Now the resources also should change from x to z. Verify AM and normal
     // used resource are successfully changed.
@@ -677,4 +719,28 @@ public class TestCapacitySchedulerNodeLabelUpdate {
 
     rm.close();
   }
+
+  private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
+      String label) throws InterruptedException {
+    Assert.assertEquals(memory,
+        waitForResourceUpdate(rm, queuename, memory, label, 3000L));
+  }
+
+  private long waitForResourceUpdate(MockRM rm, String queuename, long memory,
+      String label, long timeout) throws InterruptedException {
+    long start = System.currentTimeMillis();
+    long memorySize = 0;
+    while (System.currentTimeMillis() - start < timeout) {
+      CapacityScheduler scheduler =
+          (CapacityScheduler) rm.getResourceScheduler();
+      CSQueue queue = scheduler.getQueue(queuename);
+      memorySize =
+          queue.getQueueResourceUsage().getAMLimit(label).getMemorySize();
+      if (memory == memorySize) {
+        return memorySize;
+      }
+      Thread.sleep(100);
+    }
+    return memorySize;
+  }
 }