YARN-4141. Runtime Application Priority change should not throw exception for applications at finishing states. Contributed by Sunil G
This commit is contained in:
parent
3abbdc929b
commit
9f53a95ff6
@ -904,6 +904,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
|
|
||||||
YARN-4204. ConcurrentModificationException in FairSchedulerQueueInfo. (adhoot)
|
YARN-4204. ConcurrentModificationException in FairSchedulerQueueInfo. (adhoot)
|
||||||
|
|
||||||
|
YARN-4141. Runtime Application Priority change should not throw exception
|
||||||
|
for applications at finishing states (Sunil G via jlowe)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -185,6 +185,12 @@ public class ClientRMService extends AbstractService implements
|
|||||||
private ReservationSystem reservationSystem;
|
private ReservationSystem reservationSystem;
|
||||||
private ReservationInputValidator rValidator;
|
private ReservationInputValidator rValidator;
|
||||||
|
|
||||||
|
private static final EnumSet<RMAppState> COMPLETED_APP_STATES = EnumSet.of(
|
||||||
|
RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
|
||||||
|
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
|
||||||
|
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
|
||||||
|
RMAppState.ACCEPTED, RMAppState.RUNNING);
|
||||||
|
|
||||||
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
|
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
|
||||||
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
|
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
|
||||||
QueueACLsManager queueACLsManager,
|
QueueACLsManager queueACLsManager,
|
||||||
@ -1334,7 +1340,8 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
|
|||||||
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
||||||
"Trying to update priority of an absent application", applicationId);
|
"Trying to update priority of an absent application", applicationId);
|
||||||
throw new ApplicationNotFoundException(
|
throw new ApplicationNotFoundException(
|
||||||
"Trying to update priority o an absent application " + applicationId);
|
"Trying to update priority of an absent application "
|
||||||
|
+ applicationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!checkAccess(callerUGI, application.getUser(),
|
if (!checkAccess(callerUGI, application.getUser(),
|
||||||
@ -1349,12 +1356,20 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
|
|||||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
UpdateApplicationPriorityResponse response = recordFactory
|
||||||
|
.newRecordInstance(UpdateApplicationPriorityResponse.class);
|
||||||
// Update priority only when app is tracked by the scheduler
|
// Update priority only when app is tracked by the scheduler
|
||||||
if (!EnumSet.of(RMAppState.ACCEPTED, RMAppState.RUNNING).contains(
|
if (!ACTIVE_APP_STATES.contains(application.getState())) {
|
||||||
application.getState())) {
|
if (COMPLETED_APP_STATES.contains(application.getState())) {
|
||||||
String msg =
|
// If Application is in any of the final states, change priority
|
||||||
"Application in " + application.getState()
|
// can be skipped rather throwing exception.
|
||||||
+ " state cannot be update priority.";
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
|
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService",
|
||||||
|
applicationId);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
String msg = "Application in " + application.getState()
|
||||||
|
+ " state cannot update priority.";
|
||||||
RMAuditLogger
|
RMAuditLogger
|
||||||
.logFailure(callerUGI.getShortUserName(),
|
.logFailure(callerUGI.getShortUserName(),
|
||||||
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
|
||||||
@ -1374,9 +1389,6 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
|
|||||||
|
|
||||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
|
AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
|
||||||
UpdateApplicationPriorityResponse response =
|
|
||||||
recordFactory
|
|
||||||
.newRecordInstance(UpdateApplicationPriorityResponse.class);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1335,7 +1335,7 @@ protected ClientRMService createClientRMService() {
|
|||||||
@Test(timeout = 120000)
|
@Test(timeout = 120000)
|
||||||
public void testUpdateApplicationPriorityRequest() throws Exception {
|
public void testUpdateApplicationPriorityRequest() throws Exception {
|
||||||
int maxPriority = 10;
|
int maxPriority = 10;
|
||||||
int appPriorty = 5;
|
int appPriority = 5;
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
|
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
|
||||||
maxPriority);
|
maxPriority);
|
||||||
@ -1344,43 +1344,47 @@ public void testUpdateApplicationPriorityRequest() throws Exception {
|
|||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
// Start app1 with appPriority 5
|
// Start app1 with appPriority 5
|
||||||
RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriorty));
|
RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority));
|
||||||
|
|
||||||
Assert.assertEquals("Incorrect priority has been set to application",
|
Assert.assertEquals("Incorrect priority has been set to application",
|
||||||
appPriorty, app1.getApplicationSubmissionContext().getPriority()
|
appPriority, app1.getApplicationSubmissionContext().getPriority()
|
||||||
.getPriority());
|
.getPriority());
|
||||||
|
|
||||||
appPriorty = 9;
|
appPriority = 9;
|
||||||
ClientRMService rmService = rm.getClientRMService();
|
ClientRMService rmService = rm.getClientRMService();
|
||||||
UpdateApplicationPriorityRequest updateRequest =
|
UpdateApplicationPriorityRequest updateRequest =
|
||||||
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
||||||
Priority.newInstance(appPriorty));
|
Priority.newInstance(appPriority));
|
||||||
|
|
||||||
rmService.updateApplicationPriority(updateRequest);
|
rmService.updateApplicationPriority(updateRequest);
|
||||||
|
|
||||||
Assert.assertEquals("Incorrect priority has been set to application",
|
Assert.assertEquals("Incorrect priority has been set to application",
|
||||||
appPriorty, app1.getApplicationSubmissionContext().getPriority()
|
appPriority, app1.getApplicationSubmissionContext().getPriority()
|
||||||
.getPriority());
|
.getPriority());
|
||||||
|
|
||||||
rm.killApp(app1.getApplicationId());
|
rm.killApp(app1.getApplicationId());
|
||||||
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
||||||
|
|
||||||
|
appPriority = 8;
|
||||||
|
UpdateApplicationPriorityRequest updateRequestNew =
|
||||||
|
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
|
||||||
|
Priority.newInstance(appPriority));
|
||||||
// Update priority request for application in KILLED state
|
// Update priority request for application in KILLED state
|
||||||
try {
|
rmService.updateApplicationPriority(updateRequestNew);
|
||||||
rmService.updateApplicationPriority(updateRequest);
|
|
||||||
Assert.fail("Can not update priority for an application in KILLED state");
|
// Hence new priority should not be updated
|
||||||
} catch (YarnException e) {
|
Assert.assertNotEquals("Priority should not be updated as app is in KILLED state",
|
||||||
String msg =
|
appPriority, app1.getApplicationSubmissionContext().getPriority()
|
||||||
"Application in " + app1.getState()
|
.getPriority());
|
||||||
+ " state cannot be update priority.";
|
Assert.assertEquals("Priority should be same as old one before update",
|
||||||
Assert.assertTrue("", msg.contains(e.getMessage()));
|
9, app1.getApplicationSubmissionContext().getPriority()
|
||||||
}
|
.getPriority());
|
||||||
|
|
||||||
// Update priority request for invalid application id.
|
// Update priority request for invalid application id.
|
||||||
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
|
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
|
||||||
updateRequest =
|
updateRequest =
|
||||||
UpdateApplicationPriorityRequest.newInstance(invalidAppId,
|
UpdateApplicationPriorityRequest.newInstance(invalidAppId,
|
||||||
Priority.newInstance(appPriorty));
|
Priority.newInstance(appPriority));
|
||||||
try {
|
try {
|
||||||
rmService.updateApplicationPriority(updateRequest);
|
rmService.updateApplicationPriority(updateRequest);
|
||||||
Assert
|
Assert
|
||||||
|
Loading…
Reference in New Issue
Block a user