YARN-7139. FairScheduler: finished applications are always restored to default queue. Contributed by Wilfred Spiegelenburg.
This commit is contained in:
parent
37f4696a9c
commit
bc93ac229e
@ -495,15 +495,22 @@ protected void addApplication(ApplicationId applicationId,
|
||||
applications.put(applicationId, application);
|
||||
queue.getMetrics().submitApp(user);
|
||||
|
||||
LOG.info("Accepted application " + applicationId + " from user: " + user
|
||||
+ ", in queue: " + queue.getName()
|
||||
+ ", currently num of applications: " + applications.size());
|
||||
LOG.info("Accepted application " + applicationId + " from user: " + user
|
||||
+ ", in queue: " + queue.getName()
|
||||
+ ", currently num of applications: " + applications.size());
|
||||
if (isAppRecovering) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(applicationId
|
||||
+ " is recovering. Skip notifying APP_ACCEPTED");
|
||||
}
|
||||
} else{
|
||||
} else {
|
||||
// During tests we do not always have an application object, handle
|
||||
// it here but we probably should fix the tests
|
||||
if (rmApp != null && rmApp.getApplicationSubmissionContext() != null) {
|
||||
// Before we send out the event that the app is accepted is
|
||||
// to set the queue in the submissionContext (needed on restore etc)
|
||||
rmApp.getApplicationSubmissionContext().setQueue(queue.getName());
|
||||
}
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@ -111,6 +112,13 @@ protected void configureFairScheduler(YarnConfiguration conf)
|
||||
conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (schedulerType == SchedulerType.FAIR) {
|
||||
(new File(FS_ALLOC_FILE)).delete();
|
||||
}
|
||||
}
|
||||
|
||||
public SchedulerType getSchedulerType() {
|
||||
return schedulerType;
|
||||
}
|
||||
|
@ -1688,4 +1688,43 @@ public void testDynamicAutoCreatedQueueRecovery(String user, String queueName)
|
||||
// *********** check appSchedulingInfo state ***********
|
||||
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
|
||||
}
|
||||
|
||||
// Apps already completed before RM restart. Make sure we restore the queue
|
||||
// correctly
|
||||
@Test(timeout = 20000)
|
||||
public void testFairSchedulerCompletedAppsQueue() throws Exception {
|
||||
if (getSchedulerType() != SchedulerType.FAIR) {
|
||||
return;
|
||||
}
|
||||
|
||||
rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app = rm1.submitApp(200);
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
||||
MockRM.finishAMAndVerifyAppState(app, rm1, nm1, am1);
|
||||
|
||||
String fsQueueContext = app.getApplicationSubmissionContext().getQueue();
|
||||
String fsQueueApp = app.getQueue();
|
||||
assertEquals("Queue in app not equal to submission context", fsQueueApp,
|
||||
fsQueueContext);
|
||||
RMAppAttempt rmAttempt = app.getCurrentAppAttempt();
|
||||
assertNotNull("No AppAttempt found", rmAttempt);
|
||||
|
||||
rm2 = new MockRM(conf, rm1.getRMStateStore());
|
||||
rm2.start();
|
||||
|
||||
RMApp recoveredApp =
|
||||
rm2.getRMContext().getRMApps().get(app.getApplicationId());
|
||||
RMAppAttempt rmAttemptRecovered = recoveredApp.getCurrentAppAttempt();
|
||||
assertNotNull("No AppAttempt found after recovery", rmAttemptRecovered);
|
||||
String fsQueueContextRecovered =
|
||||
recoveredApp.getApplicationSubmissionContext().getQueue();
|
||||
String fsQueueAppRecovered = recoveredApp.getQueue();
|
||||
assertEquals(RMAppState.FINISHED, recoveredApp.getState());
|
||||
assertEquals("Recovered app queue is not the same as context queue",
|
||||
fsQueueAppRecovered, fsQueueContextRecovered);
|
||||
}
|
||||
}
|
||||
|
@ -163,16 +163,18 @@ protected ApplicationAttemptId createSchedulingRequest(
|
||||
protected ApplicationAttemptId createSchedulingRequest(
|
||||
int memory, int vcores, String queueId, String userId, int numContainers,
|
||||
int priority) {
|
||||
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
||||
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
|
||||
this.ATTEMPT_ID++);
|
||||
scheduler.addApplication(id.getApplicationId(), queueId, userId, false);
|
||||
// This conditional is for testAclSubmitApplication where app is rejected
|
||||
// and no app is added.
|
||||
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
|
||||
if (scheduler.getSchedulerApplications().
|
||||
containsKey(id.getApplicationId())) {
|
||||
scheduler.addApplicationAttempt(id, false, false);
|
||||
}
|
||||
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
|
||||
priority, numContainers, true);
|
||||
ResourceRequest request = createResourceRequest(memory, vcores,
|
||||
ResourceRequest.ANY, priority, numContainers, true);
|
||||
ask.add(request);
|
||||
|
||||
RMApp rmApp = mock(RMApp.class);
|
||||
@ -180,9 +182,11 @@ protected ApplicationAttemptId createSchedulingRequest(
|
||||
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
|
||||
new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUnmanagedAM()).thenReturn(false);
|
||||
when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext);
|
||||
Container container = mock(Container.class);
|
||||
when(rmAppAttempt.getMasterContainer()).thenReturn(container);
|
||||
resourceManager.getRMContext().getRMApps()
|
||||
@ -210,9 +214,11 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
|
||||
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
|
||||
new RMAppAttemptMetrics(id,resourceManager.getRMContext()));
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUnmanagedAM()).thenReturn(false);
|
||||
when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext);
|
||||
resourceManager.getRMContext().getRMApps()
|
||||
.put(id.getApplicationId(), rmApp);
|
||||
|
||||
@ -275,9 +281,11 @@ protected RMApp createMockRMApp(ApplicationAttemptId attemptId) {
|
||||
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
|
||||
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
|
||||
when(app.getCurrentAppAttempt()).thenReturn(attempt);
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
ApplicationSubmissionContext submissionContext =
|
||||
mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUnmanagedAM()).thenReturn(false);
|
||||
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
when(app.getApplicationSubmissionContext()).thenReturn(submissionContext);
|
||||
resourceManager.getRMContext().getRMApps()
|
||||
.put(attemptId.getApplicationId(), app);
|
||||
return app;
|
||||
|
@ -315,11 +315,11 @@ public void testHeadroomWithBlackListedNodes() {
|
||||
List<String> blacklistAdditions = new ArrayList<String>(1);
|
||||
List<String> blacklistRemovals = new ArrayList<String>(1);
|
||||
blacklistAdditions.add(n1.getNodeName());
|
||||
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
app.getQueue().setFairShare(clusterResource);
|
||||
FSAppAttempt spyApp = spy(app);
|
||||
doReturn(false)
|
||||
.when(spyApp).isWaitingForAMContainer();
|
||||
spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
spyApp.getQueue().setFairShare(clusterResource);
|
||||
assertTrue(spyApp.isPlaceBlacklisted(n1.getNodeName()));
|
||||
assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName()));
|
||||
assertEquals(n2.getUnallocatedResource(), spyApp.getHeadroom());
|
||||
@ -327,7 +327,7 @@ public void testHeadroomWithBlackListedNodes() {
|
||||
blacklistAdditions.clear();
|
||||
blacklistAdditions.add(n2.getNodeName());
|
||||
blacklistRemovals.add(n1.getNodeName());
|
||||
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName()));
|
||||
assertTrue(spyApp.isPlaceBlacklisted(n2.getNodeName()));
|
||||
assertEquals(n1.getUnallocatedResource(), spyApp.getHeadroom());
|
||||
@ -335,7 +335,7 @@ public void testHeadroomWithBlackListedNodes() {
|
||||
blacklistAdditions.clear();
|
||||
blacklistRemovals.clear();
|
||||
blacklistRemovals.add(n2.getNodeName());
|
||||
app.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName()));
|
||||
assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName()));
|
||||
assertEquals(clusterResource, spyApp.getHeadroom());
|
||||
|
Loading…
Reference in New Issue
Block a user