YARN-5956. Refactor ClientRMService for unify error handling across apis. Contributed by Kai Sasaki.
This commit is contained in:
parent
84ddedc0b2
commit
cffea251be
@ -196,7 +196,7 @@ public class ClientRMService extends AbstractService implements
|
|||||||
protected RMDelegationTokenSecretManager rmDTSecretManager;
|
protected RMDelegationTokenSecretManager rmDTSecretManager;
|
||||||
|
|
||||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
InetSocketAddress clientBindAddress;
|
private InetSocketAddress clientBindAddress;
|
||||||
|
|
||||||
private final ApplicationACLsManager applicationsACLsManager;
|
private final ApplicationACLsManager applicationsACLsManager;
|
||||||
private final QueueACLsManager queueACLsManager;
|
private final QueueACLsManager queueACLsManager;
|
||||||
@ -206,9 +206,6 @@ public class ClientRMService extends AbstractService implements
|
|||||||
private ReservationSystem reservationSystem;
|
private ReservationSystem reservationSystem;
|
||||||
private ReservationInputValidator rValidator;
|
private ReservationInputValidator rValidator;
|
||||||
|
|
||||||
private static final EnumSet<RMAppState> COMPLETED_APP_STATES = EnumSet.of(
|
|
||||||
RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
|
|
||||||
RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
|
|
||||||
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
|
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
|
||||||
RMAppState.ACCEPTED, RMAppState.RUNNING);
|
RMAppState.ACCEPTED, RMAppState.RUNNING);
|
||||||
|
|
||||||
@ -298,11 +295,12 @@ public InetSocketAddress getBindAddress() {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* check if the calling user has the access to application information.
|
* check if the calling user has the access to application information.
|
||||||
* @param callerUGI
|
* @param callerUGI the user information who submit the request
|
||||||
* @param owner
|
* @param owner the user of the application
|
||||||
* @param operationPerformed
|
* @param operationPerformed the type of operation defined in
|
||||||
* @param application
|
* {@link ApplicationAccessType}
|
||||||
* @return
|
* @param application submitted application
|
||||||
|
* @return access is permitted or not
|
||||||
*/
|
*/
|
||||||
private boolean checkAccess(UserGroupInformation callerUGI, String owner,
|
private boolean checkAccess(UserGroupInformation callerUGI, String owner,
|
||||||
ApplicationAccessType operationPerformed, RMApp application) {
|
ApplicationAccessType operationPerformed, RMApp application) {
|
||||||
@ -379,24 +377,14 @@ public GetApplicationReportResponse getApplicationReport(
|
|||||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||||
IOException {
|
IOException {
|
||||||
|
ApplicationId applicationId
|
||||||
|
= request.getApplicationAttemptId().getApplicationId();
|
||||||
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
|
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
|
||||||
UserGroupInformation callerUGI;
|
UserGroupInformation callerUGI = getCallerUgi(applicationId,
|
||||||
try {
|
AuditConstants.GET_APP_ATTEMPT_REPORT);
|
||||||
callerUGI = UserGroupInformation.getCurrentUser();
|
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
||||||
} catch (IOException ie) {
|
AuditConstants.GET_APP_ATTEMPT_REPORT, ApplicationAccessType.VIEW_APP,
|
||||||
LOG.info("Error getting UGI ", ie);
|
false);
|
||||||
throw RPCUtil.getRemoteException(ie);
|
|
||||||
}
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(
|
|
||||||
appAttemptId.getApplicationId());
|
|
||||||
if (application == null) {
|
|
||||||
// If the RM doesn't have the application, throw
|
|
||||||
// ApplicationNotFoundException and let client to handle.
|
|
||||||
throw new ApplicationNotFoundException("Application with id '"
|
|
||||||
+ request.getApplicationAttemptId().getApplicationId()
|
|
||||||
+ "' doesn't exist in RM. Please check that the job "
|
|
||||||
+ "submission was successful.");
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||||
ApplicationAccessType.VIEW_APP, application);
|
ApplicationAccessType.VIEW_APP, application);
|
||||||
@ -422,21 +410,11 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
|||||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||||
GetApplicationAttemptsRequest request) throws YarnException, IOException {
|
GetApplicationAttemptsRequest request) throws YarnException, IOException {
|
||||||
ApplicationId appId = request.getApplicationId();
|
ApplicationId appId = request.getApplicationId();
|
||||||
UserGroupInformation callerUGI;
|
UserGroupInformation callerUGI = getCallerUgi(appId,
|
||||||
try {
|
AuditConstants.GET_APP_ATTEMPTS);
|
||||||
callerUGI = UserGroupInformation.getCurrentUser();
|
RMApp application = verifyUserAccessForRMApp(appId, callerUGI,
|
||||||
} catch (IOException ie) {
|
AuditConstants.GET_APP_ATTEMPTS, ApplicationAccessType.VIEW_APP,
|
||||||
LOG.info("Error getting UGI ", ie);
|
false);
|
||||||
throw RPCUtil.getRemoteException(ie);
|
|
||||||
}
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(appId);
|
|
||||||
if (application == null) {
|
|
||||||
// If the RM doesn't have the application, throw
|
|
||||||
// ApplicationNotFoundException and let client to handle.
|
|
||||||
throw new ApplicationNotFoundException("Application with id '" + appId
|
|
||||||
+ "' doesn't exist in RM. Please check that the job submission "
|
|
||||||
+ "was successful.");
|
|
||||||
}
|
|
||||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||||
ApplicationAccessType.VIEW_APP, application);
|
ApplicationAccessType.VIEW_APP, application);
|
||||||
GetApplicationAttemptsResponse response = null;
|
GetApplicationAttemptsResponse response = null;
|
||||||
@ -471,21 +449,11 @@ public GetContainerReportResponse getContainerReport(
|
|||||||
ContainerId containerId = request.getContainerId();
|
ContainerId containerId = request.getContainerId();
|
||||||
ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
|
ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
|
||||||
ApplicationId appId = appAttemptId.getApplicationId();
|
ApplicationId appId = appAttemptId.getApplicationId();
|
||||||
UserGroupInformation callerUGI;
|
UserGroupInformation callerUGI = getCallerUgi(appId,
|
||||||
try {
|
AuditConstants.GET_CONTAINER_REPORT);
|
||||||
callerUGI = UserGroupInformation.getCurrentUser();
|
RMApp application = verifyUserAccessForRMApp(appId, callerUGI,
|
||||||
} catch (IOException ie) {
|
AuditConstants.GET_CONTAINER_REPORT, ApplicationAccessType.VIEW_APP,
|
||||||
LOG.info("Error getting UGI ", ie);
|
false);
|
||||||
throw RPCUtil.getRemoteException(ie);
|
|
||||||
}
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(appId);
|
|
||||||
if (application == null) {
|
|
||||||
// If the RM doesn't have the application, throw
|
|
||||||
// ApplicationNotFoundException and let client to handle.
|
|
||||||
throw new ApplicationNotFoundException("Application with id '" + appId
|
|
||||||
+ "' doesn't exist in RM. Please check that the job submission "
|
|
||||||
+ "was successful.");
|
|
||||||
}
|
|
||||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||||
ApplicationAccessType.VIEW_APP, application);
|
ApplicationAccessType.VIEW_APP, application);
|
||||||
GetContainerReportResponse response = null;
|
GetContainerReportResponse response = null;
|
||||||
@ -496,13 +464,13 @@ public GetContainerReportResponse getContainerReport(
|
|||||||
"ApplicationAttempt with id '" + appAttemptId +
|
"ApplicationAttempt with id '" + appAttemptId +
|
||||||
"' doesn't exist in RM.");
|
"' doesn't exist in RM.");
|
||||||
}
|
}
|
||||||
RMContainer rmConatiner = this.rmContext.getScheduler().getRMContainer(
|
RMContainer rmContainer = this.rmContext.getScheduler().getRMContainer(
|
||||||
containerId);
|
containerId);
|
||||||
if (rmConatiner == null) {
|
if (rmContainer == null) {
|
||||||
throw new ContainerNotFoundException("Container with id '" + containerId
|
throw new ContainerNotFoundException("Container with id '" + containerId
|
||||||
+ "' doesn't exist in RM.");
|
+ "' doesn't exist in RM.");
|
||||||
}
|
}
|
||||||
response = GetContainerReportResponse.newInstance(rmConatiner
|
response = GetContainerReportResponse.newInstance(rmContainer
|
||||||
.createContainerReport());
|
.createContainerReport());
|
||||||
} else {
|
} else {
|
||||||
throw new YarnException("User " + callerUGI.getShortUserName()
|
throw new YarnException("User " + callerUGI.getShortUserName()
|
||||||
@ -522,21 +490,10 @@ public GetContainersResponse getContainers(GetContainersRequest request)
|
|||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
|
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
|
||||||
ApplicationId appId = appAttemptId.getApplicationId();
|
ApplicationId appId = appAttemptId.getApplicationId();
|
||||||
UserGroupInformation callerUGI;
|
UserGroupInformation callerUGI = getCallerUgi(appId,
|
||||||
try {
|
AuditConstants.GET_CONTAINERS);
|
||||||
callerUGI = UserGroupInformation.getCurrentUser();
|
RMApp application = verifyUserAccessForRMApp(appId, callerUGI,
|
||||||
} catch (IOException ie) {
|
AuditConstants.GET_CONTAINERS, ApplicationAccessType.VIEW_APP, false);
|
||||||
LOG.info("Error getting UGI ", ie);
|
|
||||||
throw RPCUtil.getRemoteException(ie);
|
|
||||||
}
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(appId);
|
|
||||||
if (application == null) {
|
|
||||||
// If the RM doesn't have the application, throw
|
|
||||||
// ApplicationNotFoundException and let client to handle.
|
|
||||||
throw new ApplicationNotFoundException("Application with id '" + appId
|
|
||||||
+ "' doesn't exist in RM. Please check that the job submission "
|
|
||||||
+ "was successful.");
|
|
||||||
}
|
|
||||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||||
ApplicationAccessType.VIEW_APP, application);
|
ApplicationAccessType.VIEW_APP, application);
|
||||||
GetContainersResponse response = null;
|
GetContainersResponse response = null;
|
||||||
@ -600,6 +557,7 @@ public SubmitApplicationResponse submitApplication(
|
|||||||
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
|
TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
|
||||||
value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length()
|
value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length()
|
||||||
+ 1);
|
+ 1);
|
||||||
|
// In order to check the number format
|
||||||
Long.valueOf(value);
|
Long.valueOf(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -676,9 +634,8 @@ public SubmitApplicationResponse submitApplication(
|
|||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
SubmitApplicationResponse response = recordFactory
|
return recordFactory
|
||||||
.newRecordInstance(SubmitApplicationResponse.class);
|
.newRecordInstance(SubmitApplicationResponse.class);
|
||||||
return response;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -689,26 +646,11 @@ public FailApplicationAttemptResponse failApplicationAttempt(
|
|||||||
ApplicationAttemptId attemptId = request.getApplicationAttemptId();
|
ApplicationAttemptId attemptId = request.getApplicationAttemptId();
|
||||||
ApplicationId applicationId = attemptId.getApplicationId();
|
ApplicationId applicationId = attemptId.getApplicationId();
|
||||||
|
|
||||||
UserGroupInformation callerUGI;
|
UserGroupInformation callerUGI = getCallerUgi(applicationId,
|
||||||
try {
|
AuditConstants.FAIL_ATTEMPT_REQUEST);
|
||||||
callerUGI = UserGroupInformation.getCurrentUser();
|
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
||||||
} catch (IOException ie) {
|
AuditConstants.FAIL_ATTEMPT_REQUEST, ApplicationAccessType.MODIFY_APP,
|
||||||
LOG.info("Error getting UGI ", ie);
|
true);
|
||||||
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.FAIL_ATTEMPT_REQUEST,
|
|
||||||
"UNKNOWN", "ClientRMService" , "Error getting UGI",
|
|
||||||
applicationId, attemptId);
|
|
||||||
throw RPCUtil.getRemoteException(ie);
|
|
||||||
}
|
|
||||||
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
|
||||||
if (application == null) {
|
|
||||||
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
|
||||||
AuditConstants.FAIL_ATTEMPT_REQUEST, "UNKNOWN", "ClientRMService",
|
|
||||||
"Trying to fail an attempt of an absent application", applicationId,
|
|
||||||
attemptId);
|
|
||||||
throw new ApplicationNotFoundException("Trying to fail an attempt "
|
|
||||||
+ attemptId + " of an absent application " + applicationId);
|
|
||||||
}
|
|
||||||
|
|
||||||
RMAppAttempt appAttempt = application.getAppAttempts().get(attemptId);
|
RMAppAttempt appAttempt = application.getAppAttempts().get(attemptId);
|
||||||
if (appAttempt == null) {
|
if (appAttempt == null) {
|
||||||
@ -716,29 +658,15 @@ public FailApplicationAttemptResponse failApplicationAttempt(
|
|||||||
"ApplicationAttempt with id '" + attemptId + "' doesn't exist in RM.");
|
"ApplicationAttempt with id '" + attemptId + "' doesn't exist in RM.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!checkAccess(callerUGI, application.getUser(),
|
|
||||||
ApplicationAccessType.MODIFY_APP, application)) {
|
|
||||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
|
||||||
AuditConstants.FAIL_ATTEMPT_REQUEST,
|
|
||||||
"User doesn't have permissions to "
|
|
||||||
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
|
||||||
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
|
||||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
|
||||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
|
||||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
|
||||||
}
|
|
||||||
|
|
||||||
FailApplicationAttemptResponse response =
|
FailApplicationAttemptResponse response =
|
||||||
recordFactory.newRecordInstance(FailApplicationAttemptResponse.class);
|
recordFactory.newRecordInstance(FailApplicationAttemptResponse.class);
|
||||||
|
|
||||||
if (!ACTIVE_APP_STATES.contains(application.getState())) {
|
if (application.isAppInCompletedStates()) {
|
||||||
if (COMPLETED_APP_STATES.contains(application.getState())) {
|
|
||||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService",
|
AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService",
|
||||||
applicationId);
|
applicationId);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.FAIL,
|
new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.FAIL,
|
||||||
@ -769,7 +697,6 @@ public KillApplicationResponse forceKillApplication(
|
|||||||
applicationId, callerContext);
|
applicationId, callerContext);
|
||||||
throw RPCUtil.getRemoteException(ie);
|
throw RPCUtil.getRemoteException(ie);
|
||||||
}
|
}
|
||||||
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||||
@ -1107,15 +1034,15 @@ public GetDelegationTokenResponse getDelegationToken(
|
|||||||
RMDelegationTokenIdentifier tokenIdentifier =
|
RMDelegationTokenIdentifier tokenIdentifier =
|
||||||
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
|
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()),
|
||||||
realUser);
|
realUser);
|
||||||
Token<RMDelegationTokenIdentifier> realRMDTtoken =
|
Token<RMDelegationTokenIdentifier> realRMDToken =
|
||||||
new Token<RMDelegationTokenIdentifier>(tokenIdentifier,
|
new Token<RMDelegationTokenIdentifier>(tokenIdentifier,
|
||||||
this.rmDTSecretManager);
|
this.rmDTSecretManager);
|
||||||
response.setRMDelegationToken(
|
response.setRMDelegationToken(
|
||||||
BuilderUtils.newDelegationToken(
|
BuilderUtils.newDelegationToken(
|
||||||
realRMDTtoken.getIdentifier(),
|
realRMDToken.getIdentifier(),
|
||||||
realRMDTtoken.getKind().toString(),
|
realRMDToken.getKind().toString(),
|
||||||
realRMDTtoken.getPassword(),
|
realRMDToken.getPassword(),
|
||||||
realRMDTtoken.getService().toString()
|
realRMDToken.getService().toString()
|
||||||
));
|
));
|
||||||
return response;
|
return response;
|
||||||
} catch(IOException io) {
|
} catch(IOException io) {
|
||||||
@ -1175,37 +1102,11 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
|||||||
MoveApplicationAcrossQueuesRequest request) throws YarnException {
|
MoveApplicationAcrossQueuesRequest request) throws YarnException {
|
||||||
ApplicationId applicationId = request.getApplicationId();
|
ApplicationId applicationId = request.getApplicationId();
|
||||||
|
|
||||||
UserGroupInformation callerUGI;
|
UserGroupInformation callerUGI = getCallerUgi(applicationId,
|
||||||
try {
|
AuditConstants.MOVE_APP_REQUEST);
|
||||||
callerUGI = UserGroupInformation.getCurrentUser();
|
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
||||||
} catch (IOException ie) {
|
AuditConstants.MOVE_APP_REQUEST, ApplicationAccessType.MODIFY_APP,
|
||||||
LOG.info("Error getting UGI ", ie);
|
true);
|
||||||
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST,
|
|
||||||
"UNKNOWN", "ClientRMService" , "Error getting UGI",
|
|
||||||
applicationId);
|
|
||||||
throw RPCUtil.getRemoteException(ie);
|
|
||||||
}
|
|
||||||
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
|
||||||
if (application == null) {
|
|
||||||
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
|
||||||
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
|
|
||||||
"Trying to move an absent application", applicationId);
|
|
||||||
throw new ApplicationNotFoundException("Trying to move an absent"
|
|
||||||
+ " application " + applicationId);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!checkAccess(callerUGI, application.getUser(),
|
|
||||||
ApplicationAccessType.MODIFY_APP, application)) {
|
|
||||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
|
||||||
AuditConstants.MOVE_APP_REQUEST,
|
|
||||||
"User doesn't have permissions to "
|
|
||||||
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
|
||||||
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
|
||||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
|
||||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
|
||||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
|
||||||
}
|
|
||||||
|
|
||||||
String targetQueue = request.getTargetQueue();
|
String targetQueue = request.getTargetQueue();
|
||||||
if (!accessToTargetQueueAllowed(callerUGI, application, targetQueue)) {
|
if (!accessToTargetQueueAllowed(callerUGI, application, targetQueue)) {
|
||||||
@ -1245,9 +1146,8 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
|||||||
|
|
||||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
|
AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
|
||||||
MoveApplicationAcrossQueuesResponse response = recordFactory
|
return recordFactory
|
||||||
.newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
|
.newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
|
||||||
return response;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1304,7 +1204,7 @@ public Server getServer() {
|
|||||||
@Override
|
@Override
|
||||||
public GetNewReservationResponse getNewReservation(
|
public GetNewReservationResponse getNewReservation(
|
||||||
GetNewReservationRequest request) throws YarnException, IOException {
|
GetNewReservationRequest request) throws YarnException, IOException {
|
||||||
checkReservationSytem(AuditConstants.CREATE_NEW_RESERVATION_REQUEST);
|
checkReservationSystem(AuditConstants.CREATE_NEW_RESERVATION_REQUEST);
|
||||||
GetNewReservationResponse response =
|
GetNewReservationResponse response =
|
||||||
recordFactory.newRecordInstance(GetNewReservationResponse.class);
|
recordFactory.newRecordInstance(GetNewReservationResponse.class);
|
||||||
|
|
||||||
@ -1318,7 +1218,7 @@ public GetNewReservationResponse getNewReservation(
|
|||||||
public ReservationSubmissionResponse submitReservation(
|
public ReservationSubmissionResponse submitReservation(
|
||||||
ReservationSubmissionRequest request) throws YarnException, IOException {
|
ReservationSubmissionRequest request) throws YarnException, IOException {
|
||||||
// Check if reservation system is enabled
|
// Check if reservation system is enabled
|
||||||
checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
|
checkReservationSystem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
|
||||||
ReservationSubmissionResponse response =
|
ReservationSubmissionResponse response =
|
||||||
recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
|
recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
|
||||||
ReservationId reservationId = request.getReservationId();
|
ReservationId reservationId = request.getReservationId();
|
||||||
@ -1377,7 +1277,7 @@ public ReservationSubmissionResponse submitReservation(
|
|||||||
public ReservationUpdateResponse updateReservation(
|
public ReservationUpdateResponse updateReservation(
|
||||||
ReservationUpdateRequest request) throws YarnException, IOException {
|
ReservationUpdateRequest request) throws YarnException, IOException {
|
||||||
// Check if reservation system is enabled
|
// Check if reservation system is enabled
|
||||||
checkReservationSytem(AuditConstants.UPDATE_RESERVATION_REQUEST);
|
checkReservationSystem(AuditConstants.UPDATE_RESERVATION_REQUEST);
|
||||||
ReservationUpdateResponse response =
|
ReservationUpdateResponse response =
|
||||||
recordFactory.newRecordInstance(ReservationUpdateResponse.class);
|
recordFactory.newRecordInstance(ReservationUpdateResponse.class);
|
||||||
// Validate the input
|
// Validate the input
|
||||||
@ -1416,7 +1316,7 @@ public ReservationUpdateResponse updateReservation(
|
|||||||
public ReservationDeleteResponse deleteReservation(
|
public ReservationDeleteResponse deleteReservation(
|
||||||
ReservationDeleteRequest request) throws YarnException, IOException {
|
ReservationDeleteRequest request) throws YarnException, IOException {
|
||||||
// Check if reservation system is enabled
|
// Check if reservation system is enabled
|
||||||
checkReservationSytem(AuditConstants.DELETE_RESERVATION_REQUEST);
|
checkReservationSystem(AuditConstants.DELETE_RESERVATION_REQUEST);
|
||||||
ReservationDeleteResponse response =
|
ReservationDeleteResponse response =
|
||||||
recordFactory.newRecordInstance(ReservationDeleteResponse.class);
|
recordFactory.newRecordInstance(ReservationDeleteResponse.class);
|
||||||
// Validate the input
|
// Validate the input
|
||||||
@ -1455,7 +1355,7 @@ public ReservationDeleteResponse deleteReservation(
|
|||||||
public ReservationListResponse listReservations(
|
public ReservationListResponse listReservations(
|
||||||
ReservationListRequest requestInfo) throws YarnException, IOException {
|
ReservationListRequest requestInfo) throws YarnException, IOException {
|
||||||
// Check if reservation system is enabled
|
// Check if reservation system is enabled
|
||||||
checkReservationSytem(AuditConstants.LIST_RESERVATION_REQUEST);
|
checkReservationSystem(AuditConstants.LIST_RESERVATION_REQUEST);
|
||||||
ReservationListResponse response =
|
ReservationListResponse response =
|
||||||
recordFactory.newRecordInstance(ReservationListResponse.class);
|
recordFactory.newRecordInstance(ReservationListResponse.class);
|
||||||
|
|
||||||
@ -1495,9 +1395,7 @@ public ReservationListResponse listReservations(
|
|||||||
public GetNodesToLabelsResponse getNodeToLabels(
|
public GetNodesToLabelsResponse getNodeToLabels(
|
||||||
GetNodesToLabelsRequest request) throws YarnException, IOException {
|
GetNodesToLabelsRequest request) throws YarnException, IOException {
|
||||||
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
||||||
GetNodesToLabelsResponse response =
|
return GetNodesToLabelsResponse.newInstance(labelsMgr.getNodeLabels());
|
||||||
GetNodesToLabelsResponse.newInstance(labelsMgr.getNodeLabels());
|
|
||||||
return response;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -1505,8 +1403,7 @@ public GetLabelsToNodesResponse getLabelsToNodes(
|
|||||||
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
||||||
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
||||||
if (request.getNodeLabels() == null || request.getNodeLabels().isEmpty()) {
|
if (request.getNodeLabels() == null || request.getNodeLabels().isEmpty()) {
|
||||||
return GetLabelsToNodesResponse.newInstance(
|
return GetLabelsToNodesResponse.newInstance(labelsMgr.getLabelsToNodes());
|
||||||
labelsMgr.getLabelsToNodes());
|
|
||||||
} else {
|
} else {
|
||||||
return GetLabelsToNodesResponse.newInstance(
|
return GetLabelsToNodesResponse.newInstance(
|
||||||
labelsMgr.getLabelsToNodes(request.getNodeLabels()));
|
labelsMgr.getLabelsToNodes(request.getNodeLabels()));
|
||||||
@ -1517,13 +1414,12 @@ public GetLabelsToNodesResponse getLabelsToNodes(
|
|||||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||||
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
|
||||||
GetClusterNodeLabelsResponse response =
|
return GetClusterNodeLabelsResponse.newInstance(
|
||||||
GetClusterNodeLabelsResponse.newInstance(
|
|
||||||
labelsMgr.getClusterNodeLabels());
|
labelsMgr.getClusterNodeLabels());
|
||||||
return response;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkReservationSytem(String auditConstant) throws YarnException {
|
private void checkReservationSystem(String auditConstant)
|
||||||
|
throws YarnException {
|
||||||
// Check if reservation is enabled
|
// Check if reservation is enabled
|
||||||
if (reservationSystem == null) {
|
if (reservationSystem == null) {
|
||||||
throw RPCUtil.getRemoteException("Reservation is not enabled."
|
throw RPCUtil.getRemoteException("Reservation is not enabled."
|
||||||
@ -1642,7 +1538,8 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
|
|||||||
UserGroupInformation callerUGI =
|
UserGroupInformation callerUGI =
|
||||||
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY);
|
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY);
|
||||||
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
||||||
AuditConstants.UPDATE_APP_PRIORITY);
|
AuditConstants.UPDATE_APP_PRIORITY, ApplicationAccessType.MODIFY_APP,
|
||||||
|
true);
|
||||||
|
|
||||||
UpdateApplicationPriorityResponse response = recordFactory
|
UpdateApplicationPriorityResponse response = recordFactory
|
||||||
.newRecordInstance(UpdateApplicationPriorityResponse.class);
|
.newRecordInstance(UpdateApplicationPriorityResponse.class);
|
||||||
@ -1685,9 +1582,14 @@ public UpdateApplicationPriorityResponse updateApplicationPriority(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signal a container.
|
* Send a signal to a container.
|
||||||
|
*
|
||||||
* After the request passes some sanity check, it will be delivered
|
* After the request passes some sanity check, it will be delivered
|
||||||
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request
|
* to RMNodeImpl so that the next NM heartbeat will pick up the signal request
|
||||||
|
* @param request request to signal a container
|
||||||
|
* @return the response of sending signal request
|
||||||
|
* @throws YarnException rpc related exception
|
||||||
|
* @throws IOException fail to obtain user group information
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
@ -1759,7 +1661,8 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
|||||||
UserGroupInformation callerUGI =
|
UserGroupInformation callerUGI =
|
||||||
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS);
|
getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS);
|
||||||
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
|
||||||
AuditConstants.UPDATE_APP_TIMEOUTS);
|
AuditConstants.UPDATE_APP_TIMEOUTS, ApplicationAccessType.MODIFY_APP,
|
||||||
|
true);
|
||||||
|
|
||||||
if (applicationTimeouts.isEmpty()) {
|
if (applicationTimeouts.isEmpty()) {
|
||||||
String message =
|
String message =
|
||||||
@ -1778,7 +1681,7 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
|||||||
if (!EnumSet
|
if (!EnumSet
|
||||||
.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING)
|
.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING)
|
||||||
.contains(state)) {
|
.contains(state)) {
|
||||||
if (COMPLETED_APP_STATES.contains(state)) {
|
if (application.isAppInCompletedStates()) {
|
||||||
// If Application is in any of the final states, update timeout
|
// If Application is in any of the final states, update timeout
|
||||||
// can be skipped rather throwing exception.
|
// can be skipped rather throwing exception.
|
||||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
@ -1823,26 +1726,35 @@ private UserGroupInformation getCallerUgi(ApplicationId applicationId,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private RMApp verifyUserAccessForRMApp(ApplicationId applicationId,
|
private RMApp verifyUserAccessForRMApp(ApplicationId applicationId,
|
||||||
UserGroupInformation callerUGI, String operation) throws YarnException {
|
UserGroupInformation callerUGI, String operation,
|
||||||
|
ApplicationAccessType accessType,
|
||||||
|
boolean needCheckAccess) throws YarnException {
|
||||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN",
|
RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN",
|
||||||
"ClientRMService",
|
"ClientRMService",
|
||||||
"Trying to " + operation + " of an absent application",
|
"Trying to " + operation + " of an absent application",
|
||||||
applicationId);
|
applicationId);
|
||||||
throw new ApplicationNotFoundException("Trying to " + operation
|
// If the RM doesn't have the application, throw
|
||||||
+ " of an absent application " + applicationId);
|
// ApplicationNotFoundException and let client to handle.
|
||||||
|
throw new ApplicationNotFoundException("Application with id '"
|
||||||
|
+ applicationId + "' doesn't exist in RM. "
|
||||||
|
+ "Please check that the job "
|
||||||
|
+ "submission was successful.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (needCheckAccess) {
|
||||||
if (!checkAccess(callerUGI, application.getUser(),
|
if (!checkAccess(callerUGI, application.getUser(),
|
||||||
ApplicationAccessType.MODIFY_APP, application)) {
|
accessType, application)) {
|
||||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation,
|
RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation,
|
||||||
"User doesn't have permissions to "
|
"User doesn't have permissions to "
|
||||||
+ ApplicationAccessType.MODIFY_APP.toString(),
|
+ accessType.toString(),
|
||||||
"ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId);
|
"ClientRMService", AuditConstants.UNAUTHORIZED_USER,
|
||||||
|
applicationId);
|
||||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
+ accessType.name() + " on " + applicationId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return application;
|
return application;
|
||||||
}
|
}
|
||||||
|
@ -54,6 +54,11 @@ public static class AuditConstants {
|
|||||||
public static final String GET_APP_STATE = "Get Application State";
|
public static final String GET_APP_STATE = "Get Application State";
|
||||||
public static final String GET_APP_PRIORITY = "Get Application Priority";
|
public static final String GET_APP_PRIORITY = "Get Application Priority";
|
||||||
public static final String GET_APP_QUEUE = "Get Application Queue";
|
public static final String GET_APP_QUEUE = "Get Application Queue";
|
||||||
|
public static final String GET_APP_ATTEMPTS = "Get Application Attempts";
|
||||||
|
public static final String GET_APP_ATTEMPT_REPORT
|
||||||
|
= "Get Application Attempt Report";
|
||||||
|
public static final String GET_CONTAINERS = "Get Containers";
|
||||||
|
public static final String GET_CONTAINER_REPORT = "Get Container Report";
|
||||||
public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
|
public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
|
||||||
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
||||||
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
||||||
|
Loading…
Reference in New Issue
Block a user