YARN-11708: Setting maximum-application-lifetime using AQCv2 templates doesn't apply on the first submitted app (#7041)

This commit is contained in:
Susheel Gupta 2024-10-03 19:25:28 +05:30 committed by GitHub
parent b781882020
commit 1b5a2a7f65
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 120 additions and 28 deletions

View File

@ -1269,7 +1269,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
long applicationLifetime = long applicationLifetime =
app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
applicationLifetime = app.scheduler applicationLifetime = app.scheduler
.checkAndGetApplicationLifetime(app.queue, applicationLifetime); .checkAndGetApplicationLifetime(app.queue, applicationLifetime, app);
if (applicationLifetime > 0) { if (applicationLifetime > 0) {
// calculate next timeout value // calculate next timeout value
Long newTimeout = Long newTimeout =

View File

@ -82,6 +82,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -1679,7 +1680,8 @@ public Resource getMaximumAllocation() {
} }
@Override @Override
public long checkAndGetApplicationLifetime(String queueName, long lifetime) { public long checkAndGetApplicationLifetime(String queueName, long lifetime,
RMAppImpl app) {
// Lifetime is the application lifetime by default. // Lifetime is the application lifetime by default.
return lifetime; return lifetime;
} }

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@ -423,11 +424,13 @@ Resource getNormalizedResource(Resource requestedResource,
* Queue lifetime. * Queue lifetime.
* @param queueName Name of the Queue * @param queueName Name of the Queue
* @param lifetime configured application lifetime * @param lifetime configured application lifetime
* @param app details of app
* @return valid lifetime as per queue * @return valid lifetime as per queue
*/ */
@Public @Public
@Evolving @Evolving
long checkAndGetApplicationLifetime(String queueName, long lifetime); long checkAndGetApplicationLifetime(String queueName, long lifetime,
RMAppImpl app);
/** /**
* Get maximum lifetime for a queue. * Get maximum lifetime for a queue.

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker; import org.slf4j.Marker;
@ -3374,14 +3375,45 @@ public boolean moveReservedContainer(RMContainer toBeMovedContainer,
@Override @Override
public long checkAndGetApplicationLifetime(String queueName, public long checkAndGetApplicationLifetime(String queueName,
long lifetimeRequestedByApp) { long lifetimeRequestedByApp, RMAppImpl app) {
readLock.lock(); CSQueue queue;
writeLock.lock();
try { try {
CSQueue queue = getQueue(queueName); queue = getQueue(queueName);
if (!(queue instanceof AbstractLeafQueue)) {
// The target auto-created (AQC) queue may not exist yet when its first app is
// submitted (YARN-11708), so create it from the app's placement context.
if (queue == null) {
queue = getOrCreateQueueFromPlacementContext(app.getApplicationId(),
app.getUser(), app.getQueue(), app.getApplicationPlacementContext(), false);
}
if (queue == null) {
String message = "Application " + app.getApplicationId()
+ " submitted by user " + app.getUser();
if (isAmbiguous(queueName)) {
message = message + " to ambiguous queue: " + queueName
+ " please use full queue path instead.";
} else {
message = message + "Application " + app.getApplicationId() +
" submitted by user " + app.getUser() + " to unknown queue: " + queueName;
}
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(app.getApplicationId(), RMAppEventType.APP_REJECTED,
message));
return lifetimeRequestedByApp; return lifetimeRequestedByApp;
} }
if (!(queue instanceof AbstractLeafQueue)) {
return lifetimeRequestedByApp;
}
} finally {
writeLock.unlock();
}
readLock.lock();
try {
long defaultApplicationLifetime = long defaultApplicationLifetime =
queue.getDefaultApplicationLifetime(); queue.getDefaultApplicationLifetime();
long maximumApplicationLifetime = long maximumApplicationLifetime =

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -2042,7 +2043,8 @@ ReadLock getSchedulerReadLock() {
} }
@Override @Override
public long checkAndGetApplicationLifetime(String queueName, long lifetime) { public long checkAndGetApplicationLifetime(String queueName, long lifetime,
RMAppImpl app) {
// Lifetime is the application lifetime by default. // Lifetime is the application lifetime by default.
return lifetime; return lifetime;
} }

View File

@ -2482,12 +2482,12 @@ public void testcheckAndGetApplicationLifetime() throws Exception {
// positive integer value // positive integer value
CapacityScheduler cs = setUpCSQueue(maxLifetime, defaultLifetime); CapacityScheduler cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 100)); cs.checkAndGetApplicationLifetime("default", 100, null));
Assert.assertEquals(9, cs.checkAndGetApplicationLifetime("default", 9)); Assert.assertEquals(9, cs.checkAndGetApplicationLifetime("default", 9, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1)); cs.checkAndGetApplicationLifetime("default", -1, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", 0)); cs.checkAndGetApplicationLifetime("default", 0, null));
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.getMaximumApplicationLifetime("default")); cs.getMaximumApplicationLifetime("default"));
@ -2495,11 +2495,11 @@ public void testcheckAndGetApplicationLifetime() throws Exception {
defaultLifetime = -1; defaultLifetime = -1;
// test for default values // test for default values
cs = setUpCSQueue(maxLifetime, defaultLifetime); cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100)); Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1)); cs.checkAndGetApplicationLifetime("default", -1, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", 0)); cs.checkAndGetApplicationLifetime("default", 0, null));
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.getMaximumApplicationLifetime("default")); cs.getMaximumApplicationLifetime("default"));
@ -2507,32 +2507,32 @@ public void testcheckAndGetApplicationLifetime() throws Exception {
defaultLifetime = 10; defaultLifetime = 10;
cs = setUpCSQueue(maxLifetime, defaultLifetime); cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 100)); cs.checkAndGetApplicationLifetime("default", 100, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1)); cs.checkAndGetApplicationLifetime("default", -1, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", 0)); cs.checkAndGetApplicationLifetime("default", 0, null));
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.getMaximumApplicationLifetime("default")); cs.getMaximumApplicationLifetime("default"));
maxLifetime = 0; maxLifetime = 0;
defaultLifetime = 0; defaultLifetime = 0;
cs = setUpCSQueue(maxLifetime, defaultLifetime); cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100)); Assert.assertEquals(100, cs.checkAndGetApplicationLifetime("default", 100, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1)); cs.checkAndGetApplicationLifetime("default", -1, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", 0)); cs.checkAndGetApplicationLifetime("default", 0, null));
maxLifetime = 10; maxLifetime = 10;
defaultLifetime = -1; defaultLifetime = -1;
cs = setUpCSQueue(maxLifetime, defaultLifetime); cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 100)); cs.checkAndGetApplicationLifetime("default", 100, null));
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", -1)); cs.checkAndGetApplicationLifetime("default", -1, null));
Assert.assertEquals(maxLifetime, Assert.assertEquals(maxLifetime,
cs.checkAndGetApplicationLifetime("default", 0)); cs.checkAndGetApplicationLifetime("default", 0, null));
maxLifetime = 5; maxLifetime = 5;
defaultLifetime = 10; defaultLifetime = 10;
@ -2549,11 +2549,11 @@ public void testcheckAndGetApplicationLifetime() throws Exception {
defaultLifetime = 10; defaultLifetime = 10;
cs = setUpCSQueue(maxLifetime, defaultLifetime); cs = setUpCSQueue(maxLifetime, defaultLifetime);
Assert.assertEquals(100, Assert.assertEquals(100,
cs.checkAndGetApplicationLifetime("default", 100)); cs.checkAndGetApplicationLifetime("default", 100, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", -1)); cs.checkAndGetApplicationLifetime("default", -1, null));
Assert.assertEquals(defaultLifetime, Assert.assertEquals(defaultLifetime,
cs.checkAndGetApplicationLifetime("default", 0)); cs.checkAndGetApplicationLifetime("default", 0, null));
} }
private CapacityScheduler setUpCSQueue(long maxLifetime, private CapacityScheduler setUpCSQueue(long maxLifetime,

View File

@ -90,7 +90,6 @@
.NO_LABEL; .NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -618,6 +617,60 @@ public void testAutoQueueCreationFailsForEmptyPathWithAQCAndWeightMode()
} }
} }
/**
 * Checks that the default/maximum application lifetime configured on the
 * root queue is enforced for the first application submitted to
 * {@code root.test.user}, a queue that is not part of the static
 * configuration and sits under {@code root.test}, where auto queue
 * creation v2 is enabled (YARN-11708).
 *
 * @throws Exception if the RM cannot be started or the app cannot be
 *         submitted
 */
@Test
public void testAutoQueueCreationWithWeightModeAndMaxAppLifetimeFirstSubmittedApp()
    throws Exception {
  // Replace the RM created by the fixture with one using this test's config.
  if (mockRM != null) {
    mockRM.stop();
  }

  long maxRootLifetime = 20L;
  long defaultRootLifetime = 10L;

  QueuePath testQueue = new QueuePath("root.test");

  CapacitySchedulerConfiguration conf = setupSchedulerConfiguration();
  conf.setQueues(ROOT, new String[] {"test"});
  conf.setAutoQueueCreationV2Enabled(testQueue, true);
  conf.setCapacity(DEFAULT, "1w");
  conf.setCapacity(testQueue, "2w");
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);

  conf.setMaximumLifetimePerQueue(ROOT, maxRootLifetime);
  conf.setDefaultLifetimePerQueue(ROOT, defaultRootLifetime);

  MockRM newMockRM = new MockRM(conf);
  newMockRM.start();
  CapacityScheduler newCS =
      (CapacityScheduler) newMockRM.getResourceScheduler();
  newCS.start();

  // Everything after start-up runs inside try/finally so that a failed
  // submission or assertion cannot leak the running RM into later tests.
  try {
    Priority appPriority = Priority.newInstance(0);
    MockRMAppSubmissionData app =
        MockRMAppSubmissionData.Builder.createWithMemory(1024, newMockRM)
            .withAppPriority(appPriority)
            .withQueue("root.test.user")
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(newMockRM, app);

    // Expected value first, so a failure message reports the right sides.
    Assert.assertEquals(maxRootLifetime,
        newCS.getMaximumApplicationLifetime("root.test.user"));

    newMockRM.waitForState(app1.getApplicationId(), RMAppState.KILLED);
    long totalTimeRun = app1.getFinishTime() - app1.getSubmitTime();
    Assert.assertEquals(RMAppState.KILLED, app1.getState());
    // NOTE(review): the * 1000 presumably converts lifetimes in seconds to
    // the millisecond timestamps returned by the app - confirm.
    Assert.assertTrue("Application killed before default lifetime value",
        totalTimeRun > (defaultRootLifetime * 1000));
    Assert.assertTrue(
        "Application killed after max lifetime value " + totalTimeRun,
        totalTimeRun < (maxRootLifetime * 1000));
  } finally {
    newCS.stop();
    newMockRM.stop();
  }
}
/** /**
* This test case checks if a mapping rule can put an application to an auto * This test case checks if a mapping rule can put an application to an auto