From 1eb81867032b016a59662043cbae50daa52dafa9 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Wed, 8 Mar 2017 12:04:30 +0530 Subject: [PATCH] YARN-6207. Move application across queues should handle delayed event processing. Contributed by Bibin A Chundatt. --- .../SchedulerApplicationAttempt.java | 5 +- .../scheduler/capacity/CapacityScheduler.java | 69 +++--- .../capacity/TestCapacityScheduler.java | 200 ++++++++++++++++++ 3 files changed, 248 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index f894a40f09..91e29d59ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1069,6 +1069,7 @@ public void move(Queue newQueue) { QueueMetrics newMetrics = newQueue.getMetrics(); String newQueueName = newQueue.getQueueName(); String user = getUser(); + for (RMContainer liveContainer : liveContainers.values()) { Resource resource = liveContainer.getContainer().getResource(); ((RMContainerImpl) liveContainer).setQueueName(newQueueName); @@ -1084,7 +1085,9 @@ public void move(Queue newQueue) { } } - appSchedulingInfo.move(newQueue); + if (!isStopped) { + appSchedulingInfo.move(newQueue); + } this.queue = newQueue; } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 20ea607cd4..f6e7942686 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1939,36 +1939,47 @@ public String moveApplication(ApplicationId appId, String targetQueueName) throws YarnException { try { writeLock.lock(); - FiCaSchedulerApp app = getApplicationAttempt( - ApplicationAttemptId.newInstance(appId, 0)); - String sourceQueueName = app.getQueue().getQueueName(); - LeafQueue source = this.queueManager.getAndCheckLeafQueue( - sourceQueueName); + SchedulerApplication application = + applications.get(appId); + if (application == null) { + throw new YarnException("App to be moved " + appId + " not found."); + } + String sourceQueueName = application.getQueue().getQueueName(); + LeafQueue source = + this.queueManager.getAndCheckLeafQueue(sourceQueueName); String destQueueName = handleMoveToPlanQueue(targetQueueName); LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); - String user = app.getUser(); + String user = application.getUser(); try { dest.submitApplication(appId, user, destQueueName); } catch (AccessControlException e) { throw new YarnException(e); } - // Move all live containers - for (RMContainer rmContainer : app.getLiveContainers()) { - source.detachContainer(getClusterResource(), app, rmContainer); - // attach the Container to another queue - dest.attachContainer(getClusterResource(), app, rmContainer); + + FiCaSchedulerApp app = application.getCurrentAppAttempt(); + if (app != null) { + // Move all live containers even when stopped. + // For transferStateFromPreviousAttempt required + for (RMContainer rmContainer : app.getLiveContainers()) { + source.detachContainer(getClusterResource(), app, rmContainer); + // attach the Container to another queue + dest.attachContainer(getClusterResource(), app, rmContainer); + } + if (!app.isStopped()) { + source.finishApplicationAttempt(app, sourceQueueName); + // Submit to a new queue + dest.submitApplicationAttempt(app, user); + } + // Finish app & update metrics + app.move(dest); } + source.appFinished(); // Detach the application.. - source.finishApplicationAttempt(app, sourceQueueName); - source.getParent().finishApplication(appId, app.getUser()); - // Finish app & update metrics - app.move(dest); - // Submit to a new queue - dest.submitApplicationAttempt(app, user); - applications.get(appId).setQueue(dest); - LOG.info("App: " + app.getApplicationId() + " successfully moved from " - + sourceQueueName + " to: " + destQueueName); + source.getParent().finishApplication(appId, user); + application.setQueue(dest); + LOG.info("App: " + appId + " successfully moved from " + sourceQueueName + + " to: " + destQueueName); return targetQueueName; } finally { writeLock.unlock(); @@ -1980,15 +1991,23 @@ public void preValidateMoveApplication(ApplicationId appId, String newQueue) throws YarnException { try { writeLock.lock(); - FiCaSchedulerApp app = getApplicationAttempt( - ApplicationAttemptId.newInstance(appId, 0)); - String sourceQueueName = app.getQueue().getQueueName(); + SchedulerApplication application = + applications.get(appId); + if (application == null) { + throw new YarnException("App to be moved " + appId + " not found."); + } + String sourceQueueName = application.getQueue().getQueueName(); this.queueManager.getAndCheckLeafQueue(sourceQueueName); String destQueueName = handleMoveToPlanQueue(newQueue); LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); // Validation check - ACLs, submission limits for user & queue - String user = app.getUser(); - checkQueuePartition(app, dest); + String user = application.getUser(); + // Check active partition only when attempt is available + FiCaSchedulerApp appAttempt = + getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0)); + if (null != appAttempt) { + checkQueuePartition(appAttempt, dest); + } try { dest.validateSubmitApplication(appId, user, destQueueName); } catch (AccessControlException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 2b60ecfa6c..293bac2117 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; @@ -110,12 +111,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -2209,6 +2212,203 @@ public void testMoveAllAppsInvalidSource() throws Exception { rm.stop(); } + @Test(timeout = 60000) + public void testMoveAttemptNotAdded() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(getCapacityConfiguration(conf)); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + ApplicationId appId = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + + RMAppAttemptMetrics attemptMetric = + new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); + RMAppImpl app = mock(RMAppImpl.class); + when(app.getApplicationId()).thenReturn(appId); + RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + when(attempt.getSubmissionContext()).thenReturn(submissionContext); + when(attempt.getAppAttemptId()).thenReturn(appAttemptId); + when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); + when(app.getCurrentAppAttempt()).thenReturn(attempt); + + rm.getRMContext().getRMApps().put(appId, app); + + SchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appId, "a1", "user"); + try { + cs.moveApplication(appId, "b1"); + fail("Move should throw exception app not available"); + } catch (YarnException e) { + assertEquals("App to be moved application_100_0001 not found.", + e.getMessage()); + } + cs.handle(addAppEvent); + cs.moveApplication(appId, "b1"); + SchedulerEvent addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + cs.handle(addAttemptEvent); + CSQueue rootQ = cs.getRootQueue(); + CSQueue queueB = cs.getQueue("b"); + CSQueue queueA = cs.getQueue("a"); + CSQueue queueA1 = cs.getQueue("a1"); + CSQueue queueB1 = cs.getQueue("b1"); + Assert.assertEquals(1, rootQ.getNumApplications()); + Assert.assertEquals(0, queueA.getNumApplications()); + Assert.assertEquals(1, queueB.getNumApplications()); + Assert.assertEquals(0, queueA1.getNumApplications()); + Assert.assertEquals(1, queueB1.getNumApplications()); + + rm.close(); + } + + @Test + public void testRemoveAttemptMoveAdded() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + CapacityScheduler.class); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + // Create Mock RM + MockRM rm = new MockRM(getCapacityConfiguration(conf)); + CapacityScheduler sch = (CapacityScheduler) rm.getResourceScheduler(); + // add node + Resource newResource = Resource.newInstance(4 * GB, 1); + RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1"); + SchedulerEvent addNode = new NodeAddedSchedulerEvent(node); + sch.handle(addNode); + // create appid + ApplicationId appId = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + + RMAppAttemptMetrics attemptMetric = + new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); + RMAppImpl app = mock(RMAppImpl.class); + when(app.getApplicationId()).thenReturn(appId); + RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + when(attempt.getSubmissionContext()).thenReturn(submissionContext); + when(attempt.getAppAttemptId()).thenReturn(appAttemptId); + when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); + when(app.getCurrentAppAttempt()).thenReturn(attempt); + + rm.getRMContext().getRMApps().put(appId, app); + // Add application + SchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appId, "a1", "user"); + sch.handle(addAppEvent); + // Add application attempt + SchedulerEvent addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + sch.handle(addAttemptEvent); + // get Queues + CSQueue queueA1 = sch.getQueue("a1"); + CSQueue queueB = sch.getQueue("b"); + CSQueue queueB1 = sch.getQueue("b1"); + + // add Running rm container and simulate live containers to a1 + ContainerId newContainerId = ContainerId.newContainerId(appAttemptId, 2); + RMContainerImpl rmContainer = mock(RMContainerImpl.class); + when(rmContainer.getState()).thenReturn(RMContainerState.RUNNING); + Container container2 = mock(Container.class); + when(rmContainer.getContainer()).thenReturn(container2); + Resource resource = Resource.newInstance(1024, 1); + when(container2.getResource()).thenReturn(resource); + when(rmContainer.getExecutionType()).thenReturn(ExecutionType.GUARANTEED); + when(container2.getNodeId()).thenReturn(node.getNodeID()); + when(container2.getId()).thenReturn(newContainerId); + when(rmContainer.getNodeLabelExpression()) + .thenReturn(RMNodeLabelsManager.NO_LABEL); + when(rmContainer.getContainerId()).thenReturn(newContainerId); + sch.getApplicationAttempt(appAttemptId).getLiveContainersMap() + .put(newContainerId, rmContainer); + QueueMetrics queueA1M = queueA1.getMetrics(); + queueA1M.incrPendingResources("user1", 1, resource); + queueA1M.allocateResources("user1", resource); + // remove attempt + sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId, + RMAppAttemptState.KILLED, true)); + // Move application to queue b1 + sch.moveApplication(appId, "b1"); + // Check queue metrics after move + Assert.assertEquals(0, queueA1.getNumApplications()); + Assert.assertEquals(1, queueB.getNumApplications()); + Assert.assertEquals(0, queueB1.getNumApplications()); + + // Release attempt add event + ApplicationAttemptId appAttemptId2 = + BuilderUtils.newApplicationAttemptId(appId, 2); + SchedulerEvent addAttemptEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId2, true); + sch.handle(addAttemptEvent2); + + // Check metrics after attempt added + Assert.assertEquals(0, queueA1.getNumApplications()); + Assert.assertEquals(1, queueB.getNumApplications()); + Assert.assertEquals(1, queueB1.getNumApplications()); + + + QueueMetrics queueB1M = queueB1.getMetrics(); + QueueMetrics queueBM = queueB.getMetrics(); + // Verify allocation MB of current state + Assert.assertEquals(0, queueA1M.getAllocatedMB()); + Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores()); + Assert.assertEquals(1024, queueB1M.getAllocatedMB()); + Assert.assertEquals(1, queueB1M.getAllocatedVirtualCores()); + + // remove attempt + sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId2, + RMAppAttemptState.FINISHED, false)); + + Assert.assertEquals(0, queueA1M.getAllocatedMB()); + Assert.assertEquals(0, queueA1M.getAllocatedVirtualCores()); + Assert.assertEquals(0, queueB1M.getAllocatedMB()); + Assert.assertEquals(0, queueB1M.getAllocatedVirtualCores()); + + verifyQueueMetrics(queueB1M); + verifyQueueMetrics(queueBM); + // Verify queue A1 metrics + verifyQueueMetrics(queueA1M); + rm.close(); + } + + private void verifyQueueMetrics(QueueMetrics queue) { + Assert.assertEquals(0, queue.getPendingMB()); + Assert.assertEquals(0, queue.getActiveUsers()); + Assert.assertEquals(0, queue.getActiveApps()); + Assert.assertEquals(0, queue.getAppsPending()); + Assert.assertEquals(0, queue.getAppsRunning()); + Assert.assertEquals(0, queue.getAllocatedMB()); + Assert.assertEquals(0, queue.getAllocatedVirtualCores()); + } + + private Configuration getCapacityConfiguration(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b"}); + conf.setCapacity(A, 50); + conf.setCapacity(B, 50); + conf.setQueues(A, new String[] {"a1", "a2"}); + conf.setCapacity(A1, 50); + conf.setCapacity(A2, 50); + conf.setQueues(B, new String[] {"b1"}); + conf.setCapacity(B1, 100); + return conf; + } + @Test public void testKillAllAppsInQueue() throws Exception { MockRM rm = setUpMove();