diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index b31ab07c0a..e2a62ecd17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -495,15 +495,22 @@ public class FairScheduler extends applications.put(applicationId, application); queue.getMetrics().submitApp(user); - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queue.getName() - + ", currently num of applications: " + applications.size()); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queue.getName() + + ", currently num of applications: " + applications.size()); if (isAppRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(applicationId + " is recovering. 
Skip notifying APP_ACCEPTED"); } - } else{ + } else { + // During tests we do not always have an application object, handle + // it here but we probably should fix the tests + if (rmApp != null && rmApp.getApplicationSubmissionContext() != null) { + // Before sending out the event that the app is accepted, + // set the queue in the submissionContext (needed on restore etc.) + rmApp.getApplicationSubmissionContext().setQueue(queue.getName()); + } rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java index 9a29a89657..4de16dc88b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.junit.After; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -111,6 +112,13 @@ public abstract class ParameterizedSchedulerTestBase { conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); } + @After + public void tearDown() { + if (schedulerType == SchedulerType.FAIR) { + (new File(FS_ALLOC_FILE)).delete(); + 
} + } + public SchedulerType getSchedulerType() { return schedulerType; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index efde781680..e4c83e32dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1688,4 +1688,43 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase // *********** check appSchedulingInfo state *********** assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); } + + // Apps already completed before RM restart. 
Make sure we restore the queue + // correctly + @Test(timeout = 20000) + public void testFairSchedulerCompletedAppsQueue() throws Exception { + if (getSchedulerType() != SchedulerType.FAIR) { + return; + } + + rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1); + MockRM.finishAMAndVerifyAppState(app, rm1, nm1, am1); + + String fsQueueContext = app.getApplicationSubmissionContext().getQueue(); + String fsQueueApp = app.getQueue(); + assertEquals("Queue in app not equal to submission context", fsQueueApp, + fsQueueContext); + RMAppAttempt rmAttempt = app.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found", rmAttempt); + + rm2 = new MockRM(conf, rm1.getRMStateStore()); + rm2.start(); + + RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app.getApplicationId()); + RMAppAttempt rmAttemptRecovered = recoveredApp.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found after recovery", rmAttemptRecovered); + String fsQueueContextRecovered = + recoveredApp.getApplicationSubmissionContext().getQueue(); + String fsQueueAppRecovered = recoveredApp.getQueue(); + assertEquals(RMAppState.FINISHED, recoveredApp.getState()); + assertEquals("Recovered app queue is not the same as context queue", + fsQueueAppRecovered, fsQueueContextRecovered); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd32a..5f291863e1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -163,16 +163,18 @@ public class FairSchedulerTestBase { protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { - ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, + this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. - if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { + if (scheduler.getSchedulerApplications(). 
+ containsKey(id.getApplicationId())) { scheduler.addApplicationAttempt(id, false, false); } List ask = new ArrayList(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); + ResourceRequest request = createResourceRequest(memory, vcores, + ResourceRequest.ANY, priority, numContainers, true); ask.add(request); RMApp rmApp = mock(RMApp.class); @@ -180,9 +182,11 @@ public class FairSchedulerTestBase { when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id, resourceManager.getRMContext())); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); + when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); Container container = mock(Container.class); when(rmAppAttempt.getMasterContainer()).thenReturn(container); resourceManager.getRMContext().getRMApps() @@ -210,9 +214,11 @@ public class FairSchedulerTestBase { when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id,resourceManager.getRMContext())); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); + when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); @@ -275,9 +281,11 @@ public class FairSchedulerTestBase { RMAppAttemptMetrics 
attemptMetric = mock(RMAppAttemptMetrics.class); when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); when(app.getCurrentAppAttempt()).thenReturn(attempt); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(attempt.getSubmissionContext()).thenReturn(submissionContext); + when(app.getApplicationSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(attemptId.getApplicationId(), app); return app; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 46187d9edd..51ffd23fd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -315,11 +315,11 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { List blacklistAdditions = new ArrayList(1); List blacklistRemovals = new ArrayList(1); blacklistAdditions.add(n1.getNodeName()); - app.updateBlacklist(blacklistAdditions, blacklistRemovals); - app.getQueue().setFairShare(clusterResource); FSAppAttempt spyApp = spy(app); doReturn(false) .when(spyApp).isWaitingForAMContainer(); + spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals); + spyApp.getQueue().setFairShare(clusterResource); 
assertTrue(spyApp.isPlaceBlacklisted(n1.getNodeName())); assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName())); assertEquals(n2.getUnallocatedResource(), spyApp.getHeadroom()); @@ -327,7 +327,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { blacklistAdditions.clear(); blacklistAdditions.add(n2.getNodeName()); blacklistRemovals.add(n1.getNodeName()); - app.updateBlacklist(blacklistAdditions, blacklistRemovals); + spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals); assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName())); assertTrue(spyApp.isPlaceBlacklisted(n2.getNodeName())); assertEquals(n1.getUnallocatedResource(), spyApp.getHeadroom()); @@ -335,7 +335,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { blacklistAdditions.clear(); blacklistRemovals.clear(); blacklistRemovals.add(n2.getNodeName()); - app.updateBlacklist(blacklistAdditions, blacklistRemovals); + spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals); assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName())); assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName())); assertEquals(clusterResource, spyApp.getHeadroom());