MAPREDUCE-3104. Implemented Application-acls. (vinodkv)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1186748 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
37137845c8
commit
df2991c0cb
@ -111,6 +111,8 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-2719. Add a simple, DistributedShell, application to illustrate
|
||||
alternate frameworks on YARN. (Hitesh Shah via acmurthy)
|
||||
|
||||
MAPREDUCE-3104. Implemented Application-acls. (vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
|
||||
|
@ -18,22 +18,29 @@
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.event;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
||||
|
||||
|
||||
public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {
|
||||
|
||||
private final Container container;
|
||||
private final Map<ApplicationAccessType, String> applicationACLs;
|
||||
|
||||
public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
|
||||
Container container) {
|
||||
Container container, Map<ApplicationAccessType, String> applicationACLs) {
|
||||
super(id, TaskAttemptEventType.TA_ASSIGNED);
|
||||
this.container = container;
|
||||
this.applicationACLs = applicationACLs;
|
||||
}
|
||||
|
||||
public Container getContainer() {
|
||||
return this.container;
|
||||
}
|
||||
|
||||
public Map<ApplicationAccessType, String> getApplicationACLs() {
|
||||
return this.applicationACLs;
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.app.job.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
@ -53,7 +52,6 @@
|
||||
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
|
||||
import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
|
||||
@ -101,8 +99,8 @@
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.Clock;
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
@ -117,6 +115,7 @@
|
||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.RackResolver;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
@ -532,8 +531,10 @@ private String getInitialClasspath() throws IOException {
|
||||
|
||||
/**
|
||||
* Create the {@link ContainerLaunchContext} for this attempt.
|
||||
* @param applicationACLs
|
||||
*/
|
||||
private ContainerLaunchContext createContainerLaunchContext() {
|
||||
private ContainerLaunchContext createContainerLaunchContext(
|
||||
Map<ApplicationAccessType, String> applicationACLs) {
|
||||
|
||||
// Application resources
|
||||
Map<String, LocalResource> localResources =
|
||||
@ -634,17 +635,11 @@ private ContainerLaunchContext createContainerLaunchContext() {
|
||||
jvmID);
|
||||
|
||||
// Construct the actual Container
|
||||
ContainerLaunchContext container =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
container.setContainerId(containerID);
|
||||
container.setUser(conf.get(MRJobConfig.USER_NAME));
|
||||
container.setResource(assignedCapability);
|
||||
container.setLocalResources(localResources);
|
||||
container.setEnvironment(environment);
|
||||
container.setCommands(commands);
|
||||
container.setServiceData(serviceData);
|
||||
container.setContainerTokens(tokens);
|
||||
|
||||
ContainerLaunchContext container = BuilderUtils
|
||||
.newContainerLaunchContext(containerID, conf
|
||||
.get(MRJobConfig.USER_NAME), assignedCapability, localResources,
|
||||
environment, commands, serviceData, tokens, applicationACLs);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
@ -1003,7 +998,7 @@ private static class ContainerAssignedTransition implements
|
||||
@Override
|
||||
public void transition(final TaskAttemptImpl taskAttempt,
|
||||
TaskAttemptEvent event) {
|
||||
TaskAttemptContainerAssignedEvent cEvent =
|
||||
final TaskAttemptContainerAssignedEvent cEvent =
|
||||
(TaskAttemptContainerAssignedEvent) event;
|
||||
taskAttempt.containerID = cEvent.getContainer().getId();
|
||||
taskAttempt.nodeHostName = cEvent.getContainer().getNodeId().getHost();
|
||||
@ -1026,7 +1021,8 @@ public void transition(final TaskAttemptImpl taskAttempt,
|
||||
taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
|
||||
@Override
|
||||
public ContainerLaunchContext getContainer() {
|
||||
return taskAttempt.createContainerLaunchContext();
|
||||
return taskAttempt.createContainerLaunchContext(cEvent
|
||||
.getApplicationACLs());
|
||||
}
|
||||
@Override
|
||||
public Task getRemoteTask() { // classic mapred Task, not YARN version
|
||||
|
@ -112,7 +112,7 @@ public void handle(ContainerAllocatorEvent event) {
|
||||
eventHandler.handle(jce);
|
||||
}
|
||||
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
||||
event.getAttemptID(), container));
|
||||
event.getAttemptID(), container, applicationACLs));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -373,7 +373,7 @@ private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
|
||||
container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" +
|
||||
attemptInfo.getHttpPort());
|
||||
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
|
||||
container));
|
||||
container, null));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -35,7 +36,6 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityInfo;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
@ -47,6 +47,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
@ -54,7 +55,6 @@
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
/**
|
||||
@ -73,6 +73,7 @@ public abstract class RMCommunicator extends AbstractService {
|
||||
protected int lastResponseID;
|
||||
private Resource minContainerCapability;
|
||||
private Resource maxContainerCapability;
|
||||
protected Map<ApplicationAccessType, String> applicationACLs;
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
@ -160,6 +161,7 @@ protected void register() {
|
||||
scheduler.registerApplicationMaster(request);
|
||||
minContainerCapability = response.getMinimumResourceCapability();
|
||||
maxContainerCapability = response.getMaximumResourceCapability();
|
||||
this.applicationACLs = response.getApplicationACLs();
|
||||
LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
|
||||
LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
|
||||
} catch (Exception are) {
|
||||
|
@ -620,7 +620,7 @@ else if (PRIORITY_REDUCE.equals(priority)) {
|
||||
|
||||
// send the container-assigned event to task attempt
|
||||
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
||||
assigned.attemptID, allocated));
|
||||
assigned.attemptID, allocated, applicationACLs));
|
||||
|
||||
assignedRequests.add(allocated.getId(), assigned.attemptID);
|
||||
|
||||
|
@ -361,7 +361,7 @@ public void handle(ContainerAllocatorEvent event) {
|
||||
container.setNodeHttpAddress("localhost:9999");
|
||||
getContext().getEventHandler().handle(
|
||||
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
|
||||
container));
|
||||
container, null));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -144,7 +144,7 @@ public void run() {
|
||||
getContext().getEventHandler()
|
||||
.handle(
|
||||
new TaskAttemptContainerAssignedEvent(event
|
||||
.getAttemptID(), container));
|
||||
.getAttemptID(), container, null));
|
||||
concurrentRunningTasks++;
|
||||
} else {
|
||||
Thread.sleep(1000);
|
||||
|
@ -275,7 +275,12 @@ public interface MRJobConfig {
|
||||
|
||||
public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
|
||||
|
||||
public static final String DEFAULT_JOB_ACL_VIEW_JOB = " ";
|
||||
|
||||
public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
|
||||
|
||||
public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
|
||||
|
||||
public static final String JOB_SUBMITHOST =
|
||||
"mapreduce.job.submithostname";
|
||||
public static final String JOB_SUBMITHOSTADDR =
|
||||
|
@ -62,6 +62,7 @@
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
@ -75,6 +76,7 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
|
||||
@ -360,14 +362,19 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
||||
// Parse distributed cache
|
||||
MRApps.setupDistributedCache(jobConf, localResources);
|
||||
|
||||
Map<ApplicationAccessType, String> acls
|
||||
= new HashMap<ApplicationAccessType, String>(2);
|
||||
acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
|
||||
MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
|
||||
acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
|
||||
MRJobConfig.JOB_ACL_MODIFY_JOB,
|
||||
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
|
||||
|
||||
// Setup ContainerLaunchContext for AM container
|
||||
ContainerLaunchContext amContainer =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
amContainer.setResource(capability); // Resource (mem) required
|
||||
amContainer.setLocalResources(localResources); // Local resources
|
||||
amContainer.setEnvironment(environment); // Environment
|
||||
amContainer.setCommands(vargsFinal); // Command for AM
|
||||
amContainer.setContainerTokens(securityTokens); // Security tokens
|
||||
ContainerLaunchContext amContainer = BuilderUtils
|
||||
.newContainerLaunchContext(null, UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), capability, localResources,
|
||||
environment, vargsFinal, null, securityTokens, acls);
|
||||
|
||||
// Set up the ApplicationSubmissionContext
|
||||
ApplicationSubmissionContext appContext =
|
||||
|
@ -18,11 +18,14 @@
|
||||
|
||||
package org.apache.hadoop.yarn.api.protocolrecords;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.AMRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
/**
|
||||
@ -67,4 +70,20 @@ public interface RegisterApplicationMasterResponse {
|
||||
@Private
|
||||
@Unstable
|
||||
public void setMaximumResourceCapability(Resource capability);
|
||||
|
||||
/**
|
||||
* Get the <code>ApplicationACL</code>s for the application.
|
||||
* @return all the <code>ApplicationACL</code>s
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public Map<ApplicationAccessType, String> getApplicationACLs();
|
||||
|
||||
/**
|
||||
* Set the <code>ApplicationACL</code>s for the application.
|
||||
* @param acls
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public void setApplicationACLs(Map<ApplicationAccessType, String> acls);
|
||||
}
|
||||
|
@ -19,13 +19,21 @@
|
||||
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
|
||||
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
|
||||
public class RegisterApplicationMasterResponsePBImpl
|
||||
@ -38,6 +46,7 @@ public class RegisterApplicationMasterResponsePBImpl
|
||||
|
||||
private Resource minimumResourceCapability;
|
||||
private Resource maximumResourceCapability;
|
||||
private Map<ApplicationAccessType, String> applicationACLS = null;
|
||||
|
||||
public RegisterApplicationMasterResponsePBImpl() {
|
||||
builder = RegisterApplicationMasterResponseProto.newBuilder();
|
||||
@ -72,6 +81,9 @@ private void mergeLocalToBuilder() {
|
||||
builder.setMaximumCapability(
|
||||
convertToProtoFormat(this.maximumResourceCapability));
|
||||
}
|
||||
if (this.applicationACLS != null) {
|
||||
addApplicationACLs();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -130,6 +142,77 @@ public void setMinimumResourceCapability(Resource capability) {
|
||||
this.minimumResourceCapability = capability;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<ApplicationAccessType, String> getApplicationACLs() {
|
||||
initApplicationACLs();
|
||||
return this.applicationACLS;
|
||||
}
|
||||
|
||||
private void initApplicationACLs() {
|
||||
if (this.applicationACLS != null) {
|
||||
return;
|
||||
}
|
||||
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto
|
||||
: builder;
|
||||
List<ApplicationACLMapProto> list = p.getApplicationACLsList();
|
||||
this.applicationACLS = new HashMap<ApplicationAccessType, String>(list
|
||||
.size());
|
||||
|
||||
for (ApplicationACLMapProto aclProto : list) {
|
||||
this.applicationACLS.put(ProtoUtils.convertFromProtoFormat(aclProto
|
||||
.getAccessType()), aclProto.getAcl());
|
||||
}
|
||||
}
|
||||
|
||||
private void addApplicationACLs() {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationACLs();
|
||||
if (applicationACLS == null) {
|
||||
return;
|
||||
}
|
||||
Iterable<? extends ApplicationACLMapProto> values
|
||||
= new Iterable<ApplicationACLMapProto>() {
|
||||
|
||||
@Override
|
||||
public Iterator<ApplicationACLMapProto> iterator() {
|
||||
return new Iterator<ApplicationACLMapProto>() {
|
||||
Iterator<ApplicationAccessType> aclsIterator = applicationACLS
|
||||
.keySet().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return aclsIterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationACLMapProto next() {
|
||||
ApplicationAccessType key = aclsIterator.next();
|
||||
return ApplicationACLMapProto.newBuilder().setAcl(
|
||||
applicationACLS.get(key)).setAccessType(
|
||||
ProtoUtils.convertToProtoFormat(key)).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
this.builder.addAllApplicationACLs(values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationACLs(
|
||||
final Map<ApplicationAccessType, String> appACLs) {
|
||||
if (appACLs == null)
|
||||
return;
|
||||
initApplicationACLs();
|
||||
this.applicationACLS.clear();
|
||||
this.applicationACLS.putAll(appACLs);
|
||||
}
|
||||
|
||||
private Resource convertFromProtoFormat(ResourceProto resource) {
|
||||
return new ResourcePBImpl(resource);
|
||||
}
|
||||
|
@ -0,0 +1,42 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.api.records;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
|
||||
/**
|
||||
* Application access types.
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public enum ApplicationAccessType {
|
||||
|
||||
/**
|
||||
* Access-type representing 'viewing' application. ACLs against this type
|
||||
* dictate who can 'view' some or all of the application related details.
|
||||
*/
|
||||
VIEW_APP,
|
||||
|
||||
/**
|
||||
* Access-type representing 'modifying' application. ACLs against this type
|
||||
* dictate who can 'modify' the application for e.g., by killing the
|
||||
* application
|
||||
*/
|
||||
MODIFY_APP;
|
||||
}
|
@ -148,5 +148,4 @@ public interface ApplicationSubmissionContext {
|
||||
@Public
|
||||
@Stable
|
||||
public void setAMContainerSpec(ContainerLaunchContext amContainer);
|
||||
|
||||
}
|
@ -181,5 +181,20 @@ public interface ContainerLaunchContext {
|
||||
@Public
|
||||
@Stable
|
||||
void setCommands(List<String> commands);
|
||||
|
||||
|
||||
/**
|
||||
* Get the <code>ApplicationACL</code>s for the application.
|
||||
* @return all the <code>ApplicationACL</code>s
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public Map<ApplicationAccessType, String> getApplicationACLs();
|
||||
|
||||
/**
|
||||
* Set the <code>ApplicationACL</code>s for the application.
|
||||
* @param acls
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public void setApplicationACLs(Map<ApplicationAccessType, String> acls);
|
||||
}
|
||||
|
@ -23,14 +23,17 @@
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
|
||||
/**
|
||||
* <p><code>QueueACL</code> enumerates the various ACLs for queues.</p>
|
||||
* <p>
|
||||
* <code>QueueACL</code> enumerates the various ACLs for queues.
|
||||
* </p>
|
||||
*
|
||||
* <p>The ACLs are one of:
|
||||
* <ul>
|
||||
* <li>{@link #SUBMIT_JOB} - ACL to submit jobs to the queue.</li>
|
||||
* <li>{@link #ADMINISTER_QUEUE} - ACL to administer the queue.</li>
|
||||
* <li>{@link #ADMINISTER_JOBS} - ACL to administer jobs in the queue.</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* The ACLs are one of:
|
||||
* <ul>
|
||||
* <li>{@link #SUBMIT_APPLICATIONS} - ACL to submit applications to the
|
||||
* queue.</li>
|
||||
* <li>{@link #ADMINISTER_QUEUE} - ACL to administer the queue.</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
* @see QueueInfo
|
||||
@ -40,17 +43,12 @@
|
||||
@Stable
|
||||
public enum QueueACL {
|
||||
/**
|
||||
* ACL to submit jobs to the queue.
|
||||
* ACL to submit applications to the queue.
|
||||
*/
|
||||
SUBMIT_JOB,
|
||||
SUBMIT_APPLICATIONS,
|
||||
|
||||
/**
|
||||
* ACL to administer the queue.
|
||||
*/
|
||||
ADMINISTER_QUEUE,
|
||||
|
||||
/**
|
||||
* ACL to administer jobs in the queue.
|
||||
*/
|
||||
ADMINISTER_JOBS; // currently unused
|
||||
ADMINISTER_QUEUE,
|
||||
}
|
@ -25,11 +25,13 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
|
||||
@ -38,6 +40,7 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
|
||||
public class ContainerLaunchContextPBImpl
|
||||
extends ProtoBase<ContainerLaunchContextProto>
|
||||
@ -54,6 +57,7 @@ public class ContainerLaunchContextPBImpl
|
||||
private Map<String, ByteBuffer> serviceData = null;
|
||||
private Map<String, String> environment = null;
|
||||
private List<String> commands = null;
|
||||
private Map<ApplicationAccessType, String> applicationACLS = null;
|
||||
|
||||
public ContainerLaunchContextPBImpl() {
|
||||
builder = ContainerLaunchContextProto.newBuilder();
|
||||
@ -97,6 +101,9 @@ private void mergeLocalToBuilder() {
|
||||
if (this.commands != null) {
|
||||
addCommandsToProto();
|
||||
}
|
||||
if (this.applicationACLS != null) {
|
||||
addApplicationACLs();
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
@ -424,6 +431,75 @@ public boolean hasNext() {
|
||||
builder.addAllEnvironment(iterable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ApplicationAccessType, String> getApplicationACLs() {
|
||||
initApplicationACLs();
|
||||
return this.applicationACLS;
|
||||
}
|
||||
|
||||
private void initApplicationACLs() {
|
||||
if (this.applicationACLS != null) {
|
||||
return;
|
||||
}
|
||||
ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<ApplicationACLMapProto> list = p.getApplicationACLsList();
|
||||
this.applicationACLS = new HashMap<ApplicationAccessType, String>(list
|
||||
.size());
|
||||
|
||||
for (ApplicationACLMapProto aclProto : list) {
|
||||
this.applicationACLS.put(ProtoUtils.convertFromProtoFormat(aclProto
|
||||
.getAccessType()), aclProto.getAcl());
|
||||
}
|
||||
}
|
||||
|
||||
private void addApplicationACLs() {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationACLs();
|
||||
if (applicationACLS == null) {
|
||||
return;
|
||||
}
|
||||
Iterable<? extends ApplicationACLMapProto> values
|
||||
= new Iterable<ApplicationACLMapProto>() {
|
||||
|
||||
@Override
|
||||
public Iterator<ApplicationACLMapProto> iterator() {
|
||||
return new Iterator<ApplicationACLMapProto>() {
|
||||
Iterator<ApplicationAccessType> aclsIterator = applicationACLS
|
||||
.keySet().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return aclsIterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationACLMapProto next() {
|
||||
ApplicationAccessType key = aclsIterator.next();
|
||||
return ApplicationACLMapProto.newBuilder().setAcl(
|
||||
applicationACLS.get(key)).setAccessType(
|
||||
ProtoUtils.convertToProtoFormat(key)).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
this.builder.addAllApplicationACLs(values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationACLs(
|
||||
final Map<ApplicationAccessType, String> appACLs) {
|
||||
if (appACLs == null)
|
||||
return;
|
||||
initApplicationACLs();
|
||||
this.applicationACLS.clear();
|
||||
this.applicationACLS.putAll(appACLs);
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
@ -20,15 +20,17 @@
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
|
||||
@ -140,7 +142,7 @@ public static QueueState convertFromProtoFormat(QueueStateProto e) {
|
||||
/*
|
||||
* QueueACL
|
||||
*/
|
||||
private static String QUEUE_ACL_PREFIX = "Q_";
|
||||
private static String QUEUE_ACL_PREFIX = "QACL_";
|
||||
public static QueueACLProto convertToProtoFormat(QueueACL e) {
|
||||
return QueueACLProto.valueOf(QUEUE_ACL_PREFIX + e.name());
|
||||
}
|
||||
@ -148,4 +150,21 @@ public static QueueACL convertFromProtoFormat(QueueACLProto e) {
|
||||
return QueueACL.valueOf(e.name().replace(QUEUE_ACL_PREFIX, ""));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ApplicationAccessType
|
||||
*/
|
||||
private static String APP_ACCESS_TYPE_PREFIX = "APPACCESS_";
|
||||
|
||||
public static ApplicationAccessTypeProto convertToProtoFormat(
|
||||
ApplicationAccessType e) {
|
||||
return ApplicationAccessTypeProto.valueOf(APP_ACCESS_TYPE_PREFIX
|
||||
+ e.name());
|
||||
}
|
||||
|
||||
public static ApplicationAccessType convertFromProtoFormat(
|
||||
ApplicationAccessTypeProto e) {
|
||||
return ApplicationAccessType.valueOf(e.name().replace(
|
||||
APP_ACCESS_TYPE_PREFIX, ""));
|
||||
}
|
||||
}
|
||||
|
@ -217,6 +217,16 @@ message ApplicationSubmissionContextProto {
|
||||
optional ContainerLaunchContextProto am_container_spec = 6;
|
||||
}
|
||||
|
||||
enum ApplicationAccessTypeProto {
|
||||
APPACCESS_VIEW_APP = 1;
|
||||
APPACCESS_MODIFY_APP = 2;
|
||||
}
|
||||
|
||||
message ApplicationACLMapProto {
|
||||
optional ApplicationAccessTypeProto accessType = 1;
|
||||
optional string acl = 2 [default = " "];
|
||||
}
|
||||
|
||||
message YarnClusterMetricsProto {
|
||||
optional int32 num_node_managers = 1;
|
||||
}
|
||||
@ -237,9 +247,8 @@ message QueueInfoProto {
|
||||
}
|
||||
|
||||
enum QueueACLProto {
|
||||
Q_SUBMIT_JOB = 1;
|
||||
Q_ADMINISTER_QUEUE = 2;
|
||||
Q_ADMINISTER_JOBS = 3;
|
||||
QACL_SUBMIT_APPLICATIONS = 1;
|
||||
QACL_ADMINISTER_QUEUE = 2;
|
||||
}
|
||||
|
||||
message QueueUserACLInfoProto {
|
||||
@ -260,6 +269,7 @@ message ContainerLaunchContextProto {
|
||||
repeated StringBytesMapProto service_data = 6;
|
||||
repeated StringStringMapProto environment = 7;
|
||||
repeated string command = 8;
|
||||
repeated ApplicationACLMapProto application_ACLs = 9;
|
||||
}
|
||||
|
||||
message ContainerStatusProto {
|
||||
|
@ -36,6 +36,7 @@ message RegisterApplicationMasterRequestProto {
|
||||
message RegisterApplicationMasterResponseProto {
|
||||
optional ResourceProto minimumCapability = 1;
|
||||
optional ResourceProto maximumCapability = 2;
|
||||
repeated ApplicationACLMapProto application_ACLs = 3;
|
||||
}
|
||||
|
||||
message FinishApplicationMasterRequestProto {
|
||||
|
@ -36,14 +36,8 @@ public class YarnConfiguration extends Configuration {
|
||||
}
|
||||
|
||||
//Configurations
|
||||
|
||||
/** ACL of who can view this application.*/
|
||||
public static final String APPLICATION_ACL_VIEW_APP =
|
||||
"yarn.app.acl.view-job";
|
||||
|
||||
/** ACL of who can modify this application.*/
|
||||
public static final String APPLICATION_ACL_MODIFY_APP =
|
||||
"yarn.app.acl.modify-job";
|
||||
|
||||
public static final String YARN_PREFIX = "yarn.";
|
||||
|
||||
/** Delay before deleting resource to ease debugging of NM issues */
|
||||
public static final String DEBUG_NM_DELETE_DELAY_SEC =
|
||||
@ -52,7 +46,7 @@ public class YarnConfiguration extends Configuration {
|
||||
////////////////////////////////
|
||||
// IPC Configs
|
||||
////////////////////////////////
|
||||
public static final String IPC_PREFIX = "yarn.ipc.";
|
||||
public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";
|
||||
|
||||
/** Factory to create client IPC classes.*/
|
||||
public static final String IPC_CLIENT_FACTORY =
|
||||
@ -126,15 +120,15 @@ public class YarnConfiguration extends Configuration {
|
||||
public static final String DEFAULT_RM_RESOURCE_TRACKER_ADDRESS =
|
||||
"0.0.0.0:8025";
|
||||
|
||||
/** Are RM acls enabled.*/
|
||||
public static final String RM_ACL_ENABLE =
|
||||
RM_PREFIX + "acl.enable";
|
||||
public static final boolean DEFAULT_RM_ACL_ENABLE = true;
|
||||
/** Are acls enabled.*/
|
||||
public static final String YARN_ACL_ENABLE =
|
||||
YARN_PREFIX + "acl.enable";
|
||||
public static final boolean DEFAULT_YARN_ACL_ENABLE = true;
|
||||
|
||||
/** ACL of who can be admin of RM.*/
|
||||
public static final String RM_ADMIN_ACL =
|
||||
RM_PREFIX + "admin.acl";
|
||||
public static final String DEFAULT_RM_ADMIN_ACL = "*";
|
||||
/** ACL of who can be admin of YARN cluster.*/
|
||||
public static final String YARN_ADMIN_ACL =
|
||||
YARN_PREFIX + "admin.acl";
|
||||
public static final String DEFAULT_YARN_ADMIN_ACL = "*";
|
||||
|
||||
/** The address of the RM admin interface.*/
|
||||
public static final String RM_ADMIN_ADDRESS =
|
||||
|
@ -18,20 +18,26 @@
|
||||
|
||||
package org.apache.hadoop.yarn.util;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
@ -41,8 +47,10 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
|
||||
/**
|
||||
* Builder utilities to construct various objects.
|
||||
@ -195,18 +203,9 @@ public static NodeId newNodeId(String host, int port) {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
public static Container newContainer(RecordFactory recordFactory,
|
||||
ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId,
|
||||
String nodeHttpAddress, Resource resource, Priority priority) {
|
||||
ContainerId containerID =
|
||||
newContainerId(recordFactory, appAttemptId, containerId);
|
||||
return newContainer(containerID, nodeId, nodeHttpAddress,
|
||||
resource, priority);
|
||||
}
|
||||
|
||||
public static Container newContainer(ContainerId containerId,
|
||||
NodeId nodeId, String nodeHttpAddress,
|
||||
Resource resource, Priority priority) {
|
||||
Resource resource, Priority priority, ContainerToken containerToken) {
|
||||
Container container = recordFactory.newRecordInstance(Container.class);
|
||||
container.setId(containerId);
|
||||
container.setNodeId(nodeId);
|
||||
@ -218,6 +217,42 @@ public static Container newContainer(ContainerId containerId,
|
||||
containerStatus.setContainerId(containerId);
|
||||
containerStatus.setState(ContainerState.NEW);
|
||||
container.setContainerStatus(containerStatus);
|
||||
container.setContainerToken(containerToken);
|
||||
return container;
|
||||
}
|
||||
|
||||
public static ContainerToken newContainerToken(NodeId nodeId,
|
||||
ByteBuffer password, ContainerTokenIdentifier tokenIdentifier) {
|
||||
ContainerToken containerToken = recordFactory
|
||||
.newRecordInstance(ContainerToken.class);
|
||||
containerToken.setIdentifier(ByteBuffer.wrap(tokenIdentifier.getBytes()));
|
||||
containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
|
||||
containerToken.setPassword(password);
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(),
|
||||
nodeId.getPort());
|
||||
containerToken.setService(addr.getAddress().getHostAddress() + ":"
|
||||
+ addr.getPort());
|
||||
return containerToken;
|
||||
}
|
||||
|
||||
public static ContainerLaunchContext newContainerLaunchContext(
|
||||
ContainerId containerID, String user, Resource assignedCapability,
|
||||
Map<String, LocalResource> localResources,
|
||||
Map<String, String> environment, List<String> commands,
|
||||
Map<String, ByteBuffer> serviceData, ByteBuffer containerTokens,
|
||||
Map<ApplicationAccessType, String> acls) {
|
||||
ContainerLaunchContext container = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
container.setContainerId(containerID);
|
||||
container.setUser(user);
|
||||
container.setResource(assignedCapability);
|
||||
container.setLocalResources(localResources);
|
||||
container.setEnvironment(environment);
|
||||
container.setCommands(commands);
|
||||
container.setServiceData(serviceData);
|
||||
container.setContainerTokens(containerTokens);
|
||||
container.setApplicationACLs(acls);
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,117 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.security;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ApplicationACLsManager {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(ApplicationACLsManager.class);
|
||||
|
||||
private final Configuration conf;
|
||||
private final AccessControlList adminAcl;
|
||||
private final ConcurrentMap<ApplicationId, Map<ApplicationAccessType, AccessControlList>> applicationACLS
|
||||
= new ConcurrentHashMap<ApplicationId, Map<ApplicationAccessType, AccessControlList>>();
|
||||
|
||||
public ApplicationACLsManager(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.adminAcl = new AccessControlList(conf.get(
|
||||
YarnConfiguration.YARN_ADMIN_ACL,
|
||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||
}
|
||||
|
||||
public boolean areACLsEnabled() {
|
||||
return conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
|
||||
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
|
||||
}
|
||||
|
||||
public void addApplication(ApplicationId appId,
|
||||
Map<ApplicationAccessType, String> acls) {
|
||||
Map<ApplicationAccessType, AccessControlList> finalMap
|
||||
= new HashMap<ApplicationAccessType, AccessControlList>(acls.size());
|
||||
for (Entry<ApplicationAccessType, String> acl : acls.entrySet()) {
|
||||
finalMap.put(acl.getKey(), new AccessControlList(acl.getValue()));
|
||||
}
|
||||
this.applicationACLS.put(appId, finalMap);
|
||||
}
|
||||
|
||||
public void removeApplication(ApplicationId appId) {
|
||||
this.applicationACLS.remove(appId);
|
||||
}
|
||||
|
||||
/**
|
||||
* If authorization is enabled, checks whether the user (in the callerUGI) is
|
||||
* authorized to perform the access specified by 'applicationAccessType' on
|
||||
* the application by checking if the user is applicationOwner or part of
|
||||
* application ACL for the specific access-type.
|
||||
* <ul>
|
||||
* <li>The owner of the application can have all access-types on the
|
||||
* application</li>
|
||||
* <li>For all other users/groups application-acls are checked</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param callerUGI
|
||||
* @param applicationAccessType
|
||||
* @param applicationOwner
|
||||
* @param applicationId
|
||||
* @throws AccessControlException
|
||||
*/
|
||||
public boolean checkAccess(UserGroupInformation callerUGI,
|
||||
ApplicationAccessType applicationAccessType, String applicationOwner,
|
||||
ApplicationId applicationId) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Verifying access-type " + applicationAccessType + " for "
|
||||
+ callerUGI + " on application " + applicationId + " owned by "
|
||||
+ applicationOwner);
|
||||
}
|
||||
|
||||
String user = callerUGI.getShortUserName();
|
||||
if (!areACLsEnabled()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
AccessControlList applicationACL = this.applicationACLS
|
||||
.get(applicationId).get(applicationAccessType);
|
||||
|
||||
// Allow application-owner for any type of access on the application
|
||||
if (this.adminAcl.isUserAllowed(callerUGI)
|
||||
|| user.equals(applicationOwner)
|
||||
|| applicationACL.isUserAllowed(callerUGI)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
@ -83,14 +83,14 @@
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Are RM acls enabled.</description>
|
||||
<name>yarn.resourcemanager.acl.enable</name>
|
||||
<description>Are acls enabled.</description>
|
||||
<name>yarn.acl.enable</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>ACL of who can be admin of RM.</description>
|
||||
<name>yarn.resourcemanager.admin.acl</name>
|
||||
<description>ACL of who can be admin of the YARN cluster.</description>
|
||||
<name>yarn.admin.acl</name>
|
||||
<value>*</value>
|
||||
</property>
|
||||
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
@ -55,6 +56,7 @@ public class NodeManager extends CompositeService {
|
||||
private static final Log LOG = LogFactory.getLog(NodeManager.class);
|
||||
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||
protected ContainerTokenSecretManager containerTokenSecretManager;
|
||||
private ApplicationACLsManager aclsManager;
|
||||
|
||||
public NodeManager() {
|
||||
super(NodeManager.class.getName());
|
||||
@ -74,14 +76,14 @@ protected NodeResourceMonitor createNodeResourceMonitor() {
|
||||
protected ContainerManagerImpl createContainerManager(Context context,
|
||||
ContainerExecutor exec, DeletionService del,
|
||||
NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager
|
||||
containerTokenSecretManager) {
|
||||
containerTokenSecretManager, ApplicationACLsManager aclsManager) {
|
||||
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
|
||||
metrics, containerTokenSecretManager);
|
||||
metrics, containerTokenSecretManager, aclsManager);
|
||||
}
|
||||
|
||||
protected WebServer createWebServer(Context nmContext,
|
||||
ResourceView resourceView) {
|
||||
return new WebServer(nmContext, resourceView);
|
||||
ResourceView resourceView, ApplicationACLsManager aclsManager) {
|
||||
return new WebServer(nmContext, resourceView, aclsManager);
|
||||
}
|
||||
|
||||
protected void doSecureLogin() throws IOException {
|
||||
@ -101,6 +103,8 @@ public void init(Configuration conf) {
|
||||
this.containerTokenSecretManager = new ContainerTokenSecretManager();
|
||||
}
|
||||
|
||||
this.aclsManager = new ApplicationACLsManager(conf);
|
||||
|
||||
ContainerExecutor exec = ReflectionUtils.newInstance(
|
||||
conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
|
||||
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
|
||||
@ -125,11 +129,11 @@ public void init(Configuration conf) {
|
||||
|
||||
ContainerManagerImpl containerManager =
|
||||
createContainerManager(context, exec, del, nodeStatusUpdater,
|
||||
this.containerTokenSecretManager);
|
||||
this.containerTokenSecretManager, this.aclsManager);
|
||||
addService(containerManager);
|
||||
|
||||
Service webServer =
|
||||
createWebServer(context, containerManager.getContainersMonitor());
|
||||
Service webServer = createWebServer(context, containerManager
|
||||
.getContainersMonitor(), this.aclsManager);
|
||||
addService(webServer);
|
||||
|
||||
dispatcher.register(ContainerManagerEventType.class, containerManager);
|
||||
|
@ -34,7 +34,6 @@
|
||||
import org.apache.hadoop.io.DataInputByteBuffer;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityInfo;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
@ -58,7 +57,6 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
@ -89,6 +87,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
@ -114,13 +113,14 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
|
||||
protected final AsyncDispatcher dispatcher;
|
||||
private final ApplicationACLsManager aclsManager;
|
||||
|
||||
private final DeletionService deletionService;
|
||||
|
||||
public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
||||
NodeManagerMetrics metrics, ContainerTokenSecretManager
|
||||
containerTokenSecretManager) {
|
||||
containerTokenSecretManager, ApplicationACLsManager aclsManager) {
|
||||
super(ContainerManagerImpl.class.getName());
|
||||
this.context = context;
|
||||
dispatcher = new AsyncDispatcher();
|
||||
@ -136,6 +136,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
|
||||
|
||||
this.nodeStatusUpdater = nodeStatusUpdater;
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
this.aclsManager = aclsManager;
|
||||
|
||||
// Start configurable services
|
||||
auxiliaryServices = new AuxServices();
|
||||
@ -271,13 +272,14 @@ public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
|
||||
// Create the application
|
||||
Application application = new ApplicationImpl(dispatcher,
|
||||
launchContext.getUser(), applicationID, credentials);
|
||||
this.aclsManager, launchContext.getUser(), applicationID, credentials);
|
||||
if (null ==
|
||||
context.getApplications().putIfAbsent(applicationID, application)) {
|
||||
LOG.info("Creating a new application reference for app "
|
||||
+ applicationID);
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationInitEvent(applicationID));
|
||||
new ApplicationInitEvent(applicationID, container
|
||||
.getLaunchContext().getApplicationACLs()));
|
||||
}
|
||||
|
||||
// TODO: Validate the request
|
||||
|
@ -21,6 +21,9 @@
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -39,6 +42,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
|
||||
import org.apache.hadoop.yarn.state.MultipleArcTransition;
|
||||
import org.apache.hadoop.yarn.state.SingleArcTransition;
|
||||
@ -56,18 +60,26 @@ public class ApplicationImpl implements Application {
|
||||
final String user;
|
||||
final ApplicationId appId;
|
||||
final Credentials credentials;
|
||||
final ApplicationACLsManager aclsManager;
|
||||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(Application.class);
|
||||
|
||||
Map<ContainerId, Container> containers =
|
||||
new HashMap<ContainerId, Container>();
|
||||
|
||||
public ApplicationImpl(Dispatcher dispatcher, String user,
|
||||
ApplicationId appId, Credentials credentials) {
|
||||
public ApplicationImpl(Dispatcher dispatcher,
|
||||
ApplicationACLsManager aclsManager, String user, ApplicationId appId,
|
||||
Credentials credentials) {
|
||||
this.dispatcher = dispatcher;
|
||||
this.user = user.toString();
|
||||
this.appId = appId;
|
||||
this.credentials = credentials;
|
||||
this.aclsManager = aclsManager;
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
readLock = lock.readLock();
|
||||
writeLock = lock.writeLock();
|
||||
stateMachine = stateMachineFactory.make(this);
|
||||
}
|
||||
|
||||
@ -82,15 +94,23 @@ public ApplicationId getAppId() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized ApplicationState getApplicationState() {
|
||||
// TODO: Synchro should be at statemachine level.
|
||||
// This is only for tests?
|
||||
return this.stateMachine.getCurrentState();
|
||||
public ApplicationState getApplicationState() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.stateMachine.getCurrentState();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ContainerId, Container> getContainers() {
|
||||
return this.containers;
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.containers;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
|
||||
@ -170,6 +190,9 @@ static class AppInitTransition implements
|
||||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
@Override
|
||||
public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
|
||||
app.aclsManager.addApplication(app.getAppId(), initEvent
|
||||
.getApplicationACLs());
|
||||
app.dispatcher.getEventHandler().handle(
|
||||
new ApplicationLocalizationEvent(
|
||||
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
|
||||
@ -315,29 +338,40 @@ static class AppCompletelyDoneTransition implements
|
||||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
@Override
|
||||
public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
|
||||
app.aclsManager.removeApplication(app.getAppId());
|
||||
|
||||
// Inform the logService
|
||||
app.dispatcher.getEventHandler().handle(
|
||||
new LogAggregatorAppFinishedEvent(app.appId));
|
||||
|
||||
// TODO: Also make logService write the acls to the aggregated file.
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void handle(ApplicationEvent event) {
|
||||
public void handle(ApplicationEvent event) {
|
||||
|
||||
ApplicationId applicationID = event.getApplicationID();
|
||||
LOG.info("Processing " + applicationID + " of type " + event.getType());
|
||||
this.writeLock.lock();
|
||||
|
||||
ApplicationState oldState = stateMachine.getCurrentState();
|
||||
ApplicationState newState = null;
|
||||
try {
|
||||
// queue event requesting init of the same app
|
||||
newState = stateMachine.doTransition(event.getType(), event);
|
||||
} catch (InvalidStateTransitonException e) {
|
||||
LOG.warn("Can't handle this event at current state", e);
|
||||
}
|
||||
if (oldState != newState) {
|
||||
LOG.info("Application " + applicationID + " transitioned from "
|
||||
+ oldState + " to " + newState);
|
||||
ApplicationId applicationID = event.getApplicationID();
|
||||
LOG.info("Processing " + applicationID + " of type " + event.getType());
|
||||
|
||||
ApplicationState oldState = stateMachine.getCurrentState();
|
||||
ApplicationState newState = null;
|
||||
try {
|
||||
// queue event requesting init of the same app
|
||||
newState = stateMachine.doTransition(event.getType(), event);
|
||||
} catch (InvalidStateTransitonException e) {
|
||||
LOG.warn("Can't handle this event at current state", e);
|
||||
}
|
||||
if (oldState != newState) {
|
||||
LOG.info("Application " + applicationID + " transitioned from "
|
||||
+ oldState + " to " + newState);
|
||||
}
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,11 +18,22 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
|
||||
public class ApplicationInitEvent extends ApplicationEvent {
|
||||
|
||||
public ApplicationInitEvent(ApplicationId appId) {
|
||||
private final Map<ApplicationAccessType, String> applicationACLs;
|
||||
|
||||
public ApplicationInitEvent(ApplicationId appId,
|
||||
Map<ApplicationAccessType, String> acls) {
|
||||
super(appId, ApplicationEventType.INIT_APPLICATION);
|
||||
this.applicationACLs = acls;
|
||||
}
|
||||
|
||||
public Map<ApplicationAccessType, String> getApplicationACLs() {
|
||||
return this.applicationACLs;
|
||||
}
|
||||
}
|
||||
|
@ -28,12 +28,17 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.webapp.SubView;
|
||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||
@ -54,12 +59,15 @@ public static class ContainersLogsBlock extends HtmlBlock implements
|
||||
private final Configuration conf;
|
||||
private final LocalDirAllocator logsSelector;
|
||||
private final Context nmContext;
|
||||
private final ApplicationACLsManager aclsManager;
|
||||
|
||||
@Inject
|
||||
public ContainersLogsBlock(Configuration conf, Context context) {
|
||||
public ContainersLogsBlock(Configuration conf, Context context,
|
||||
ApplicationACLsManager aclsManager) {
|
||||
this.conf = conf;
|
||||
this.logsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
|
||||
this.nmContext = context;
|
||||
this.aclsManager = aclsManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -74,23 +82,49 @@ protected void render(Block html) {
|
||||
return;
|
||||
}
|
||||
|
||||
ApplicationId applicationId = containerId.getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
Application application = this.nmContext.getApplications().get(
|
||||
applicationId);
|
||||
Container container = this.nmContext.getContainers().get(containerId);
|
||||
|
||||
if (container == null) {
|
||||
if (application == null || container == null) {
|
||||
div.h1(
|
||||
"Unknown container. Container is either not yet running or "
|
||||
+ "has already completed or "
|
||||
+ "doesn't belong to this node at all.")._();
|
||||
} else if (EnumSet.of(ContainerState.NEW, ContainerState.LOCALIZING,
|
||||
return;
|
||||
}
|
||||
|
||||
if (EnumSet.of(ContainerState.NEW, ContainerState.LOCALIZING,
|
||||
ContainerState.LOCALIZING).contains(container.getContainerState())) {
|
||||
div.h1("Container is not yet running. Current state is "
|
||||
+ container.getContainerState())
|
||||
._();
|
||||
} else if (EnumSet.of(ContainerState.RUNNING,
|
||||
return;
|
||||
}
|
||||
|
||||
if (EnumSet.of(ContainerState.RUNNING,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerState.EXITED_WITH_SUCCESS).contains(
|
||||
container.getContainerState())) {
|
||||
|
||||
// Check for the authorization.
|
||||
String remoteUser = request().getRemoteUser();
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
if (callerUGI != null && !this.aclsManager.checkAccess(callerUGI,
|
||||
ApplicationAccessType.VIEW_APP, application.getUser(),
|
||||
applicationId)) {
|
||||
div.h1(
|
||||
"You (User " + remoteUser
|
||||
+ ") are not authorized to view the logs for application "
|
||||
+ applicationId)._();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!$(CONTAINER_LOG_TYPE).isEmpty()) {
|
||||
File logFile = null;
|
||||
try {
|
||||
@ -98,7 +132,7 @@ protected void render(Block html) {
|
||||
new File(this.logsSelector
|
||||
.getLocalPathToRead(
|
||||
ConverterUtils.toString(
|
||||
containerId.getApplicationAttemptId().getApplicationId())
|
||||
applicationId)
|
||||
+ Path.SEPARATOR + $(CONTAINER_ID)
|
||||
+ Path.SEPARATOR
|
||||
+ $(CONTAINER_LOG_TYPE), this.conf).toUri()
|
||||
@ -175,8 +209,8 @@ protected void render(Block html) {
|
||||
|
||||
static List<File>
|
||||
getContainerLogDirs(Configuration conf, ContainerId containerId) {
|
||||
String[] logDirs =
|
||||
conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
|
||||
String[] logDirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
|
||||
YarnConfiguration.DEFAULT_NM_LOG_DIRS);
|
||||
List<File> containerLogDirs = new ArrayList<File>(logDirs.length);
|
||||
for (String logDir : logDirs) {
|
||||
String appIdStr =
|
||||
|
@ -27,6 +27,7 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.webapp.WebApp;
|
||||
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||
@ -36,13 +37,14 @@ public class WebServer extends AbstractService {
|
||||
private static final Log LOG = LogFactory.getLog(WebServer.class);
|
||||
|
||||
private final Context nmContext;
|
||||
private final ResourceView resourceView;
|
||||
private final NMWebApp nmWebApp;
|
||||
private WebApp webApp;
|
||||
|
||||
public WebServer(Context nmContext, ResourceView resourceView) {
|
||||
public WebServer(Context nmContext, ResourceView resourceView,
|
||||
ApplicationACLsManager aclsManager) {
|
||||
super(WebServer.class.getName());
|
||||
this.nmContext = nmContext;
|
||||
this.resourceView = resourceView;
|
||||
this.nmWebApp = new NMWebApp(resourceView, aclsManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -56,10 +58,8 @@ public synchronized void start() {
|
||||
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
|
||||
LOG.info("Instantiating NMWebApp at " + bindAddress);
|
||||
try {
|
||||
this.webApp =
|
||||
WebApps.$for("node", Context.class, this.nmContext)
|
||||
.at(bindAddress).with(getConfig())
|
||||
.start(new NMWebApp(this.resourceView));
|
||||
this.webApp = WebApps.$for("node", Context.class, this.nmContext).at(
|
||||
bindAddress).with(getConfig()).start(this.nmWebApp);
|
||||
} catch (Exception e) {
|
||||
String msg = "NMWebapps failed to start.";
|
||||
LOG.error(msg, e);
|
||||
@ -79,14 +79,18 @@ public synchronized void stop() {
|
||||
public static class NMWebApp extends WebApp implements NMWebParams {
|
||||
|
||||
private final ResourceView resourceView;
|
||||
private final ApplicationACLsManager aclsManager;
|
||||
|
||||
public NMWebApp(ResourceView resourceView) {
|
||||
public NMWebApp(ResourceView resourceView,
|
||||
ApplicationACLsManager aclsManager) {
|
||||
this.resourceView = resourceView;
|
||||
this.aclsManager = aclsManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setup() {
|
||||
bind(ResourceView.class).toInstance(this.resourceView);
|
||||
bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
|
||||
route("/", NMController.class, "info");
|
||||
route("/node", NMController.class, "node");
|
||||
route("/allApplications", NMController.class, "allApplications");
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
@ -60,8 +61,11 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
||||
|
||||
public DummyContainerManager(Context context, ContainerExecutor exec,
|
||||
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
|
||||
NodeManagerMetrics metrics, ContainerTokenSecretManager containerTokenSecretManager) {
|
||||
super(context, exec, deletionContext, nodeStatusUpdater, metrics, containerTokenSecretManager);
|
||||
NodeManagerMetrics metrics,
|
||||
ContainerTokenSecretManager containerTokenSecretManager,
|
||||
ApplicationACLsManager applicationACLsManager) {
|
||||
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
|
||||
containerTokenSecretManager, applicationACLsManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -37,6 +37,7 @@
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
@ -97,9 +98,9 @@ protected void startStatusUpdater() {
|
||||
}
|
||||
};
|
||||
|
||||
DummyContainerManager containerManager =
|
||||
new DummyContainerManager(context, exec, del, nodeStatusUpdater,
|
||||
metrics, containerTokenSecretManager);
|
||||
DummyContainerManager containerManager = new DummyContainerManager(
|
||||
context, exec, del, nodeStatusUpdater, metrics,
|
||||
containerTokenSecretManager, new ApplicationACLsManager(conf));
|
||||
containerManager.init(conf);
|
||||
containerManager.start();
|
||||
|
||||
|
@ -58,6 +58,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||
@ -324,9 +325,11 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||
protected ContainerManagerImpl createContainerManager(Context context,
|
||||
ContainerExecutor exec, DeletionService del,
|
||||
NodeStatusUpdater nodeStatusUpdater,
|
||||
ContainerTokenSecretManager containerTokenSecretManager) {
|
||||
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
|
||||
metrics, containerTokenSecretManager) {
|
||||
ContainerTokenSecretManager containerTokenSecretManager,
|
||||
ApplicationACLsManager aclsManager) {
|
||||
return new ContainerManagerImpl(context, exec, del,
|
||||
nodeStatusUpdater, metrics, containerTokenSecretManager,
|
||||
aclsManager) {
|
||||
@Override
|
||||
public void start() {
|
||||
// Simulating failure of starting RPC server
|
||||
|
@ -52,6 +52,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||
import org.junit.After;
|
||||
@ -146,9 +147,9 @@ public void delete(String user, Path subDir, Path[] baseDirs) {
|
||||
delSrvc.init(conf);
|
||||
|
||||
exec = createContainerExecutor();
|
||||
containerManager =
|
||||
new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
|
||||
metrics, this.containerTokenSecretManager);
|
||||
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
|
||||
nodeStatusUpdater, metrics, this.containerTokenSecretManager,
|
||||
new ApplicationACLsManager(conf));
|
||||
containerManager.init(conf);
|
||||
}
|
||||
|
||||
|
@ -57,6 +57,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Test;
|
||||
@ -385,7 +386,8 @@ public void testLocalFilesCleanup() throws InterruptedException,
|
||||
ContainerTokenSecretManager containerTokenSecretManager = new
|
||||
ContainerTokenSecretManager();
|
||||
containerManager = new ContainerManagerImpl(context, exec, delSrvc,
|
||||
nodeStatusUpdater, metrics, containerTokenSecretManager);
|
||||
nodeStatusUpdater, metrics, containerTokenSecretManager,
|
||||
new ApplicationACLsManager(conf));
|
||||
containerManager.init(conf);
|
||||
containerManager.start();
|
||||
|
||||
|
@ -1,14 +1,23 @@
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.refEq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
||||
@ -27,6 +36,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
@ -366,7 +376,8 @@ private class WrappedApplication {
|
||||
this.user = user;
|
||||
this.appId = BuilderUtils.newApplicationId(timestamp, id);
|
||||
|
||||
app = new ApplicationImpl(dispatcher, this.user, appId, null);
|
||||
app = new ApplicationImpl(dispatcher, new ApplicationACLsManager(
|
||||
new Configuration()), this.user, appId, null);
|
||||
containers = new ArrayList<Container>();
|
||||
for (int i = 0; i < numContainers; i++) {
|
||||
containers.add(createMockedContainer(this.appId, i));
|
||||
@ -384,10 +395,10 @@ public void finished() {
|
||||
}
|
||||
|
||||
public void initApplication() {
|
||||
app.handle(new ApplicationInitEvent(appId));
|
||||
app.handle(new ApplicationInitEvent(appId,
|
||||
new HashMap<ApplicationAccessType, String>()));
|
||||
}
|
||||
|
||||
|
||||
public void initContainer(int containerNum) {
|
||||
if (containerNum == -1) {
|
||||
for (int i = 0; i < containers.size(); i++) {
|
||||
@ -429,6 +440,10 @@ private Container createMockedContainer(ApplicationId appId, int containerId) {
|
||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
|
||||
Container c = mock(Container.class);
|
||||
when(c.getContainerID()).thenReturn(cId);
|
||||
ContainerLaunchContext launchContext = mock(ContainerLaunchContext.class);
|
||||
when(c.getLaunchContext()).thenReturn(launchContext);
|
||||
when(launchContext.getApplicationACLs()).thenReturn(
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
return c;
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,9 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
@ -41,11 +44,11 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestNMWebServer {
|
||||
|
||||
@ -58,7 +61,7 @@ public void setup() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNMWebApp() throws InterruptedException, IOException {
|
||||
public void testNMWebApp() throws IOException {
|
||||
Context nmContext = new NodeManager.NMContext();
|
||||
ResourceView resourceView = new ResourceView() {
|
||||
@Override
|
||||
@ -70,8 +73,9 @@ public long getPmemAllocatedForContainers() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
WebServer server = new WebServer(nmContext, resourceView);
|
||||
Configuration conf = new Configuration();
|
||||
WebServer server = new WebServer(nmContext, resourceView,
|
||||
new ApplicationACLsManager(conf));
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
server.init(conf);
|
||||
server.start();
|
||||
@ -88,9 +92,8 @@ public long getPmemAllocatedForContainers() {
|
||||
when(app.getUser()).thenReturn(user);
|
||||
when(app.getAppId()).thenReturn(appId);
|
||||
nmContext.getApplications().put(appId, app);
|
||||
ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
|
||||
appAttemptId.setApplicationId(appId);
|
||||
appAttemptId.setAttemptId(1);
|
||||
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
|
||||
appId, 1);
|
||||
ContainerId container1 =
|
||||
BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 0);
|
||||
ContainerId container2 =
|
||||
|
@ -28,7 +28,6 @@
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Groups;
|
||||
import org.apache.hadoop.security.SecurityInfo;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
@ -38,7 +37,7 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
@ -51,7 +50,6 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
public class AdminService extends AbstractService implements RMAdminProtocol {
|
||||
@ -86,9 +84,9 @@ public void init(Configuration conf) {
|
||||
conf.get(YarnConfiguration.RM_ADMIN_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS);
|
||||
masterServiceAddress = NetUtils.createSocketAddr(bindAddress);
|
||||
adminAcl =
|
||||
new AccessControlList(
|
||||
conf.get(YarnConfiguration.RM_ADMIN_ACL, YarnConfiguration.DEFAULT_RM_ADMIN_ACL));
|
||||
adminAcl = new AccessControlList(conf.get(
|
||||
YarnConfiguration.YARN_ADMIN_ACL,
|
||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||
}
|
||||
|
||||
public void start() {
|
||||
@ -214,9 +212,9 @@ public RefreshAdminAclsResponse refreshAdminAcls(
|
||||
UserGroupInformation user = checkAcls("refreshAdminAcls");
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
adminAcl =
|
||||
new AccessControlList(
|
||||
conf.get(YarnConfiguration.RM_ADMIN_ACL, YarnConfiguration.DEFAULT_RM_ADMIN_ACL));
|
||||
adminAcl = new AccessControlList(conf.get(
|
||||
YarnConfiguration.YARN_ADMIN_ACL,
|
||||
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
|
||||
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshAdminAcls",
|
||||
"AdminService");
|
||||
|
||||
|
@ -1,56 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import org.apache.hadoop.classification.*;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
/**
|
||||
* Application related ACLs
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public enum ApplicationACL {
|
||||
|
||||
/**
|
||||
* ACL for 'viewing' application. Dictates who can 'view' some or all of the application
|
||||
* related details.
|
||||
*/
|
||||
VIEW_APP(YarnConfiguration.APPLICATION_ACL_VIEW_APP),
|
||||
|
||||
/**
|
||||
* ACL for 'modifying' application. Dictates who can 'modify' the application for e.g., by
|
||||
* killing the application
|
||||
*/
|
||||
MODIFY_APP(YarnConfiguration.APPLICATION_ACL_MODIFY_APP);
|
||||
|
||||
String aclName;
|
||||
|
||||
ApplicationACL(String name) {
|
||||
this.aclName = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the name of the ACL. Here it is same as the name of the configuration
|
||||
* property for specifying the ACL for the application.
|
||||
*
|
||||
* @return aclName
|
||||
*/
|
||||
public String getAclName() {
|
||||
return aclName;
|
||||
}
|
||||
}
|
@ -1,107 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ApplicationACLsManager {
|
||||
|
||||
Configuration conf;
|
||||
|
||||
public ApplicationACLsManager(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public boolean areACLsEnabled() {
|
||||
return conf.getBoolean(YarnConfiguration.RM_ACL_ENABLE,
|
||||
YarnConfiguration.DEFAULT_RM_ACL_ENABLE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct the ApplicationACLs from the configuration so that they can be kept in
|
||||
* the memory. If authorization is disabled on the RM, nothing is constructed
|
||||
* and an empty map is returned.
|
||||
*
|
||||
* @return ApplicationACL to AccessControlList map.
|
||||
*/
|
||||
public Map<ApplicationACL, AccessControlList> constructApplicationACLs(
|
||||
Configuration conf) {
|
||||
|
||||
Map<ApplicationACL, AccessControlList> acls =
|
||||
new HashMap<ApplicationACL, AccessControlList>();
|
||||
|
||||
// Don't construct anything if authorization is disabled.
|
||||
if (!areACLsEnabled()) {
|
||||
return acls;
|
||||
}
|
||||
|
||||
for (ApplicationACL aclName : ApplicationACL.values()) {
|
||||
String aclConfigName = aclName.getAclName();
|
||||
String aclConfigured = conf.get(aclConfigName);
|
||||
if (aclConfigured == null) {
|
||||
// If ACLs are not configured at all, we grant no access to anyone. So
|
||||
// applicationOwner and superuser/supergroup _only_ can do 'stuff'
|
||||
aclConfigured = " ";
|
||||
}
|
||||
acls.put(aclName, new AccessControlList(aclConfigured));
|
||||
}
|
||||
return acls;
|
||||
}
|
||||
|
||||
/**
|
||||
* If authorization is enabled, checks whether the user (in the callerUGI)
|
||||
* is authorized to perform the operation specified by 'applicationOperation' on
|
||||
* the application by checking if the user is applicationOwner or part of application ACL for the
|
||||
* specific application operation.
|
||||
* <ul>
|
||||
* <li>The owner of the application can do any operation on the application</li>
|
||||
* <li>For all other users/groups application-acls are checked</li>
|
||||
* </ul>
|
||||
* @param callerUGI
|
||||
* @param applicationOperation
|
||||
* @param applicationOwner
|
||||
* @param acl
|
||||
* @throws AccessControlException
|
||||
*/
|
||||
public boolean checkAccess(UserGroupInformation callerUGI,
|
||||
ApplicationACL applicationOperation, String applicationOwner,
|
||||
AccessControlList acl) {
|
||||
|
||||
String user = callerUGI.getShortUserName();
|
||||
if (!areACLsEnabled()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Allow application-owner for any operation on the application
|
||||
if (user.equals(applicationOwner)
|
||||
|| acl.isUserAllowed(callerUGI)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
@ -24,7 +24,6 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.avro.ipc.Server;
|
||||
@ -32,12 +31,8 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityInfo;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
@ -52,9 +47,12 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
@ -66,13 +64,13 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
|
||||
|
||||
@ -96,15 +94,15 @@ public class ClientRMService extends AbstractService implements
|
||||
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
InetSocketAddress clientBindAddress;
|
||||
|
||||
private ApplicationACLsManager aclsManager;
|
||||
private Map<ApplicationACL, AccessControlList> applicationACLs;
|
||||
private final ApplicationACLsManager applicationsACLsManager;
|
||||
|
||||
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
|
||||
RMAppManager rmAppManager) {
|
||||
RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager) {
|
||||
super(ClientRMService.class.getName());
|
||||
this.scheduler = scheduler;
|
||||
this.rmContext = rmContext;
|
||||
this.rmAppManager = rmAppManager;
|
||||
this.applicationsACLsManager = applicationACLsManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -114,10 +112,6 @@ public void init(Configuration conf) {
|
||||
YarnConfiguration.DEFAULT_RM_ADDRESS);
|
||||
clientBindAddress =
|
||||
NetUtils.createSocketAddr(clientServiceBindAddress);
|
||||
|
||||
this.aclsManager = new ApplicationACLsManager(conf);
|
||||
this.applicationACLs = aclsManager.constructApplicationACLs(conf);
|
||||
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
@ -139,21 +133,19 @@ public void start() {
|
||||
|
||||
/**
|
||||
* check if the calling user has the access to application information.
|
||||
* @param appAttemptId
|
||||
* @param callerUGI
|
||||
* @param owner
|
||||
* @param appACL
|
||||
* @param operationPerformed
|
||||
* @param applicationId
|
||||
* @return
|
||||
*/
|
||||
private boolean checkAccess(UserGroupInformation callerUGI, String owner, ApplicationACL appACL) {
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
return true;
|
||||
}
|
||||
AccessControlList applicationACL = applicationACLs.get(appACL);
|
||||
return aclsManager.checkAccess(callerUGI, appACL, owner, applicationACL);
|
||||
private boolean checkAccess(UserGroupInformation callerUGI, String owner,
|
||||
ApplicationAccessType operationPerformed, ApplicationId applicationId) {
|
||||
return applicationsACLsManager.checkAccess(callerUGI, operationPerformed,
|
||||
owner, applicationId);
|
||||
}
|
||||
|
||||
public ApplicationId getNewApplicationId() {
|
||||
ApplicationId getNewApplicationId() {
|
||||
ApplicationId applicationId = org.apache.hadoop.yarn.util.BuilderUtils
|
||||
.newApplicationId(recordFactory, ResourceManager.clusterTimeStamp,
|
||||
applicationCounter.incrementAndGet());
|
||||
@ -180,9 +172,29 @@ public GetNewApplicationResponse getNewApplication(
|
||||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request) throws YarnRemoteException {
|
||||
ApplicationId applicationId = request.getApplicationId();
|
||||
RMApp application = rmContext.getRMApps().get(applicationId);
|
||||
ApplicationReport report = (application == null) ? null : application
|
||||
.createAndGetApplicationReport();
|
||||
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
|
||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||
if (application == null) {
|
||||
throw RPCUtil.getRemoteException("Trying to get information for an "
|
||||
+ "absent application " + applicationId);
|
||||
}
|
||||
|
||||
if (!checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.VIEW_APP, applicationId)) {
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||
+ ApplicationAccessType.VIEW_APP.name() + " on " + applicationId));
|
||||
}
|
||||
|
||||
ApplicationReport report = application.createAndGetApplicationReport();
|
||||
|
||||
GetApplicationReportResponse response = recordFactory
|
||||
.newRecordInstance(GetApplicationReportResponse.class);
|
||||
@ -203,7 +215,7 @@ public SubmitApplicationResponse submitApplication(
|
||||
throw new IOException("Application with id " + applicationId
|
||||
+ " is already present! Cannot add a duplicate!");
|
||||
}
|
||||
|
||||
|
||||
// Safety
|
||||
submissionContext.setUser(user);
|
||||
|
||||
@ -249,16 +261,25 @@ public KillApplicationResponse forceKillApplication(
|
||||
}
|
||||
|
||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||
// TODO: What if null
|
||||
if (application == null) {
|
||||
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||
AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
|
||||
"Trying to kill an absent application", applicationId);
|
||||
throw RPCUtil
|
||||
.getRemoteException("Trying to kill an absent application "
|
||||
+ applicationId);
|
||||
}
|
||||
|
||||
if (!checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationACL.MODIFY_APP)) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.KILL_APP_REQUEST,
|
||||
"User doesn't have MODIFY_APP permissions", "ClientRMService",
|
||||
ApplicationAccessType.MODIFY_APP, applicationId)) {
|
||||
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||
AuditConstants.KILL_APP_REQUEST,
|
||||
"User doesn't have permissions to "
|
||||
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
||||
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
||||
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||
+ ApplicationACL.MODIFY_APP.name() + " on " + applicationId));
|
||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||
}
|
||||
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
@ -287,9 +308,24 @@ public GetClusterMetricsResponse getClusterMetrics(
|
||||
public GetAllApplicationsResponse getAllApplications(
|
||||
GetAllApplicationsRequest request) throws YarnRemoteException {
|
||||
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
} catch (IOException ie) {
|
||||
LOG.info("Error getting UGI ", ie);
|
||||
throw RPCUtil.getRemoteException(ie);
|
||||
}
|
||||
|
||||
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
|
||||
for (RMApp application : this.rmContext.getRMApps().values()) {
|
||||
reports.add(application.createAndGetApplicationReport());
|
||||
// Only give out the applications viewable by the user as
|
||||
// ApplicationReport has confidential information like client-token, ACLs
|
||||
// etc. Web UI displays all applications though as we filter and print
|
||||
// only public information there.
|
||||
if (checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.VIEW_APP, application.getApplicationId())) {
|
||||
reports.add(application.createAndGetApplicationReport());
|
||||
}
|
||||
}
|
||||
|
||||
GetAllApplicationsResponse response =
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -27,6 +28,7 @@
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
@ -42,6 +44,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
|
||||
/**
|
||||
* This class manages the list of applications for the resource manager.
|
||||
@ -57,15 +60,18 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
|
||||
private final ClientToAMSecretManager clientToAMSecretManager;
|
||||
private final ApplicationMasterService masterService;
|
||||
private final YarnScheduler scheduler;
|
||||
private final ApplicationACLsManager applicationACLsManager;
|
||||
private Configuration conf;
|
||||
|
||||
public RMAppManager(RMContext context, ClientToAMSecretManager
|
||||
clientToAMSecretManager, YarnScheduler scheduler,
|
||||
ApplicationMasterService masterService, Configuration conf) {
|
||||
public RMAppManager(RMContext context,
|
||||
ClientToAMSecretManager clientToAMSecretManager,
|
||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||
this.rmContext = context;
|
||||
this.scheduler = scheduler;
|
||||
this.clientToAMSecretManager = clientToAMSecretManager;
|
||||
this.masterService = masterService;
|
||||
this.applicationACLsManager = applicationACLsManager;
|
||||
this.conf = conf;
|
||||
setCompletedAppsMax(conf.getInt(
|
||||
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
|
||||
@ -208,6 +214,7 @@ protected synchronized void checkAppNumCompletedLimit() {
|
||||
LOG.info("Application should be expired, max # apps"
|
||||
+ " met. Removing app: " + removeId);
|
||||
rmContext.getRMApps().remove(removeId);
|
||||
this.applicationACLsManager.removeApplication(removeId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -256,12 +263,17 @@ protected synchronized void submitApplication(
|
||||
LOG.info(message);
|
||||
throw RPCUtil.getRemoteException(message);
|
||||
} else {
|
||||
|
||||
this.applicationACLsManager.addApplication(applicationId,
|
||||
submissionContext.getAMContainerSpec().getApplicationACLs());
|
||||
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppEvent(applicationId, RMAppEventType.START));
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
LOG.info("RMAppManager submit application exception", ie);
|
||||
if (application != null) {
|
||||
// TODO: Weird setup.
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppRejectedEvent(applicationId, ie.getMessage()));
|
||||
}
|
||||
|
@ -63,6 +63,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.service.CompositeService;
|
||||
@ -77,7 +78,6 @@
|
||||
public class ResourceManager extends CompositeService implements Recoverable {
|
||||
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
|
||||
public static final long clusterTimeStamp = System.currentTimeMillis();
|
||||
private YarnConfiguration conf;
|
||||
|
||||
protected ClientToAMSecretManager clientToAMSecretManager =
|
||||
new ClientToAMSecretManager();
|
||||
@ -100,12 +100,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
protected NodesListManager nodesListManager;
|
||||
private EventHandler<SchedulerEvent> schedulerDispatcher;
|
||||
protected RMAppManager rmAppManager;
|
||||
protected ApplicationACLsManager applicationACLsManager;
|
||||
|
||||
private WebApp webApp;
|
||||
private RMContext rmContext;
|
||||
private final Store store;
|
||||
protected ResourceTrackerService resourceTracker;
|
||||
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
public ResourceManager(Store store) {
|
||||
super("ResourceManager");
|
||||
this.store = store;
|
||||
@ -119,6 +122,8 @@ public RMContext getRMContext() {
|
||||
@Override
|
||||
public synchronized void init(Configuration conf) {
|
||||
|
||||
this.conf = conf;
|
||||
|
||||
this.rmDispatcher = createDispatcher();
|
||||
addIfService(this.rmDispatcher);
|
||||
|
||||
@ -134,8 +139,6 @@ public synchronized void init(Configuration conf) {
|
||||
|
||||
addService(nodesListManager);
|
||||
|
||||
// Initialize the config
|
||||
this.conf = new YarnConfiguration(conf);
|
||||
// Initialize the scheduler
|
||||
this.scheduler = createScheduler();
|
||||
this.schedulerDispatcher = createSchedulerEventDispatcher();
|
||||
@ -166,7 +169,7 @@ public synchronized void init(Configuration conf) {
|
||||
addService(resourceTracker);
|
||||
|
||||
try {
|
||||
this.scheduler.reinitialize(this.conf,
|
||||
this.scheduler.reinitialize(conf,
|
||||
this.containerTokenSecretManager, this.rmContext);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("Failed to initialize scheduler", ioe);
|
||||
@ -175,6 +178,8 @@ public synchronized void init(Configuration conf) {
|
||||
masterService = createApplicationMasterService();
|
||||
addService(masterService) ;
|
||||
|
||||
this.applicationACLsManager = new ApplicationACLsManager(conf);
|
||||
|
||||
this.rmAppManager = createRMAppManager();
|
||||
// Register event handler for RMAppManagerEvents
|
||||
this.rmDispatcher.register(RMAppManagerEventType.class,
|
||||
@ -210,11 +215,9 @@ protected void addIfService(Object object) {
|
||||
}
|
||||
|
||||
protected ResourceScheduler createScheduler() {
|
||||
return
|
||||
ReflectionUtils.newInstance(
|
||||
conf.getClass(YarnConfiguration.RM_SCHEDULER,
|
||||
FifoScheduler.class, ResourceScheduler.class),
|
||||
this.conf);
|
||||
return ReflectionUtils.newInstance(this.conf.getClass(
|
||||
YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
|
||||
ResourceScheduler.class), this.conf);
|
||||
}
|
||||
|
||||
protected ApplicationMasterLauncher createAMLauncher() {
|
||||
@ -234,7 +237,8 @@ protected AMLivelinessMonitor createAMLivelinessMonitor() {
|
||||
|
||||
protected RMAppManager createRMAppManager() {
|
||||
return new RMAppManager(this.rmContext, this.clientToAMSecretManager,
|
||||
this.scheduler, this.masterService, this.conf);
|
||||
this.scheduler, this.masterService, this.applicationACLsManager,
|
||||
this.conf);
|
||||
}
|
||||
|
||||
@Private
|
||||
@ -395,7 +399,7 @@ public void handle(RMNodeEvent event) {
|
||||
|
||||
protected void startWepApp() {
|
||||
webApp = WebApps.$for("cluster", masterService).at(
|
||||
conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
|
||||
this.conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS)).
|
||||
start(new RMWebApp(this));
|
||||
|
||||
@ -426,7 +430,7 @@ public void start() {
|
||||
}
|
||||
|
||||
protected void doSecureLogin() throws IOException {
|
||||
SecurityUtil.login(conf, YarnConfiguration.RM_KEYTAB,
|
||||
SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,
|
||||
YarnConfiguration.RM_PRINCIPAL);
|
||||
}
|
||||
|
||||
@ -452,7 +456,8 @@ protected ResourceTrackerService createResourceTrackerService() {
|
||||
}
|
||||
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager);
|
||||
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
||||
this.applicationACLsManager);
|
||||
}
|
||||
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
@ -462,7 +467,8 @@ protected ApplicationMasterService createApplicationMasterService() {
|
||||
|
||||
|
||||
protected AdminService createAdminService() {
|
||||
return new AdminService(conf, scheduler, rmContext, this.nodesListManager);
|
||||
return new AdminService(this.conf, scheduler, rmContext,
|
||||
this.nodesListManager);
|
||||
}
|
||||
|
||||
@Private
|
||||
@ -493,6 +499,11 @@ public ApplicationMasterService getApplicationMasterService() {
|
||||
return this.masterService;
|
||||
}
|
||||
|
||||
@Private
|
||||
public ApplicationACLsManager getApplicationACLsManager() {
|
||||
return this.applicationACLsManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recover(RMState state) throws Exception {
|
||||
resourceTracker.recover(state);
|
||||
|
@ -152,8 +152,10 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||
.addTransition(
|
||||
RMAppState.KILLED,
|
||||
RMAppState.KILLED,
|
||||
EnumSet.of(RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED,
|
||||
RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_KILLED))
|
||||
EnumSet.of(RMAppEventType.APP_ACCEPTED,
|
||||
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
|
||||
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
|
||||
RMAppEventType.ATTEMPT_KILLED))
|
||||
|
||||
.installTopology();
|
||||
|
||||
|
@ -232,7 +232,9 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
|
||||
.addTransition(
|
||||
RMAppAttemptState.KILLED,
|
||||
RMAppAttemptState.KILLED,
|
||||
EnumSet.of(RMAppAttemptEventType.EXPIRE,
|
||||
EnumSet.of(RMAppAttemptEventType.APP_ACCEPTED,
|
||||
RMAppAttemptEventType.APP_REJECTED,
|
||||
RMAppAttemptEventType.EXPIRE,
|
||||
RMAppAttemptEventType.LAUNCHED,
|
||||
RMAppAttemptEventType.LAUNCH_FAILED,
|
||||
RMAppAttemptEventType.EXPIRE,
|
||||
|
@ -41,6 +41,7 @@
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
@ -533,9 +534,9 @@ public void submitApplication(SchedulerApp application, String userName,
|
||||
} catch (IOException ioe) {
|
||||
throw new AccessControlException(ioe);
|
||||
}
|
||||
if (!hasAccess(QueueACL.SUBMIT_JOB, userUgi)) {
|
||||
if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) {
|
||||
throw new AccessControlException("User " + userName + " cannot submit" +
|
||||
" jobs to queue " + getQueuePath());
|
||||
" applications to queue " + getQueuePath());
|
||||
}
|
||||
|
||||
User user = null;
|
||||
@ -1065,35 +1066,26 @@ private Container getContainer(RMContainer rmContainer,
|
||||
|
||||
public Container createContainer(SchedulerApp application, SchedulerNode node,
|
||||
Resource capability, Priority priority) {
|
||||
Container container =
|
||||
BuilderUtils.newContainer(this.recordFactory,
|
||||
application.getApplicationAttemptId(),
|
||||
application.getNewContainerId(),
|
||||
node.getNodeID(), node.getHttpAddress(),
|
||||
capability, priority);
|
||||
|
||||
NodeId nodeId = node.getRMNode().getNodeID();
|
||||
ContainerId containerId = BuilderUtils.newContainerId(application
|
||||
.getApplicationAttemptId(), application.getNewContainerId());
|
||||
ContainerToken containerToken = null;
|
||||
|
||||
// If security is enabled, send the container-tokens too.
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
ContainerToken containerToken =
|
||||
this.recordFactory.newRecordInstance(ContainerToken.class);
|
||||
NodeId nodeId = container.getNodeId();
|
||||
ContainerTokenIdentifier tokenidentifier = new ContainerTokenIdentifier(
|
||||
container.getId(), nodeId.toString(), container.getResource());
|
||||
containerToken.setIdentifier(
|
||||
ByteBuffer.wrap(tokenidentifier.getBytes()));
|
||||
containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
|
||||
containerToken.setPassword(
|
||||
ByteBuffer.wrap(
|
||||
containerTokenSecretManager.createPassword(tokenidentifier))
|
||||
);
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(nodeId.getHost(),
|
||||
nodeId.getPort());
|
||||
containerToken.setService(addr.getAddress().getHostAddress() + ":"
|
||||
+ addr.getPort());
|
||||
container.setContainerToken(containerToken);
|
||||
ContainerTokenIdentifier tokenIdentifier = new ContainerTokenIdentifier(
|
||||
containerId, nodeId.toString(), capability);
|
||||
containerToken = BuilderUtils.newContainerToken(nodeId, ByteBuffer
|
||||
.wrap(containerTokenSecretManager
|
||||
.createPassword(tokenIdentifier)), tokenIdentifier);
|
||||
}
|
||||
|
||||
// Create the container
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability, priority,
|
||||
containerToken);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
|
@ -509,36 +509,25 @@ private int assignContainer(SchedulerNode node, SchedulerApp application,
|
||||
|
||||
if (assignedContainers > 0) {
|
||||
for (int i=0; i < assignedContainers; ++i) {
|
||||
// Create the container
|
||||
Container container =
|
||||
BuilderUtils.newContainer(recordFactory,
|
||||
application.getApplicationAttemptId(),
|
||||
application.getNewContainerId(),
|
||||
node.getRMNode().getNodeID(),
|
||||
node.getRMNode().getHttpAddress(),
|
||||
capability, priority);
|
||||
|
||||
|
||||
NodeId nodeId = node.getRMNode().getNodeID();
|
||||
ContainerId containerId = BuilderUtils.newContainerId(application
|
||||
.getApplicationAttemptId(), application.getNewContainerId());
|
||||
ContainerToken containerToken = null;
|
||||
|
||||
// If security is enabled, send the container-tokens too.
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
ContainerToken containerToken =
|
||||
recordFactory.newRecordInstance(ContainerToken.class);
|
||||
NodeId nodeId = container.getNodeId();
|
||||
ContainerTokenIdentifier tokenidentifier =
|
||||
new ContainerTokenIdentifier(container.getId(),
|
||||
nodeId.toString(), container.getResource());
|
||||
containerToken.setIdentifier(
|
||||
ByteBuffer.wrap(tokenidentifier.getBytes()));
|
||||
containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
|
||||
containerToken.setPassword(
|
||||
ByteBuffer.wrap(containerTokenSecretManager
|
||||
.createPassword(tokenidentifier)));
|
||||
// RPC layer client expects ip:port as service for tokens
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(
|
||||
nodeId.getHost(), nodeId.getPort());
|
||||
containerToken.setService(addr.getAddress().getHostAddress() + ":"
|
||||
+ addr.getPort());
|
||||
container.setContainerToken(containerToken);
|
||||
ContainerTokenIdentifier tokenIdentifier = new ContainerTokenIdentifier(
|
||||
containerId, nodeId.toString(), capability);
|
||||
containerToken = BuilderUtils.newContainerToken(nodeId, ByteBuffer
|
||||
.wrap(containerTokenSecretManager
|
||||
.createPassword(tokenIdentifier)), tokenIdentifier);
|
||||
}
|
||||
|
||||
// Create the container
|
||||
Container container = BuilderUtils.newContainer(containerId, nodeId,
|
||||
node.getRMNode().getHttpAddress(), capability, priority,
|
||||
containerToken);
|
||||
|
||||
// Allocate!
|
||||
|
||||
|
@ -61,9 +61,10 @@ void toDataTableArrays(PrintWriter out) {
|
||||
}
|
||||
String appID = app.getApplicationId().toString();
|
||||
String trackingUrl = app.getTrackingUrl();
|
||||
boolean trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty() || "N/A".equalsIgnoreCase(trackingUrl);
|
||||
String ui = trackingUrlIsNotReady ?
|
||||
"UNASSIGNED" : (app.getFinishTime() == 0 ? "ApplicationMaster" : "History");
|
||||
boolean trackingUrlIsNotReady = trackingUrl == null
|
||||
|| trackingUrl.isEmpty() || "N/A".equalsIgnoreCase(trackingUrl);
|
||||
String ui = trackingUrlIsNotReady ? "UNASSIGNED"
|
||||
: (app.getFinishTime() == 0 ? "ApplicationMaster" : "History");
|
||||
out.append("[\"");
|
||||
appendSortable(out, app.getApplicationId().getId());
|
||||
appendLink(out, appID, rc.prefix(), "app", appID).append(_SEP).
|
||||
|
@ -22,6 +22,7 @@
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.webapp.WebApp;
|
||||
|
||||
/**
|
||||
@ -43,6 +44,8 @@ public void setup() {
|
||||
if (rm != null) {
|
||||
bind(ResourceManager.class).toInstance(rm);
|
||||
bind(RMContext.class).toInstance(rm.getRMContext());
|
||||
bind(ApplicationACLsManager.class).toInstance(
|
||||
rm.getApplicationACLsManager());
|
||||
}
|
||||
route("/", RmController.class);
|
||||
route(pajoin("/nodes", NODE_STATE), RmController.class, "nodes");
|
||||
|
@ -24,14 +24,17 @@
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.Apps;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.Times;
|
||||
@ -43,7 +46,14 @@
|
||||
// Do NOT rename/refactor this to RMView as it will wreak havoc
|
||||
// on Mac OS HFS as its case-insensitive!
|
||||
public class RmController extends Controller {
|
||||
@Inject RmController(RequestContext ctx) { super(ctx); }
|
||||
|
||||
private ApplicationACLsManager aclsManager;
|
||||
|
||||
@Inject
|
||||
RmController(RequestContext ctx, ApplicationACLsManager aclsManager) {
|
||||
super(ctx);
|
||||
this.aclsManager = aclsManager;
|
||||
}
|
||||
|
||||
@Override public void index() {
|
||||
setTitle("Applications");
|
||||
@ -70,10 +80,28 @@ public void app() {
|
||||
setTitle("Application not found: "+ aid);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check for the authorization.
|
||||
String remoteUser = request().getRemoteUser();
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
if (callerUGI != null
|
||||
&& !this.aclsManager.checkAccess(callerUGI,
|
||||
ApplicationAccessType.VIEW_APP, app.getUser(), appID)) {
|
||||
setStatus(HttpServletResponse.SC_UNAUTHORIZED);
|
||||
setTitle("Unauthorized request for viewing application " + appID);
|
||||
renderText("You (User " + remoteUser
|
||||
+ ") are not authorized to view the logs for application " + appID);
|
||||
return;
|
||||
}
|
||||
|
||||
setTitle(join("Application ", aid));
|
||||
String trackingUrl = app.getTrackingUrl();
|
||||
boolean trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty() || "N/A".equalsIgnoreCase(trackingUrl);
|
||||
String ui = trackingUrlIsNotReady ? "UNASSIGNED" :
|
||||
boolean trackingUrlIsNotReady = trackingUrl == null
|
||||
|| trackingUrl.isEmpty() || "N/A".equalsIgnoreCase(trackingUrl);
|
||||
String ui = trackingUrlIsNotReady ? "UNASSIGNED" :
|
||||
(app.getFinishTime() == 0 ? "ApplicationMaster" : "History");
|
||||
|
||||
ResponseInfo info = info("Application Overview").
|
||||
|
@ -139,7 +139,8 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Excepti
|
||||
|
||||
@Override
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(getRMContext(), getResourceScheduler(), rmAppManager) {
|
||||
return new ClientRMService(getRMContext(), getResourceScheduler(),
|
||||
rmAppManager, applicationACLsManager) {
|
||||
@Override
|
||||
public void start() {
|
||||
//override to not start rpc handler
|
||||
|
@ -186,7 +186,7 @@ synchronized public StartContainerResponse startContainer(
|
||||
BuilderUtils.newContainer(containerLaunchContext.getContainerId(),
|
||||
this.nodeId, nodeHttpAddress,
|
||||
containerLaunchContext.getResource(),
|
||||
null // DKDC - Doesn't matter
|
||||
null, null // DKDC - Doesn't matter
|
||||
);
|
||||
|
||||
applicationContainers.add(container);
|
||||
|
@ -19,29 +19,27 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.MockApps;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
@ -49,15 +47,15 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
|
||||
import org.junit.Test;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Testing applications being retired from RM.
|
||||
@ -135,14 +133,16 @@ public void handle(RMAppEvent event) {
|
||||
public class TestRMAppManager extends RMAppManager {
|
||||
|
||||
public TestRMAppManager(RMContext context, Configuration conf) {
|
||||
super(context, null, null, null, conf);
|
||||
super(context, null, null, null, new ApplicationACLsManager(conf), conf);
|
||||
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
||||
}
|
||||
|
||||
public TestRMAppManager(RMContext context, ClientToAMSecretManager
|
||||
clientToAMSecretManager, YarnScheduler scheduler,
|
||||
ApplicationMasterService masterService, Configuration conf) {
|
||||
super(context, clientToAMSecretManager, scheduler, masterService, conf);
|
||||
public TestRMAppManager(RMContext context,
|
||||
ClientToAMSecretManager clientToAMSecretManager,
|
||||
YarnScheduler scheduler, ApplicationMasterService masterService,
|
||||
ApplicationACLsManager applicationACLsManager, Configuration conf) {
|
||||
super(context, clientToAMSecretManager, scheduler, masterService,
|
||||
applicationACLsManager, conf);
|
||||
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
||||
}
|
||||
|
||||
@ -339,14 +339,19 @@ public void testRMAppSubmit() throws Exception {
|
||||
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||
new ApplicationTokenSecretManager(), scheduler);
|
||||
Configuration conf = new Configuration();
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService, conf);
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService,
|
||||
new ApplicationACLsManager(conf), conf);
|
||||
|
||||
ApplicationId appID = MockApps.newAppID(1);
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
ApplicationSubmissionContext context =
|
||||
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
context.setApplicationId(appID);
|
||||
ContainerLaunchContext amContainer = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
amContainer.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
|
||||
context.setAMContainerSpec(amContainer);
|
||||
setupDispatcher(rmContext, conf);
|
||||
|
||||
appMonitor.submitApplication(context);
|
||||
@ -382,8 +387,9 @@ public void testRMAppSubmitWithQueueAndName() throws Exception {
|
||||
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||
new ApplicationTokenSecretManager(), scheduler);
|
||||
Configuration conf = new Configuration();
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService, conf);
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService,
|
||||
new ApplicationACLsManager(conf), conf);
|
||||
|
||||
ApplicationId appID = MockApps.newAppID(10);
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
@ -391,6 +397,11 @@ public void testRMAppSubmitWithQueueAndName() throws Exception {
|
||||
context.setApplicationId(appID);
|
||||
context.setApplicationName("testApp1");
|
||||
context.setQueue("testQueue");
|
||||
ContainerLaunchContext amContainer = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
amContainer
|
||||
.setApplicationACLs(new HashMap<ApplicationAccessType, String>());
|
||||
context.setAMContainerSpec(amContainer);
|
||||
|
||||
setupDispatcher(rmContext, conf);
|
||||
|
||||
@ -424,8 +435,9 @@ public void testRMAppSubmitError() throws Exception {
|
||||
ApplicationMasterService masterService = new ApplicationMasterService(rmContext,
|
||||
new ApplicationTokenSecretManager(), scheduler);
|
||||
Configuration conf = new Configuration();
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService, conf);
|
||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
|
||||
new ClientToAMSecretManager(), scheduler, masterService,
|
||||
new ApplicationACLsManager(conf), conf);
|
||||
|
||||
ApplicationId appID = MockApps.newAppID(0);
|
||||
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||
|
@ -0,0 +1,329 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestApplicationACLs {
|
||||
|
||||
private static final String APP_OWNER = "owner";
|
||||
private static final String FRIEND = "friend";
|
||||
private static final String ENEMY = "enemy";
|
||||
private static final String SUPER_USER = "superUser";
|
||||
private static final String FRIENDLY_GROUP = "friendly-group";
|
||||
private static final String SUPER_GROUP = "superGroup";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestApplicationACLs.class);
|
||||
|
||||
static MockRM resourceManager;
|
||||
static Configuration conf = new YarnConfiguration();
|
||||
final static YarnRPC rpc = YarnRPC.create(conf);
|
||||
final static InetSocketAddress rmAddress = NetUtils
|
||||
.createSocketAddr(conf.get(YarnConfiguration.RM_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_RM_ADDRESS));
|
||||
private static ClientRMProtocol rmClient;
|
||||
|
||||
private static RecordFactory recordFactory = RecordFactoryProvider
|
||||
.getRecordFactory(conf);
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws InterruptedException, IOException {
|
||||
Store store = StoreFactory.getStore(conf);
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
AccessControlList adminACL = new AccessControlList("");
|
||||
adminACL.addGroup(SUPER_GROUP);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, adminACL.getAclString());
|
||||
resourceManager = new MockRM(conf) {
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(getRMContext(), this.scheduler,
|
||||
this.rmAppManager, this.applicationACLsManager);
|
||||
};
|
||||
};
|
||||
new Thread() {
|
||||
public void run() {
|
||||
UserGroupInformation.createUserForTesting(ENEMY, new String[] {});
|
||||
UserGroupInformation.createUserForTesting(FRIEND,
|
||||
new String[] { FRIENDLY_GROUP });
|
||||
UserGroupInformation.createUserForTesting(SUPER_USER,
|
||||
new String[] { SUPER_GROUP });
|
||||
resourceManager.start();
|
||||
};
|
||||
}.start();
|
||||
int waitCount = 0;
|
||||
while (resourceManager.getServiceState() == STATE.INITED
|
||||
&& waitCount++ < 60) {
|
||||
LOG.info("Waiting for RM to start...");
|
||||
Thread.sleep(1500);
|
||||
}
|
||||
if (resourceManager.getServiceState() != STATE.STARTED) {
|
||||
// RM could have failed.
|
||||
throw new IOException(
|
||||
"ResourceManager failed to start. Final state is "
|
||||
+ resourceManager.getServiceState());
|
||||
}
|
||||
|
||||
UserGroupInformation owner = UserGroupInformation
|
||||
.createRemoteUser(APP_OWNER);
|
||||
rmClient = owner.doAs(new PrivilegedExceptionAction<ClientRMProtocol>() {
|
||||
@Override
|
||||
public ClientRMProtocol run() throws Exception {
|
||||
return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
||||
rmAddress, conf);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
if(resourceManager != null) {
|
||||
resourceManager.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplicationACLs() throws Exception {
|
||||
|
||||
verifyOwnerAccess();
|
||||
|
||||
verifySuperUserAccess();
|
||||
|
||||
verifyFriendAccess();
|
||||
|
||||
verifyEnemyAccess();
|
||||
}
|
||||
|
||||
private ApplicationId submitAppAndGetAppId(AccessControlList viewACL,
|
||||
AccessControlList modifyACL) throws Exception {
|
||||
SubmitApplicationRequest submitRequest = recordFactory
|
||||
.newRecordInstance(SubmitApplicationRequest.class);
|
||||
ApplicationSubmissionContext context = recordFactory
|
||||
.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
|
||||
ApplicationId applicationId = rmClient.getNewApplication(
|
||||
recordFactory.newRecordInstance(GetNewApplicationRequest.class))
|
||||
.getApplicationId();
|
||||
context.setApplicationId(applicationId);
|
||||
|
||||
Map<ApplicationAccessType, String> acls
|
||||
= new HashMap<ApplicationAccessType, String>();
|
||||
acls.put(ApplicationAccessType.VIEW_APP, viewACL.getAclString());
|
||||
acls.put(ApplicationAccessType.MODIFY_APP, modifyACL.getAclString());
|
||||
|
||||
ContainerLaunchContext amContainer = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
Resource resource = BuilderUtils.newResource(1024);
|
||||
amContainer.setResource(resource);
|
||||
amContainer.setApplicationACLs(acls);
|
||||
context.setAMContainerSpec(amContainer);
|
||||
submitRequest.setApplicationSubmissionContext(context);
|
||||
rmClient.submitApplication(submitRequest);
|
||||
resourceManager.waitForState(applicationId, RMAppState.ACCEPTED);
|
||||
return applicationId;
|
||||
}
|
||||
|
||||
private ClientRMProtocol getRMClientForUser(String user)
|
||||
throws IOException, InterruptedException {
|
||||
UserGroupInformation userUGI = UserGroupInformation
|
||||
.createRemoteUser(user);
|
||||
ClientRMProtocol userClient = userUGI
|
||||
.doAs(new PrivilegedExceptionAction<ClientRMProtocol>() {
|
||||
@Override
|
||||
public ClientRMProtocol run() throws Exception {
|
||||
return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
|
||||
rmAddress, conf);
|
||||
}
|
||||
});
|
||||
return userClient;
|
||||
}
|
||||
|
||||
private void verifyOwnerAccess() throws Exception {
|
||||
|
||||
AccessControlList viewACL = new AccessControlList("");
|
||||
viewACL.addGroup(FRIENDLY_GROUP);
|
||||
AccessControlList modifyACL = new AccessControlList("");
|
||||
modifyACL.addUser(FRIEND);
|
||||
ApplicationId applicationId = submitAppAndGetAppId(viewACL, modifyACL);
|
||||
|
||||
final GetApplicationReportRequest appReportRequest = recordFactory
|
||||
.newRecordInstance(GetApplicationReportRequest.class);
|
||||
appReportRequest.setApplicationId(applicationId);
|
||||
final KillApplicationRequest finishAppRequest = recordFactory
|
||||
.newRecordInstance(KillApplicationRequest.class);
|
||||
finishAppRequest.setApplicationId(applicationId);
|
||||
|
||||
// View as owner
|
||||
rmClient.getApplicationReport(appReportRequest);
|
||||
|
||||
// List apps as owner
|
||||
Assert.assertEquals("App view by owner should list the apps!!", 1,
|
||||
rmClient.getAllApplications(
|
||||
recordFactory.newRecordInstance(GetAllApplicationsRequest.class))
|
||||
.getApplicationList().size());
|
||||
|
||||
// Kill app as owner
|
||||
rmClient.forceKillApplication(finishAppRequest);
|
||||
resourceManager.waitForState(applicationId, RMAppState.KILLED);
|
||||
}
|
||||
|
||||
private void verifySuperUserAccess() throws Exception {
|
||||
|
||||
AccessControlList viewACL = new AccessControlList("");
|
||||
viewACL.addGroup(FRIENDLY_GROUP);
|
||||
AccessControlList modifyACL = new AccessControlList("");
|
||||
modifyACL.addUser(FRIEND);
|
||||
ApplicationId applicationId = submitAppAndGetAppId(viewACL, modifyACL);
|
||||
|
||||
final GetApplicationReportRequest appReportRequest = recordFactory
|
||||
.newRecordInstance(GetApplicationReportRequest.class);
|
||||
appReportRequest.setApplicationId(applicationId);
|
||||
final KillApplicationRequest finishAppRequest = recordFactory
|
||||
.newRecordInstance(KillApplicationRequest.class);
|
||||
finishAppRequest.setApplicationId(applicationId);
|
||||
|
||||
ClientRMProtocol superUserClient = getRMClientForUser(SUPER_USER);
|
||||
|
||||
// View as the superUser
|
||||
superUserClient.getApplicationReport(appReportRequest);
|
||||
|
||||
// List apps as superUser
|
||||
Assert.assertEquals("App view by super-user should list the apps!!", 2,
|
||||
superUserClient.getAllApplications(
|
||||
recordFactory.newRecordInstance(GetAllApplicationsRequest.class))
|
||||
.getApplicationList().size());
|
||||
|
||||
// Kill app as the superUser
|
||||
superUserClient.forceKillApplication(finishAppRequest);
|
||||
resourceManager.waitForState(applicationId, RMAppState.KILLED);
|
||||
}
|
||||
|
||||
private void verifyFriendAccess() throws Exception {
|
||||
|
||||
AccessControlList viewACL = new AccessControlList("");
|
||||
viewACL.addGroup(FRIENDLY_GROUP);
|
||||
AccessControlList modifyACL = new AccessControlList("");
|
||||
modifyACL.addUser(FRIEND);
|
||||
ApplicationId applicationId = submitAppAndGetAppId(viewACL, modifyACL);
|
||||
|
||||
final GetApplicationReportRequest appReportRequest = recordFactory
|
||||
.newRecordInstance(GetApplicationReportRequest.class);
|
||||
appReportRequest.setApplicationId(applicationId);
|
||||
final KillApplicationRequest finishAppRequest = recordFactory
|
||||
.newRecordInstance(KillApplicationRequest.class);
|
||||
finishAppRequest.setApplicationId(applicationId);
|
||||
|
||||
ClientRMProtocol friendClient = getRMClientForUser(FRIEND);
|
||||
|
||||
// View as the friend
|
||||
friendClient.getApplicationReport(appReportRequest);
|
||||
|
||||
// List apps as friend
|
||||
Assert.assertEquals("App view by a friend should list the apps!!", 3,
|
||||
friendClient.getAllApplications(
|
||||
recordFactory.newRecordInstance(GetAllApplicationsRequest.class))
|
||||
.getApplicationList().size());
|
||||
|
||||
// Kill app as the friend
|
||||
friendClient.forceKillApplication(finishAppRequest);
|
||||
resourceManager.waitForState(applicationId, RMAppState.KILLED);
|
||||
}
|
||||
|
||||
private void verifyEnemyAccess() throws Exception {
|
||||
|
||||
AccessControlList viewACL = new AccessControlList("");
|
||||
viewACL.addGroup(FRIENDLY_GROUP);
|
||||
AccessControlList modifyACL = new AccessControlList("");
|
||||
modifyACL.addUser(FRIEND);
|
||||
ApplicationId applicationId = submitAppAndGetAppId(viewACL, modifyACL);
|
||||
|
||||
final GetApplicationReportRequest appReportRequest = recordFactory
|
||||
.newRecordInstance(GetApplicationReportRequest.class);
|
||||
appReportRequest.setApplicationId(applicationId);
|
||||
final KillApplicationRequest finishAppRequest = recordFactory
|
||||
.newRecordInstance(KillApplicationRequest.class);
|
||||
finishAppRequest.setApplicationId(applicationId);
|
||||
|
||||
ClientRMProtocol enemyRmClient = getRMClientForUser(ENEMY);
|
||||
|
||||
// View as the enemy
|
||||
try {
|
||||
enemyRmClient.getApplicationReport(appReportRequest);
|
||||
Assert.fail("App view by the enemy should fail!!");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Got exception while viewing app as the enemy", e);
|
||||
Assert.assertEquals("User enemy cannot perform operation VIEW_APP on "
|
||||
+ applicationId, e.getMessage());
|
||||
}
|
||||
|
||||
// List apps as enemy
|
||||
Assert.assertEquals("App view by enemy should not list any apps!!", 0,
|
||||
enemyRmClient.getAllApplications(
|
||||
recordFactory.newRecordInstance(GetAllApplicationsRequest.class))
|
||||
.getApplicationList().size());
|
||||
|
||||
// Kill app as the enemy
|
||||
try {
|
||||
enemyRmClient.forceKillApplication(finishAppRequest);
|
||||
Assert.fail("App killing by the enemy should fail!!");
|
||||
} catch (YarnRemoteException e) {
|
||||
LOG.info("Got exception while killing app as the enemy", e);
|
||||
Assert.assertEquals(
|
||||
"User enemy cannot perform operation MODIFY_APP on "
|
||||
+ applicationId, e.getMessage());
|
||||
}
|
||||
|
||||
rmClient.forceKillApplication(finishAppRequest);
|
||||
}
|
||||
}
|
@ -28,6 +28,7 @@
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
@ -42,6 +43,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.webapp.WebApps;
|
||||
import org.apache.hadoop.yarn.webapp.test.WebAppTests;
|
||||
import org.junit.Test;
|
||||
@ -54,8 +56,17 @@
|
||||
public class TestRMWebApp {
|
||||
static final int GiB = 1024; // MiB
|
||||
|
||||
@Test public void testControllerIndex() {
|
||||
Injector injector = WebAppTests.createMockInjector(this);
|
||||
@Test
|
||||
public void testControllerIndex() {
|
||||
Injector injector = WebAppTests.createMockInjector(TestRMWebApp.class,
|
||||
this, new Module() {
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder) {
|
||||
binder.bind(ApplicationACLsManager.class).toInstance(
|
||||
new ApplicationACLsManager(new Configuration()));
|
||||
}
|
||||
});
|
||||
RmController c = injector.getInstance(RmController.class);
|
||||
c.index();
|
||||
assertEquals("Applications", c.get(TITLE, "unknown"));
|
||||
|
@ -176,6 +176,24 @@ Hadoop MapReduce Next Generation - Cluster Setup
|
||||
|
||||
* <<<conf/yarn-site.xml>>>
|
||||
|
||||
* Configurations for ResourceManager and NodeManager:
|
||||
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
|| Parameter || Value || Notes |
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
| <<<yarn.acl.enable>>> | | |
|
||||
| | <<<true>>> / <<<false>>> | |
|
||||
| | | Enable ACLs? Defaults to <true>. |
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
| <<<yarn.admin.acl>>> | | |
|
||||
| | Admin ACL | |
|
||||
| | | ACL to set admins on the cluster. |
|
||||
| | | ACLs are of for <comma-separated-users><space><comma-separated-groups>. |
|
||||
| | | Defaults to special value of <<*>> which means <anyone>. |
|
||||
| | | Special value of just <space> means no one has access. |
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
|
||||
|
||||
* Configurations for ResourceManager:
|
||||
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
@ -206,17 +224,6 @@ Hadoop MapReduce Next Generation - Cluster Setup
|
||||
| | <<<ResourceManager>>> Scheduler class. | |
|
||||
| | | <<<CapacityScheduler>>> (recommended) or <<<FifoScheduler>>> |
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
| <<<yarn.resourcemanager.acl.enable>>> | | |
|
||||
| | <<<true>>> / <<<false>>> | |
|
||||
| | | Enable ACLs? Defaults to <true>. |
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
| <<<yarn.resourcemanager.admin.acl>>> | | |
|
||||
| | Admin ACL | |
|
||||
| | | ACL to set admins on the cluster. |
|
||||
| | | ACLs are of for <comma-separated-users><space><comma-separated-groups>. |
|
||||
| | | Defaults to special value of <<*>> which means <anyone>. |
|
||||
| | | Special value of just <space> means no one has access. |
|
||||
*-------------------------+-------------------------+------------------------+
|
||||
| <<<yarn.nodemanager.remote-app-log-dir>>> | | |
|
||||
| | </logs> | |
|
||||
| | | HDFS directory where the application logs are moved on application |
|
||||
|
Loading…
Reference in New Issue
Block a user