From 72054a817dfb43e93916d7036eba19cf2f49cea2 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 18 Jan 2017 13:26:48 -0800 Subject: [PATCH] YARN-5556. CapacityScheduler: Support deleting queues without requiring a RM restart. (Naganarasimha G R via wangda) --- .../scheduler/capacity/CapacityScheduler.java | 4 +- .../CapacitySchedulerQueueManager.java | 57 ++++-- .../scheduler/capacity/ParentQueue.java | 41 ++-- .../capacity/TestCapacityScheduler.java | 192 +++++++++++++++++- 4 files changed, 255 insertions(+), 39 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c47596766e..ced310e4c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -90,8 +90,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; - - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; @@ -418,7 +416,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext) } catch (Throwable t) { this.conf = oldConf; refreshMaximumAllocation(this.conf.getMaximumAllocation()); - throw new IOException("Failed to re-init queues", t); + throw new IOException("Failed to re-init queues : "+ t.getMessage(), t); } // update lazy preemption diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index ddcbc0e826..f204c74ec0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; @@ -27,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.Permission; @@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; +import com.google.common.annotations.VisibleForTesting; + /** * * Context of the Queues in Capacity Scheduler. @@ -164,11 +167,11 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) CSQueue newRoot = parseQueue(this.csContext, newConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP); - // Ensure all existing queues are still present - validateExistingQueues(queues, newQueues); + // Ensure queue hiearchy in the new XML file is proper. + validateQueueHierarchy(queues, newQueues); - // Add new queues - addNewQueues(queues, newQueues); + // Add new queues and delete OldQeueus only after validation. + updateQueues(queues, newQueues); // Re-configure queues root.reinitialize(newRoot, this.csContext.getClusterResource()); @@ -261,13 +264,14 @@ static CSQueue parseQueue( } /** - * Ensure all existing queues are present. Queues cannot be deleted + * Ensure all existing queues are present. Queues cannot be deleted if its not + * in Stopped state, Queue's cannot be moved from one hierarchy to other also. + * * @param queues existing queues * @param newQueues new queues */ - private void validateExistingQueues( - Map queues, Map newQueues) - throws IOException { + private void validateQueueHierarchy(Map queues, + Map newQueues) throws IOException { // check that all static queues are included in the newQueues list for (Map.Entry e : queues.entrySet()) { if (!(e.getValue() instanceof ReservationQueue)) { @@ -275,8 +279,18 @@ private void validateExistingQueues( CSQueue oldQueue = e.getValue(); CSQueue newQueue = newQueues.get(queueName); if (null == newQueue) { - throw new IOException(queueName + " cannot be found during refresh!"); + // old queue doesn't exist in the new XML + if (oldQueue.getState() == QueueState.STOPPED) { + LOG.info("Deleting Queue " + queueName + ", as it is not" + + " present in the modified capacity configuration xml"); + } else { + throw new IOException(oldQueue.getQueuePath() + " is deleted from" + + " the new capacity scheduler configuration, but the" + + " queue is not yet in stopped state. " + + "Current State : " + oldQueue.getState()); + } } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { + //Queue's cannot be moved from one hierarchy to other throw new IOException(queueName + " is moved from:" + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() + " after refresh, which is not allowed."); @@ -286,18 +300,25 @@ private void validateExistingQueues( } /** - * Add the new queues (only) to our list of queues... - * ... be careful, do not overwrite existing queues. - * @param queues the existing queues - * @param newQueues the new queues + * Updates to our list of queues: Adds the new queues and deletes the removed + * ones... be careful, do not overwrite existing queues. + * + * @param existingQueues, the existing queues + * @param newQueues the new queues based on new XML */ - private void addNewQueues( - Map queues, Map newQueues) { + private void updateQueues(Map existingQueues, + Map newQueues) { for (Map.Entry e : newQueues.entrySet()) { String queueName = e.getKey(); CSQueue queue = e.getValue(); - if (!queues.containsKey(queueName)) { - queues.put(queueName, queue); + if (!existingQueues.containsKey(queueName)) { + existingQueues.put(queueName, queue); + } + } + for (Map.Entry e : existingQueues.entrySet()) { + String queueName = e.getKey(); + if (!newQueues.containsKey(queueName)) { + existingQueues.remove(queueName); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 946fca3166..ec2cccb604 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -18,6 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,9 +54,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; @@ -56,18 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.util.resource.Resources; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - @Private @Evolving public class ParentQueue extends AbstractCSQueue { @@ -317,6 +322,14 @@ public void reinitialize(CSQueue newlyParsedQueue, } } + // remove the deleted queue in the refreshed xml. + for (Map.Entry e : currentChildQueues.entrySet()) { + String queueName = e.getKey(); + if (!newChildQueues.containsKey(queueName)) { + currentChildQueues.remove(queueName); + } + } + // Re-sort all queues childQueues.clear(); childQueues.addAll(currentChildQueues.values()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 62300988d4..2b60ecfa6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -115,13 +116,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. - ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; - -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; @@ -131,6 +128,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. + ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -144,6 +143,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -395,6 +395,16 @@ private void nodeUpdate( resourceManager.getResourceScheduler().handle(nodeUpdate); } + /** + * @param conf + * @return + * root + * / \ + * a b + * / \ / | \ + * a1 a2 b1 b2 b3 + * + */ private CapacitySchedulerConfiguration setupQueueConfiguration( CapacitySchedulerConfiguration conf) { @@ -423,6 +433,67 @@ private CapacitySchedulerConfiguration setupQueueConfiguration( return conf; } + /** + * @param conf, to be modified + * @return, CS configuration which has deleted a queue(b1) + * root + * / \ + * a b + * / \ | \ + * a1 a2 b2 b3 + */ + private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB1( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[] { "b2", "b3" }); + conf.setCapacity(B2, B2_CAPACITY + B1_CAPACITY); //as B1 is deleted + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + LOG.info("Setup top-level queues a and b (without b3)"); + return conf; + } + + /** + * @param conf, to be modified + * @return, CS configuration which has deleted a + * Parent queue(b) + */ + private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" }); + + conf.setCapacity(A, A_CAPACITY + B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + LOG.info("Setup top-level queues a"); + return conf; + } + + private CapacitySchedulerConfiguration setupBlockedQueueConfiguration( CapacitySchedulerConfiguration conf) { @@ -3758,4 +3829,117 @@ protected RMNodeLabelsManager createNodeLabelManager() { Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } }, attemptMetrics.getLocalityStatistics()); } + + /** + * Test for queue deletion. + * @throws Exception + */ + @Test + public void testRefreshQueuesWithQueueDelete() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + setupQueueConfiguration(conf); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, rmContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + // test delete leaf queue when there is application running. + Map queues = + cs.getCapacitySchedulerQueueManager().getQueues(); + String b1QTobeDeleted = "b1"; + LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted)); + when(csB1Queue.getState()).thenReturn(QueueState.DRAINING) + .thenReturn(QueueState.STOPPED); + queues.put(b1QTobeDeleted, csB1Queue); + conf = new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithOutB1(conf); + try { + cs.reinitialize(conf, mockContext); + fail("Expected to throw exception when refresh queue tries to delete a" + + " queue with running apps"); + } catch (IOException e) { + // ignore + } + + // test delete leaf queue(root.b.b1) when there is no application running. + conf = new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithOutB1(conf); + try { + cs.reinitialize(conf, mockContext); + } catch (IOException e) { + fail("Expected to NOT throw exception when refresh queue tries to delete" + + " a queue WITHOUT running apps"); + } + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueB = findQueue(rootQueue, B); + CSQueue queueB3 = findQueue(queueB, B1); + assertNull("Refresh needs to support delete of leaf queue ", queueB3); + + // reset back to default configuration for testing parent queue delete + conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.reinitialize(conf, rmContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + // set the configurations such that it fails once but should be successfull + // next time + queues = cs.getCapacitySchedulerQueueManager().getQueues(); + CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b")); + when(bQueue.getState()).thenReturn(QueueState.DRAINING) + .thenReturn(QueueState.STOPPED); + queues.put("b", bQueue); + + bQueue = Mockito.spy((LeafQueue) queues.get("b1")); + when(bQueue.getState()).thenReturn(QueueState.STOPPED); + queues.put("b1", bQueue); + + bQueue = Mockito.spy((LeafQueue) queues.get("b2")); + when(bQueue.getState()).thenReturn(QueueState.STOPPED); + queues.put("b2", bQueue); + + bQueue = Mockito.spy((LeafQueue) queues.get("b3")); + when(bQueue.getState()).thenReturn(QueueState.STOPPED); + queues.put("b3", bQueue); + + // test delete Parent queue when there is application running. + conf = new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithOutB(conf); + try { + cs.reinitialize(conf, mockContext); + fail("Expected to throw exception when refresh queue tries to delete a" + + " parent queue with running apps in children queue"); + } catch (IOException e) { + // ignore + } + + // test delete Parent queue when there is no application running. + conf = new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithOutB(conf); + try { + cs.reinitialize(conf, mockContext); + } catch (IOException e) { + fail("Expected to not throw exception when refresh queue tries to delete" + + " a queue without running apps"); + } + rootQueue = cs.getRootQueue(); + queueB = findQueue(rootQueue, B); + String message = + "Refresh needs to support delete of Parent queue and its children."; + assertNull(message, queueB); + assertNull(message, + cs.getCapacitySchedulerQueueManager().getQueues().get("b")); + assertNull(message, + cs.getCapacitySchedulerQueueManager().getQueues().get("b1")); + assertNull(message, + cs.getCapacitySchedulerQueueManager().getQueues().get("b2")); + + cs.stop(); + } }