YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda)
This commit is contained in:
parent
ab725cff66
commit
8310b2e9ff
@ -261,7 +261,7 @@ private void submitApp()
|
|||||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
|
||||||
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
ugi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws YarnException {
|
public Object run() throws YarnException, IOException {
|
||||||
rm.getClientRMService().submitApplication(subAppRequest);
|
rm.getClientRMService().submitApplication(subAppRequest);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -82,6 +82,8 @@ Release 2.9.0 - UNRELEASED
|
|||||||
YARN-3480. Remove attempts that are beyond max-attempt limit from state
|
YARN-3480. Remove attempts that are beyond max-attempt limit from state
|
||||||
store. (Jun Gong via jianhe)
|
store. (Jun Gong via jianhe)
|
||||||
|
|
||||||
|
YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -551,7 +551,7 @@ public GetContainersResponse getContainers(GetContainersRequest request)
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SubmitApplicationResponse submitApplication(
|
public SubmitApplicationResponse submitApplication(
|
||||||
SubmitApplicationRequest request) throws YarnException {
|
SubmitApplicationRequest request) throws YarnException, IOException {
|
||||||
resetStartFailoverFlag(true);
|
resetStartFailoverFlag(true);
|
||||||
|
|
||||||
// make sure failover has been triggered
|
// make sure failover has been triggered
|
||||||
|
@ -549,7 +549,7 @@ public GetContainersResponse getContainers(GetContainersRequest request)
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SubmitApplicationResponse submitApplication(
|
public SubmitApplicationResponse submitApplication(
|
||||||
SubmitApplicationRequest request) throws YarnException {
|
SubmitApplicationRequest request) throws YarnException, IOException {
|
||||||
ApplicationSubmissionContext submissionContext = request
|
ApplicationSubmissionContext submissionContext = request
|
||||||
.getApplicationSubmissionContext();
|
.getApplicationSubmissionContext();
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
@ -33,6 +34,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
@ -55,6 +57,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
|
||||||
@ -78,7 +81,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||||||
private final YarnScheduler scheduler;
|
private final YarnScheduler scheduler;
|
||||||
private final ApplicationACLsManager applicationACLsManager;
|
private final ApplicationACLsManager applicationACLsManager;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
private boolean isAclEnabled = false;
|
||||||
public RMAppManager(RMContext context,
|
public RMAppManager(RMContext context,
|
||||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||||
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||||
@ -97,6 +100,8 @@ public RMAppManager(RMContext context,
|
|||||||
if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
|
if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
|
||||||
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
|
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
|
||||||
}
|
}
|
||||||
|
this.isAclEnabled = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
|
||||||
|
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -276,7 +281,7 @@ protected synchronized void checkAppNumCompletedLimit() {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void submitApplication(
|
protected void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user) throws YarnException {
|
String user) throws YarnException, AccessControlException {
|
||||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||||
|
|
||||||
RMAppImpl application =
|
RMAppImpl application =
|
||||||
@ -325,7 +330,8 @@ protected void recoverApplication(ApplicationStateData appState,
|
|||||||
|
|
||||||
private RMAppImpl createAndPopulateNewRMApp(
|
private RMAppImpl createAndPopulateNewRMApp(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user, boolean isRecovery) throws YarnException {
|
String user, boolean isRecovery)
|
||||||
|
throws YarnException, AccessControlException {
|
||||||
// Do queue mapping
|
// Do queue mapping
|
||||||
if (!isRecovery) {
|
if (!isRecovery) {
|
||||||
if (rmContext.getQueuePlacementManager() != null) {
|
if (rmContext.getQueuePlacementManager() != null) {
|
||||||
@ -346,6 +352,22 @@ private RMAppImpl createAndPopulateNewRMApp(
|
|||||||
submissionContext.getQueue(), applicationId);
|
submissionContext.getQueue(), applicationId);
|
||||||
submissionContext.setPriority(appPriority);
|
submissionContext.setPriority(appPriority);
|
||||||
|
|
||||||
|
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
|
||||||
|
// Since FairScheduler queue mapping is done inside scheduler,
|
||||||
|
// if FairScheduler is used and the queue doesn't exist, we should not
|
||||||
|
// fail here because queue will be created inside FS. Ideally, FS queue
|
||||||
|
// mapping should be done outside scheduler too like CS.
|
||||||
|
// For now, exclude FS for the acl check.
|
||||||
|
if (!isRecovery && isAclEnabled && scheduler instanceof CapacityScheduler &&
|
||||||
|
!scheduler.checkAccess(userUgi, QueueACL.SUBMIT_APPLICATIONS,
|
||||||
|
submissionContext.getQueue()) &&
|
||||||
|
!scheduler.checkAccess(userUgi, QueueACL.ADMINISTER_QUEUE,
|
||||||
|
submissionContext.getQueue())) {
|
||||||
|
throw new AccessControlException(
|
||||||
|
"User " + user + " does not have permission to submit "
|
||||||
|
+ applicationId + " to queue " + submissionContext.getQueue());
|
||||||
|
}
|
||||||
|
|
||||||
// Create RMApp
|
// Create RMApp
|
||||||
RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf,
|
RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf,
|
||||||
submissionContext.getApplicationName(), user,
|
submissionContext.getApplicationName(), user,
|
||||||
|
@ -479,14 +479,6 @@ public void submitApplication(ApplicationId applicationId, String userName,
|
|||||||
String queue) throws AccessControlException {
|
String queue) throws AccessControlException {
|
||||||
// Careful! Locking order is important!
|
// Careful! Locking order is important!
|
||||||
|
|
||||||
// Check queue ACLs
|
|
||||||
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(userName);
|
|
||||||
if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
|
|
||||||
&& !hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
|
|
||||||
throw new AccessControlException("User " + userName + " cannot submit" +
|
|
||||||
" applications to queue " + getQueuePath());
|
|
||||||
}
|
|
||||||
|
|
||||||
User user = null;
|
User user = null;
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -194,7 +195,7 @@ public int getCompletedAppsInStateStore() {
|
|||||||
}
|
}
|
||||||
public void submitApplication(
|
public void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, String user)
|
ApplicationSubmissionContext submissionContext, String user)
|
||||||
throws YarnException {
|
throws YarnException, IOException {
|
||||||
super.submitApplication(submissionContext, System.currentTimeMillis(),
|
super.submitApplication(submissionContext, System.currentTimeMillis(),
|
||||||
user);
|
user);
|
||||||
}
|
}
|
||||||
|
@ -893,7 +893,7 @@ public void handle(Event rawEvent) {
|
|||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
rmService.submitApplication(submitRequest1);
|
rmService.submitApplication(submitRequest1);
|
||||||
} catch (YarnException e) {}
|
} catch (YarnException | IOException e) {}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
t.start();
|
t.start();
|
||||||
|
@ -735,6 +735,16 @@ public void testAppSubmit(String acceptMedia, String contentMedia)
|
|||||||
client().addFilter(new LoggingFilter(System.out));
|
client().addFilter(new LoggingFilter(System.out));
|
||||||
String lrKey = "example";
|
String lrKey = "example";
|
||||||
String queueName = "testqueue";
|
String queueName = "testqueue";
|
||||||
|
|
||||||
|
// create the queue
|
||||||
|
String[] queues = { "default", "testqueue" };
|
||||||
|
CapacitySchedulerConfiguration csconf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
csconf.setQueues("root", queues);
|
||||||
|
csconf.setCapacity("root.default", 50.0f);
|
||||||
|
csconf.setCapacity("root.testqueue", 50.0f);
|
||||||
|
rm.getResourceScheduler().reinitialize(csconf, rm.getRMContext());
|
||||||
|
|
||||||
String appName = "test";
|
String appName = "test";
|
||||||
String appType = "test-type";
|
String appType = "test-type";
|
||||||
String urlPath = "apps";
|
String urlPath = "apps";
|
||||||
|
Loading…
Reference in New Issue
Block a user