YARN-1115: Provide optional means for a scheduler to check real user ACLs. Contributed by Eric Payne (epayne)
This commit is contained in:
parent
20aeb5ecc3
commit
d286994009
@ -600,10 +600,12 @@ public SubmitApplicationResponse submitApplication(
|
|||||||
// checked here, those that are dependent on RM configuration are validated
|
// checked here, those that are dependent on RM configuration are validated
|
||||||
// in RMAppManager.
|
// in RMAppManager.
|
||||||
|
|
||||||
|
UserGroupInformation userUgi = null;
|
||||||
String user = null;
|
String user = null;
|
||||||
try {
|
try {
|
||||||
// Safety
|
// Safety
|
||||||
user = UserGroupInformation.getCurrentUser().getShortUserName();
|
userUgi = UserGroupInformation.getCurrentUser();
|
||||||
|
user = userUgi.getShortUserName();
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.warn("Unable to get the current user.", ie);
|
LOG.warn("Unable to get the current user.", ie);
|
||||||
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
|
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
|
||||||
@ -694,7 +696,7 @@ public SubmitApplicationResponse submitApplication(
|
|||||||
try {
|
try {
|
||||||
// call RMAppManager to submit application directly
|
// call RMAppManager to submit application directly
|
||||||
rmAppManager.submitApplication(submissionContext,
|
rmAppManager.submitApplication(submissionContext,
|
||||||
System.currentTimeMillis(), user);
|
System.currentTimeMillis(), userUgi);
|
||||||
|
|
||||||
LOG.info("Application with id " + applicationId.getId() +
|
LOG.info("Application with id " + applicationId.getId() +
|
||||||
" submitted by user " + user);
|
" submitted by user " + user);
|
||||||
|
@ -352,16 +352,26 @@ protected synchronized void checkAppNumCompletedLimit() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@VisibleForTesting
|
||||||
|
@Deprecated
|
||||||
protected void submitApplication(
|
protected void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user) throws YarnException {
|
String user) throws YarnException {
|
||||||
|
submitApplication(submissionContext, submitTime,
|
||||||
|
UserGroupInformation.createRemoteUser(user));
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
protected void submitApplication(
|
||||||
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
|
UserGroupInformation userUgi) throws YarnException {
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
|
|
||||||
// Passing start time as -1. It will be eventually set in RMAppImpl
|
// Passing start time as -1. It will be eventually set in RMAppImpl
|
||||||
// constructor.
|
// constructor.
|
||||||
RMAppImpl application = createAndPopulateNewRMApp(
|
RMAppImpl application = createAndPopulateNewRMApp(
|
||||||
submissionContext, submitTime, user, false, -1, null);
|
submissionContext, submitTime, userUgi, false, -1, null);
|
||||||
try {
|
try {
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
this.rmContext.getDelegationTokenRenewer()
|
this.rmContext.getDelegationTokenRenewer()
|
||||||
@ -394,11 +404,21 @@ protected void recoverApplication(ApplicationStateData appState,
|
|||||||
ApplicationSubmissionContext appContext =
|
ApplicationSubmissionContext appContext =
|
||||||
appState.getApplicationSubmissionContext();
|
appState.getApplicationSubmissionContext();
|
||||||
ApplicationId appId = appContext.getApplicationId();
|
ApplicationId appId = appContext.getApplicationId();
|
||||||
|
UserGroupInformation userUgi = null;
|
||||||
|
if (appState.getRealUser() != null) {
|
||||||
|
UserGroupInformation realUserUgi = null;
|
||||||
|
realUserUgi =
|
||||||
|
UserGroupInformation.createRemoteUser(appState.getRealUser());
|
||||||
|
userUgi = UserGroupInformation.createProxyUser(appState.getUser(),
|
||||||
|
realUserUgi);
|
||||||
|
} else {
|
||||||
|
userUgi = UserGroupInformation.createRemoteUser(appState.getUser());
|
||||||
|
}
|
||||||
|
|
||||||
// create and recover app.
|
// create and recover app.
|
||||||
RMAppImpl application =
|
RMAppImpl application =
|
||||||
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
|
||||||
appState.getUser(), true, appState.getStartTime(),
|
userUgi, true, appState.getStartTime(),
|
||||||
appState.getState());
|
appState.getState());
|
||||||
|
|
||||||
application.handle(new RMAppRecoverEvent(appId, rmState));
|
application.handle(new RMAppRecoverEvent(appId, rmState));
|
||||||
@ -406,8 +426,9 @@ protected void recoverApplication(ApplicationStateData appState,
|
|||||||
|
|
||||||
private RMAppImpl createAndPopulateNewRMApp(
|
private RMAppImpl createAndPopulateNewRMApp(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user, boolean isRecovery, long startTime,
|
UserGroupInformation userUgi, boolean isRecovery, long startTime,
|
||||||
RMAppState recoveredFinalState) throws YarnException {
|
RMAppState recoveredFinalState) throws YarnException {
|
||||||
|
String user = userUgi.getShortUserName();
|
||||||
|
|
||||||
ApplicationPlacementContext placementContext = null;
|
ApplicationPlacementContext placementContext = null;
|
||||||
if (recoveredFinalState == null) {
|
if (recoveredFinalState == null) {
|
||||||
@ -431,7 +452,6 @@ private RMAppImpl createAndPopulateNewRMApp(
|
|||||||
|
|
||||||
// Verify and get the update application priority and set back to
|
// Verify and get the update application priority and set back to
|
||||||
// submissionContext
|
// submissionContext
|
||||||
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
|
|
||||||
|
|
||||||
// Application priority needed to be validated only while submitting. During
|
// Application priority needed to be validated only while submitting. During
|
||||||
// recovery, validated priority could be recovered from submission context.
|
// recovery, validated priority could be recovered from submission context.
|
||||||
@ -519,7 +539,7 @@ private RMAppImpl createAndPopulateNewRMApp(
|
|||||||
// Create RMApp
|
// Create RMApp
|
||||||
RMAppImpl application =
|
RMAppImpl application =
|
||||||
new RMAppImpl(applicationId, rmContext, this.conf,
|
new RMAppImpl(applicationId, rmContext, this.conf,
|
||||||
submissionContext.getApplicationName(), user,
|
submissionContext.getApplicationName(), userUgi,
|
||||||
placementQueueName,
|
placementQueueName,
|
||||||
submissionContext, this.scheduler, this.masterService,
|
submissionContext, this.scheduler, this.masterService,
|
||||||
submitTime, submissionContext.getApplicationType(),
|
submitTime, submissionContext.getApplicationType(),
|
||||||
|
@ -937,7 +937,8 @@ public void storeNewApplication(RMApp app) {
|
|||||||
assert context instanceof ApplicationSubmissionContextPBImpl;
|
assert context instanceof ApplicationSubmissionContextPBImpl;
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
ApplicationStateData.newInstance(app.getSubmitTime(),
|
ApplicationStateData.newInstance(app.getSubmitTime(),
|
||||||
app.getStartTime(), context, app.getUser(), app.getCallerContext());
|
app.getStartTime(), context, app.getUser(), app.getRealUser(),
|
||||||
|
app.getCallerContext());
|
||||||
appState.setApplicationTimeouts(app.getApplicationTimeouts());
|
appState.setApplicationTimeouts(app.getApplicationTimeouts());
|
||||||
getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
|
getRMStateStoreEventHandler().handle(new RMStateStoreAppEvent(appState));
|
||||||
}
|
}
|
||||||
@ -1170,7 +1171,7 @@ public void removeApplication(RMApp app) {
|
|||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
ApplicationStateData.newInstance(app.getSubmitTime(),
|
ApplicationStateData.newInstance(app.getSubmitTime(),
|
||||||
app.getStartTime(), app.getApplicationSubmissionContext(),
|
app.getStartTime(), app.getApplicationSubmissionContext(),
|
||||||
app.getUser(), app.getCallerContext());
|
app.getUser(), app.getRealUser(), app.getCallerContext());
|
||||||
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
|
for(RMAppAttempt appAttempt : app.getAppAttempts().values()) {
|
||||||
appState.attempts.put(appAttempt.getAppAttemptId(), null);
|
appState.attempts.put(appAttempt.getAppAttemptId(), null);
|
||||||
}
|
}
|
||||||
|
@ -92,9 +92,46 @@ public static ApplicationStateData newInstance(long submitTime,
|
|||||||
|
|
||||||
public static ApplicationStateData newInstance(long submitTime,
|
public static ApplicationStateData newInstance(long submitTime,
|
||||||
long startTime, ApplicationSubmissionContext context, String user) {
|
long startTime, ApplicationSubmissionContext context, String user) {
|
||||||
return newInstance(submitTime, startTime, context, user, null);
|
return newInstance(submitTime, startTime, context, user,
|
||||||
|
(CallerContext) null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static ApplicationStateData newInstance(long submitTime,
|
||||||
|
long startTime, String user, String realUser,
|
||||||
|
ApplicationSubmissionContext submissionContext, RMAppState state,
|
||||||
|
String diagnostics, long launchTime, long finishTime,
|
||||||
|
CallerContext callerContext) {
|
||||||
|
ApplicationStateData appState =
|
||||||
|
newInstance(submitTime, startTime, user, submissionContext, state,
|
||||||
|
diagnostics, launchTime, finishTime, callerContext);
|
||||||
|
if (realUser != null) {
|
||||||
|
appState.setRealUser(realUser);
|
||||||
|
}
|
||||||
|
return appState;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ApplicationStateData newInstance(long submitTime,
|
||||||
|
long startTime, String user, String realUser,
|
||||||
|
ApplicationSubmissionContext submissionContext, RMAppState state,
|
||||||
|
String diagnostics, long launchTime, long finishTime,
|
||||||
|
CallerContext callerContext,
|
||||||
|
Map<ApplicationTimeoutType, Long> applicationTimeouts) {
|
||||||
|
ApplicationStateData appState =
|
||||||
|
newInstance(submitTime, startTime, user, submissionContext, state,
|
||||||
|
diagnostics, launchTime, finishTime, callerContext, applicationTimeouts);
|
||||||
|
if (realUser != null) {
|
||||||
|
appState.setRealUser(realUser);
|
||||||
|
}
|
||||||
|
return appState;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ApplicationStateData newInstance(long submitTime,
|
||||||
|
long startTime, ApplicationSubmissionContext context, String user,
|
||||||
|
String realUser, CallerContext callerContext) {
|
||||||
|
return newInstance(submitTime, startTime, user, realUser, context, null, "",
|
||||||
|
0, 0, callerContext);
|
||||||
|
}
|
||||||
|
|
||||||
public int getAttemptCount() {
|
public int getAttemptCount() {
|
||||||
return attempts.size();
|
return attempts.size();
|
||||||
}
|
}
|
||||||
@ -213,4 +250,8 @@ public abstract void setApplicationSubmissionContext(
|
|||||||
@Public
|
@Public
|
||||||
public abstract void setApplicationTimeouts(
|
public abstract void setApplicationTimeouts(
|
||||||
Map<ApplicationTimeoutType, Long> applicationTimeouts);
|
Map<ApplicationTimeoutType, Long> applicationTimeouts);
|
||||||
|
|
||||||
|
public abstract String getRealUser();
|
||||||
|
|
||||||
|
public abstract void setRealUser(String realUser);
|
||||||
}
|
}
|
||||||
|
@ -148,7 +148,22 @@ public void setUser(String user) {
|
|||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setUser(user);
|
builder.setUser(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRealUser() {
|
||||||
|
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasRealUser()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return (p.getRealUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRealUser(String realUser) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setRealUser(realUser);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ApplicationSubmissionContext getApplicationSubmissionContext() {
|
public ApplicationSubmissionContext getApplicationSubmissionContext() {
|
||||||
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
@ -325,4 +325,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
|
|||||||
* @return Map of envs related to application scheduling preferences.
|
* @return Map of envs related to application scheduling preferences.
|
||||||
*/
|
*/
|
||||||
Map<String, String> getApplicationSchedulingEnvs();
|
Map<String, String> getApplicationSchedulingEnvs();
|
||||||
|
|
||||||
|
String getRealUser();
|
||||||
}
|
}
|
||||||
|
@ -134,6 +134,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final String user;
|
private final String user;
|
||||||
|
private final UserGroupInformation userUgi;
|
||||||
private final String name;
|
private final String name;
|
||||||
private final ApplicationSubmissionContext submissionContext;
|
private final ApplicationSubmissionContext submissionContext;
|
||||||
private final Dispatcher dispatcher;
|
private final Dispatcher dispatcher;
|
||||||
@ -421,7 +422,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
|||||||
String applicationType, Set<String> applicationTags,
|
String applicationType, Set<String> applicationTags,
|
||||||
List<ResourceRequest> amReqs, ApplicationPlacementContext
|
List<ResourceRequest> amReqs, ApplicationPlacementContext
|
||||||
placementContext, long startTime) {
|
placementContext, long startTime) {
|
||||||
|
this(applicationId, rmContext, config, name,
|
||||||
|
(user != null ? UserGroupInformation.createRemoteUser(user) : null),
|
||||||
|
queue, submissionContext, scheduler, masterService, submitTime,
|
||||||
|
applicationType, applicationTags, amReqs, placementContext, startTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
||||||
|
Configuration config, String name, UserGroupInformation userUgi,
|
||||||
|
String queue, ApplicationSubmissionContext submissionContext,
|
||||||
|
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||||
|
long submitTime, String applicationType, Set<String> applicationTags,
|
||||||
|
List<ResourceRequest> amReqs, ApplicationPlacementContext
|
||||||
|
placementContext, long startTime) {
|
||||||
this.systemClock = SystemClock.getInstance();
|
this.systemClock = SystemClock.getInstance();
|
||||||
|
|
||||||
this.applicationId = applicationId;
|
this.applicationId = applicationId;
|
||||||
@ -430,7 +443,9 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
|||||||
this.dispatcher = rmContext.getDispatcher();
|
this.dispatcher = rmContext.getDispatcher();
|
||||||
this.handler = dispatcher.getEventHandler();
|
this.handler = dispatcher.getEventHandler();
|
||||||
this.conf = config;
|
this.conf = config;
|
||||||
this.user = StringInterner.weakIntern(user);
|
this.user = StringInterner.weakIntern(
|
||||||
|
(userUgi != null) ? userUgi.getShortUserName() : null);
|
||||||
|
this.userUgi = userUgi;
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
this.submissionContext = submissionContext;
|
this.submissionContext = submissionContext;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
@ -1313,7 +1328,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
|
|||||||
|
|
||||||
ApplicationStateData appState =
|
ApplicationStateData appState =
|
||||||
ApplicationStateData.newInstance(this.submitTime, this.startTime,
|
ApplicationStateData.newInstance(this.submitTime, this.startTime,
|
||||||
this.user, this.submissionContext,
|
this.getUser(), this.getRealUser(), this.submissionContext,
|
||||||
stateToBeStored, diags, this.launchTime, this.storedFinishTime,
|
stateToBeStored, diags, this.launchTime, this.storedFinishTime,
|
||||||
this.callerContext);
|
this.callerContext);
|
||||||
appState.setApplicationTimeouts(this.applicationTimeouts);
|
appState.setApplicationTimeouts(this.applicationTimeouts);
|
||||||
@ -1902,4 +1917,10 @@ long getLogAggregationStartTime() {
|
|||||||
Clock getSystemClock() {
|
Clock getSystemClock() {
|
||||||
return systemClock;
|
return systemClock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRealUser() {
|
||||||
|
UserGroupInformation realUserUgi = this.userUgi.getRealUser();
|
||||||
|
return (realUserUgi != null) ? realUserUgi.getShortUserName() : null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -937,6 +937,10 @@ private void addApplicationOnRecovery(ApplicationId applicationId,
|
|||||||
} catch (AccessControlException ace) {
|
} catch (AccessControlException ace) {
|
||||||
// Ignore the exception for recovered app as the app was previously
|
// Ignore the exception for recovered app as the app was previously
|
||||||
// accepted.
|
// accepted.
|
||||||
|
LOG.warn("AccessControlException received when trying to recover "
|
||||||
|
+ applicationId + " in queue " + queueName + " for user " + user
|
||||||
|
+ ". Since the app was in the queue prior to recovery, the Capacity"
|
||||||
|
+ " Scheduler will recover the app anyway.", ace);
|
||||||
}
|
}
|
||||||
queue.getMetrics().submitApp(user, unmanagedAM);
|
queue.getMetrics().submitApp(user, unmanagedAM);
|
||||||
|
|
||||||
@ -2948,7 +2952,7 @@ public Priority updateApplicationPriority(Priority newPriority,
|
|||||||
ApplicationStateData appState = ApplicationStateData.newInstance(
|
ApplicationStateData appState = ApplicationStateData.newInstance(
|
||||||
rmApp.getSubmitTime(), rmApp.getStartTime(),
|
rmApp.getSubmitTime(), rmApp.getStartTime(),
|
||||||
rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
|
rmApp.getApplicationSubmissionContext(), rmApp.getUser(),
|
||||||
rmApp.getCallerContext());
|
rmApp.getRealUser(), rmApp.getCallerContext());
|
||||||
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
|
appState.setApplicationTimeouts(rmApp.getApplicationTimeouts());
|
||||||
appState.setLaunchTime(rmApp.getLaunchTime());
|
appState.setLaunchTime(rmApp.getLaunchTime());
|
||||||
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
|
||||||
|
@ -72,6 +72,7 @@ message ApplicationStateDataProto {
|
|||||||
optional hadoop.common.RPCCallerContextProto caller_context = 8;
|
optional hadoop.common.RPCCallerContextProto caller_context = 8;
|
||||||
repeated ApplicationTimeoutMapProto application_timeouts = 9;
|
repeated ApplicationTimeoutMapProto application_timeouts = 9;
|
||||||
optional int64 launch_time = 10;
|
optional int64 launch_time = 10;
|
||||||
|
optional string real_user = 11;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ApplicationAttemptStateDataProto {
|
message ApplicationAttemptStateDataProto {
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
@ -71,7 +72,7 @@ public void submitApplication(
|
|||||||
ApplicationSubmissionContext submissionContext, String user)
|
ApplicationSubmissionContext submissionContext, String user)
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
super.submitApplication(submissionContext, System.currentTimeMillis(),
|
super.submitApplication(submissionContext, System.currentTimeMillis(),
|
||||||
user);
|
UserGroupInformation.createRemoteUser(user));
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getUserNameForPlacement(final String user,
|
public String getUserNameForPlacement(final String user,
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
@ -169,7 +170,8 @@ public MyRMAppManager(RMContext context, YarnScheduler scheduler,
|
|||||||
@Override
|
@Override
|
||||||
protected void submitApplication(
|
protected void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user) throws YarnException {
|
UserGroupInformation userUgi) throws YarnException {
|
||||||
|
String user = userUgi.getShortUserName();
|
||||||
//Do nothing, just add the application to RMContext
|
//Do nothing, just add the application to RMContext
|
||||||
RMAppImpl application =
|
RMAppImpl application =
|
||||||
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
|
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
|
||||||
|
@ -67,6 +67,10 @@ public String getUser() {
|
|||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRealUser() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public ApplicationSubmissionContext getApplicationSubmissionContext() {
|
public ApplicationSubmissionContext getApplicationSubmissionContext() {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
@ -119,6 +119,11 @@ public String getUser() {
|
|||||||
return user;
|
return user;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRealUser() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
public void setUser(String user) {
|
public void setUser(String user) {
|
||||||
this.user = user;
|
this.user = user;
|
||||||
}
|
}
|
||||||
|
@ -57,12 +57,15 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
@ -74,9 +77,13 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.security.AccessType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
@ -94,8 +101,10 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
|
||||||
@ -5261,4 +5270,139 @@ public void tearDown() throws Exception {
|
|||||||
cs.stop();
|
cs.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class TestRMAppManager extends RMAppManager {
|
||||||
|
TestRMAppManager(RMContext context, YarnScheduler scheduler,
|
||||||
|
ApplicationMasterService masterService,
|
||||||
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
|
super(context, scheduler, masterService, applicationACLsManager, conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void submitApplication(
|
||||||
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
|
UserGroupInformation userUgi) throws YarnException {
|
||||||
|
super.submitApplication(submissionContext, submitTime, userUgi);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitUsingRealUserAcls() throws Exception {
|
||||||
|
final String realUser = "AdminUser";
|
||||||
|
final String user0 = "user0";
|
||||||
|
final String user1 = "user1";
|
||||||
|
final String queue = "default";
|
||||||
|
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
MockRM rm = new MockRM();
|
||||||
|
rm.init(conf);
|
||||||
|
rm.start();
|
||||||
|
rm.getConfig().setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
|
|
||||||
|
UserGroupInformation realUserUgi =
|
||||||
|
UserGroupInformation.createRemoteUser(realUser);
|
||||||
|
UserGroupInformation ugi0 =
|
||||||
|
UserGroupInformation.createProxyUserForTesting("user0", realUserUgi,
|
||||||
|
new String[] {"group1"});
|
||||||
|
UserGroupInformation ugi1 =
|
||||||
|
UserGroupInformation.createProxyUserForTesting("user1", realUserUgi,
|
||||||
|
new String[] {"group1"});
|
||||||
|
ApplicationId applicationId0 = TestUtils.getMockApplicationId(0);
|
||||||
|
ApplicationId applicationId1 = TestUtils.getMockApplicationId(1);
|
||||||
|
CapacityScheduler cSched = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
ParentQueue rootQueue = (ParentQueue) cSched.getRootQueue();
|
||||||
|
Map<AccessType, AccessControlList> rootAcls = rootQueue.acls;
|
||||||
|
rootAcls.put(AccessType.SUBMIT_APP, new AccessControlList(realUser));
|
||||||
|
rootAcls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
|
||||||
|
|
||||||
|
LeafQueue defaultQueue = (LeafQueue)cSched.getQueue(queue);
|
||||||
|
Map<AccessType, AccessControlList> a = defaultQueue.acls;
|
||||||
|
a.put(AccessType.SUBMIT_APP, new AccessControlList(realUser));
|
||||||
|
a.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
|
||||||
|
|
||||||
|
TestRMAppManager testRmAppManager =
|
||||||
|
new TestRMAppManager(rmContext, cSched, rm.getApplicationMasterService(),
|
||||||
|
rm.getApplicationACLsManager(), rm.getConfig());
|
||||||
|
ContainerLaunchContext clc =
|
||||||
|
mock(ContainerLaunchContext.class);
|
||||||
|
ApplicationSubmissionContext asc =
|
||||||
|
ApplicationSubmissionContext.newInstance(
|
||||||
|
applicationId0, "myApp0", "default", Priority.newInstance(0),
|
||||||
|
clc, false, false, 1, Resource.newInstance(1024, 1));
|
||||||
|
|
||||||
|
// Each of the following test cases has a proxied user and a real user.
|
||||||
|
// The proxied users are user0 and user1, depending on the test. The real
|
||||||
|
// user is always AdminUser.
|
||||||
|
|
||||||
|
// Ensure that user0 is not allowed to submit to the default queue when only
|
||||||
|
// the admin user is in the submit ACL and the admin user does not have the
|
||||||
|
// USE_REAL_ACLS character prepended.
|
||||||
|
try {
|
||||||
|
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
|
||||||
|
Assert.fail(user0 + " should not be allowed to submit to the "
|
||||||
|
+ queue + " queue when only admin user is in submit ACL.");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// This is the expected behavior.
|
||||||
|
assertTrue("Should have received an AccessControlException.",
|
||||||
|
e.getCause() instanceof AccessControlException);
|
||||||
|
}
|
||||||
|
|
||||||
|
// With only user0 in the list of users authorized to submit apps to the
|
||||||
|
// queue, ensure that user0 is allowed to submit to the default queue.
|
||||||
|
a.put(AccessType.SUBMIT_APP, new AccessControlList(user0));
|
||||||
|
a.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));
|
||||||
|
try {
|
||||||
|
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail(user0 + " should be allowed to submit to the "
|
||||||
|
+ queue + " queue.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// With only user0 in the list of users authorized to submit apps to the
|
||||||
|
// queue, ensure that user1 is NOT allowed to submit to the default queue
|
||||||
|
try {
|
||||||
|
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
|
||||||
|
Assert.fail(user1 + " should not be allowed to submit to the "
|
||||||
|
+ queue + " queue.");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// This is the expected behavior.
|
||||||
|
assertTrue("Should have received an AccessControlException.",
|
||||||
|
e.getCause() instanceof AccessControlException);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Even though the admin user is in the list of users allowed to submit to
|
||||||
|
// the default queue and user1's real user is the admin user, user1 should
|
||||||
|
// not be allowed to submit to queue because the ACL entry does not have the
|
||||||
|
// special character prepended in the list.
|
||||||
|
a.put(AccessType.SUBMIT_APP,
|
||||||
|
new AccessControlList(user0 + "," + realUser));
|
||||||
|
try {
|
||||||
|
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
|
||||||
|
Assert.fail(user1 + " should not be allowed to submit to the "
|
||||||
|
+ queue + " queue.");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// This is the expected behavior.
|
||||||
|
assertTrue("Should have received an AccessControlException.",
|
||||||
|
e.getCause() instanceof AccessControlException);
|
||||||
|
}
|
||||||
|
|
||||||
|
// user1 should now be allowed to submit to the default queue because the
|
||||||
|
// admin user is in the ACL list and has the USE_REAL_ACLS character
|
||||||
|
// prepended.
|
||||||
|
a.put(AccessType.SUBMIT_APP,
|
||||||
|
new AccessControlList(user0 + ","
|
||||||
|
+ AccessControlList.USE_REAL_ACLS + realUser));
|
||||||
|
asc.setApplicationId(applicationId1);
|
||||||
|
try {
|
||||||
|
testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error("failed to submit", e);
|
||||||
|
Assert.fail(user1 + " should be allowed to submit to the "
|
||||||
|
+ queue + " queue when real user is" + realUser + ".");
|
||||||
|
}
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
rm.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,8 +176,8 @@ Configuration
|
|||||||
| Property | Description |
|
| Property | Description |
|
||||||
|:---- |:---- |
|
|:---- |:---- |
|
||||||
| `yarn.scheduler.capacity.<queue-path>.state` | The *state* of the queue. Can be one of `RUNNING` or `STOPPED`. If a queue is in `STOPPED` state, new applications cannot be submitted to *itself* or *any of its child queues*. Thus, if the *root* queue is `STOPPED` no applications can be submitted to the entire cluster. Existing applications continue to completion, thus the queue can be *drained* gracefully. Value is specified as Enumeration. |
|
| `yarn.scheduler.capacity.<queue-path>.state` | The *state* of the queue. Can be one of `RUNNING` or `STOPPED`. If a queue is in `STOPPED` state, new applications cannot be submitted to *itself* or *any of its child queues*. Thus, if the *root* queue is `STOPPED` no applications can be submitted to the entire cluster. Existing applications continue to completion, thus the queue can be *drained* gracefully. Value is specified as Enumeration. |
|
||||||
| `yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications` | The *ACL* which controls who can *submit* applications to the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can submit applications. *ACLs* for this property *are* inherited from the parent queue if not specified. |
|
| `yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications` | The *ACL* which controls who can *submit* applications to the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can submit applications. *ACLs* for this property *are* inherited from the parent queue if not specified. If a tilde (~) is prepended to a user name in this list, the real user's ACLs will allow the proxied user to submit to the queue. |
|
||||||
| `yarn.scheduler.capacity.root.<queue-path>.acl_administer_queue` | The *ACL* which controls who can *administer* applications on the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can administer applications. *ACLs* for this property *are* inherited from the parent queue if not specified. |
|
| `yarn.scheduler.capacity.root.<queue-path>.acl_administer_queue` | The *ACL* which controls who can *administer* applications on the given queue. If the given user/group has necessary ACLs on the given queue or *one of the parent queues in the hierarchy* they can administer applications. *ACLs* for this property *are* inherited from the parent queue if not specified. If a tilde (~) is prepended to a user name in this list, the real user's ACLs will allow the proxied user to administer apps the queue. |
|
||||||
|
|
||||||
**Note:** An *ACL* is of the form *user1*,*user2* *space* *group1*,*group2*. The special value of * implies *anyone*. The special value of *space* implies *no one*. The default is * for the root queue if not specified.
|
**Note:** An *ACL* is of the form *user1*,*user2* *space* *group1*,*group2*. The special value of * implies *anyone*. The special value of *space* implies *no one*. The default is * for the root queue if not specified.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user