YARN-486. Changed NM's startContainer API to accept Container record given by RM as a direct parameter instead of as part of the ContainerLaunchContext record. Contributed by Xuan Gong.
MAPREDUCE-5139. Update MR AM to use the modified startContainer API after YARN-486. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1467063 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8e86ffd91e
commit
e4c55e17fe
@ -283,6 +283,9 @@ Release 2.0.5-beta - UNRELEASED
|
||||
MAPREDUCE-5136. TestJobImpl->testJobNoTasks fails with IBM JAVA (Amir
|
||||
Sanjar via jlowe)
|
||||
|
||||
MAPREDUCE-5139. Update MR AM to use the modified startContainer API after
|
||||
YARN-486. (Xuan Gong via vinodkv)
|
||||
|
||||
Release 2.0.4-alpha - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -114,6 +114,7 @@
|
||||
import org.apache.hadoop.yarn.YarnException;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
@ -767,8 +768,8 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
|
||||
// The null fields are per-container and will be constructed for each
|
||||
// container separately.
|
||||
ContainerLaunchContext container = BuilderUtils
|
||||
.newContainerLaunchContext(null, conf
|
||||
.get(MRJobConfig.USER_NAME), null, localResources,
|
||||
.newContainerLaunchContext(conf
|
||||
.get(MRJobConfig.USER_NAME), localResources,
|
||||
environment, null, serviceData, taskCredentialsBuffer,
|
||||
applicationACLs);
|
||||
|
||||
@ -777,10 +778,9 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
|
||||
|
||||
static ContainerLaunchContext createContainerLaunchContext(
|
||||
Map<ApplicationAccessType, String> applicationACLs,
|
||||
ContainerId containerID, Configuration conf,
|
||||
Token<JobTokenIdentifier> jobToken, Task remoteTask,
|
||||
Configuration conf, Token<JobTokenIdentifier> jobToken, Task remoteTask,
|
||||
final org.apache.hadoop.mapred.JobID oldJobId,
|
||||
Resource assignedCapability, WrappedJvmID jvmID,
|
||||
WrappedJvmID jvmID,
|
||||
TaskAttemptListener taskAttemptListener,
|
||||
Credentials credentials) {
|
||||
|
||||
@ -813,7 +813,7 @@ static ContainerLaunchContext createContainerLaunchContext(
|
||||
|
||||
// Construct the actual Container
|
||||
ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
|
||||
containerID, commonContainerSpec.getUser(), assignedCapability,
|
||||
commonContainerSpec.getUser(),
|
||||
commonContainerSpec.getLocalResources(), myEnv, commands,
|
||||
myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
|
||||
applicationACLs);
|
||||
@ -1511,15 +1511,13 @@ public void transition(final TaskAttemptImpl taskAttempt,
|
||||
//launch the container
|
||||
//create the container object to be launched for a given Task attempt
|
||||
ContainerLaunchContext launchContext = createContainerLaunchContext(
|
||||
cEvent.getApplicationACLs(), taskAttempt.containerID,
|
||||
taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
|
||||
taskAttempt.oldJobId, taskAttempt.assignedCapability,
|
||||
taskAttempt.jvmID, taskAttempt.taskAttemptListener,
|
||||
taskAttempt.credentials);
|
||||
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
|
||||
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
|
||||
taskAttempt.taskAttemptListener, taskAttempt.credentials);
|
||||
taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
|
||||
taskAttempt.attemptId, taskAttempt.containerID,
|
||||
taskAttempt.containerMgrAddress, taskAttempt.containerToken,
|
||||
launchContext, taskAttempt.remoteTask));
|
||||
launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask));
|
||||
|
||||
// send event to speculator that our container needs are satisfied
|
||||
taskAttempt.eventHandler.handle
|
||||
|
@ -59,6 +59,7 @@
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
@ -150,10 +151,14 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) {
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
event.getContainer();
|
||||
|
||||
org.apache.hadoop.yarn.api.records.Container container =
|
||||
BuilderUtils.newContainer(containerID, null, null,
|
||||
event.getResource(), null, containerToken);
|
||||
// Now launch the actual container
|
||||
StartContainerRequest startRequest = Records
|
||||
.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(container);
|
||||
StartContainerResponse response = proxy.startContainer(startRequest);
|
||||
|
||||
ByteBuffer portInfo = response
|
||||
|
@ -23,26 +23,34 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
|
||||
|
||||
private final ContainerLaunchContext container;
|
||||
private final Task task;
|
||||
private final Resource resource;
|
||||
|
||||
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
|
||||
ContainerId containerID, String containerMgrAddress,
|
||||
ContainerToken containerToken,
|
||||
ContainerLaunchContext containerLaunchContext, Task remoteTask) {
|
||||
ContainerLaunchContext containerLaunchContext, Resource resource,
|
||||
Task remoteTask) {
|
||||
super(taskAttemptID, containerID, containerMgrAddress, containerToken,
|
||||
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
|
||||
this.container = containerLaunchContext;
|
||||
this.task = remoteTask;
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
public ContainerLaunchContext getContainer() {
|
||||
return this.container;
|
||||
}
|
||||
|
||||
public Resource getResource() {
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
public Task getRemoteTask() {
|
||||
return this.task;
|
||||
}
|
||||
|
@ -113,9 +113,9 @@ public void testAttemptContainerRequest() throws Exception {
|
||||
ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
|
||||
|
||||
ContainerLaunchContext launchCtx =
|
||||
TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
|
||||
TaskAttemptImpl.createContainerLaunchContext(acls,
|
||||
jobConf, jobToken, taImpl.createRemoteTask(),
|
||||
TypeConverter.fromYarn(jobId), mock(Resource.class),
|
||||
TypeConverter.fromYarn(jobId),
|
||||
mock(WrappedJvmID.class), taListener,
|
||||
credentials);
|
||||
|
||||
|
@ -60,7 +60,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerToken;
|
||||
@ -383,7 +382,6 @@ public GetContainerStatusResponse getContainerStatus(
|
||||
@Override
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
ContainerLaunchContext container = request.getContainerLaunchContext();
|
||||
StartContainerResponse response = recordFactory
|
||||
.newRecordInstance(StartContainerResponse.class);
|
||||
status = recordFactory.newRecordInstance(ContainerStatus.class);
|
||||
@ -395,7 +393,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throw new UndeclaredThrowableException(e);
|
||||
}
|
||||
status.setState(ContainerState.RUNNING);
|
||||
status.setContainerId(container.getContainerId());
|
||||
status.setContainerId(request.getContainer().getId());
|
||||
status.setExitStatus(0);
|
||||
return response;
|
||||
}
|
||||
|
@ -468,16 +468,14 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
||||
|
||||
// Setup ContainerLaunchContext for AM container
|
||||
ContainerLaunchContext amContainer = BuilderUtils
|
||||
.newContainerLaunchContext(null, UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), capability, localResources,
|
||||
.newContainerLaunchContext(UserGroupInformation
|
||||
.getCurrentUser().getShortUserName(), localResources,
|
||||
environment, vargsFinal, null, securityTokens, acls);
|
||||
|
||||
// Set up the ApplicationSubmissionContext
|
||||
ApplicationSubmissionContext appContext =
|
||||
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
appContext.setApplicationId(applicationId); // ApplicationId
|
||||
appContext.setUser( // User name
|
||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
appContext.setQueue( // Queue name
|
||||
jobConf.get(JobContext.QUEUE_NAME,
|
||||
YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||
@ -490,7 +488,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
||||
appContext.setMaxAppAttempts(
|
||||
conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
|
||||
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
|
||||
|
||||
appContext.setResource(capability);
|
||||
return appContext;
|
||||
}
|
||||
|
||||
|
@ -78,6 +78,10 @@ Release 2.0.5-beta - UNRELEASED
|
||||
YARN-536. Removed the unused objects ContainerStatus and ContainerStatus from
|
||||
Container which also don't belong to the container. (Xuan Gong via vinodkv)
|
||||
|
||||
YARN-486. Changed NM's startContainer API to accept Container record given by
|
||||
RM as a direct parameter instead of as part of the ContainerLaunchContext
|
||||
record. (Xuan Gong via vinodkv)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.ContainerManager;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
|
||||
/**
|
||||
@ -58,4 +59,12 @@ public interface StartContainerRequest {
|
||||
@Public
|
||||
@Stable
|
||||
public abstract void setContainerLaunchContext(ContainerLaunchContext context);
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public Container getContainer();
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public void setContainer(Container container);
|
||||
}
|
||||
|
@ -20,10 +20,13 @@
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProtoOrBuilder;
|
||||
|
||||
@ -35,7 +38,8 @@ public class StartContainerRequestPBImpl extends ProtoBase<StartContainerRequest
|
||||
boolean viaProto = false;
|
||||
|
||||
private ContainerLaunchContext containerLaunchContext = null;
|
||||
|
||||
|
||||
private Container container = null;
|
||||
|
||||
public StartContainerRequestPBImpl() {
|
||||
builder = StartContainerRequestProto.newBuilder();
|
||||
@ -57,6 +61,9 @@ private void mergeLocalToBuilder() {
|
||||
if (this.containerLaunchContext != null) {
|
||||
builder.setContainerLaunchContext(convertToProtoFormat(this.containerLaunchContext));
|
||||
}
|
||||
if(this.container != null) {
|
||||
builder.setContainer(convertToProtoFormat(this.container));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
@ -96,6 +103,28 @@ public void setContainerLaunchContext(ContainerLaunchContext containerLaunchCont
|
||||
this.containerLaunchContext = containerLaunchContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Container getContainer() {
|
||||
StartContainerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.container != null) {
|
||||
return this.container;
|
||||
}
|
||||
if (!p.hasContainer()) {
|
||||
return null;
|
||||
}
|
||||
this.container = convertFromProtoFormat(p.getContainer());
|
||||
return this.container;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContainer(Container container) {
|
||||
maybeInitBuilder();
|
||||
if(container == null) {
|
||||
builder.clearContainer();
|
||||
}
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
private ContainerLaunchContextPBImpl convertFromProtoFormat(ContainerLaunchContextProto p) {
|
||||
return new ContainerLaunchContextPBImpl(p);
|
||||
}
|
||||
@ -106,4 +135,11 @@ private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext
|
||||
|
||||
|
||||
|
||||
private ContainerPBImpl convertFromProtoFormat(ContainerProto containerProto) {
|
||||
return new ContainerPBImpl(containerProto);
|
||||
}
|
||||
|
||||
private ContainerProto convertToProtoFormat(Container container) {
|
||||
return ((ContainerPBImpl)container).getProto();
|
||||
}
|
||||
}
|
||||
|
@ -112,22 +112,6 @@ public interface ApplicationSubmissionContext {
|
||||
@Public
|
||||
@Stable
|
||||
public void setPriority(Priority priority);
|
||||
|
||||
/**
|
||||
* Get the <em>user</em> submitting the application.
|
||||
* @return <em>user</em> submitting the application
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public String getUser();
|
||||
|
||||
/**
|
||||
* Set the <em>user</em> submitting the application.
|
||||
* @param user <em>user</em> submitting the application
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public void setUser(String user);
|
||||
|
||||
/**
|
||||
* Get the <code>ContainerLaunchContext</code> to describe the
|
||||
@ -207,4 +191,12 @@ public interface ApplicationSubmissionContext {
|
||||
@Public
|
||||
@Unstable
|
||||
public void setMaxAppAttempts(int maxAppAttempts);
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public Resource getResource();
|
||||
|
||||
@Public
|
||||
@Stable
|
||||
public void setResource(Resource resource);
|
||||
}
|
@ -51,22 +51,6 @@
|
||||
@Public
|
||||
@Stable
|
||||
public interface ContainerLaunchContext {
|
||||
/**
|
||||
* Get <code>ContainerId</code> of container to be launched.
|
||||
* @return <code>ContainerId</code> of container to be launched
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
ContainerId getContainerId();
|
||||
|
||||
/**
|
||||
* Set <code>ContainerId</code> of container to be launched.
|
||||
* @param containerId et <code>ContainerId</code> of container to be launched
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
void setContainerId(ContainerId containerId);
|
||||
|
||||
/**
|
||||
* Get the <em>user</em> to whom the container has been allocated.
|
||||
* @return the <em>user</em> to whom the container has been allocated
|
||||
@ -83,25 +67,6 @@ public interface ContainerLaunchContext {
|
||||
@Stable
|
||||
void setUser(String user);
|
||||
|
||||
/**
|
||||
* Get the <code>Resource</code> allocated to the container by the
|
||||
* <code>ResourceManager</code>.
|
||||
* @return <code>Resource</code> allocated to the container by the
|
||||
* <code>ResourceManager</code>
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
Resource getResource();
|
||||
|
||||
/**
|
||||
* Set the <code>Resource</code> allocated to the container by the
|
||||
* <code>ResourceManager</code>.
|
||||
* @param resource allocated resource
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
void setResource(Resource resource);
|
||||
|
||||
/**
|
||||
* Get security tokens (if security is enabled).
|
||||
* @return security tokens (if security is enabled)
|
||||
|
@ -23,11 +23,13 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
|
||||
public class ApplicationSubmissionContextPBImpl
|
||||
extends ProtoBase<ApplicationSubmissionContextProto>
|
||||
@ -40,7 +42,8 @@ public class ApplicationSubmissionContextPBImpl
|
||||
private ApplicationId applicationId = null;
|
||||
private Priority priority = null;
|
||||
private ContainerLaunchContext amContainer = null;
|
||||
|
||||
private Resource resource = null;
|
||||
|
||||
public ApplicationSubmissionContextPBImpl() {
|
||||
builder = ApplicationSubmissionContextProto.newBuilder();
|
||||
}
|
||||
@ -68,6 +71,11 @@ private void mergeLocalToBuilder() {
|
||||
if (this.amContainer != null) {
|
||||
builder.setAmContainerSpec(convertToProtoFormat(this.amContainer));
|
||||
}
|
||||
if (this.resource != null &&
|
||||
!((ResourcePBImpl) this.resource).getProto().equals(
|
||||
builder.getResource())) {
|
||||
builder.setResource(convertToProtoFormat(this.resource));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
@ -165,25 +173,6 @@ public void setQueue(String queue) {
|
||||
}
|
||||
builder.setQueue((queue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUser() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasUser()) {
|
||||
return null;
|
||||
}
|
||||
return (p.getUser());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUser(String user) {
|
||||
maybeInitBuilder();
|
||||
if (user == null) {
|
||||
builder.clearUser();
|
||||
return;
|
||||
}
|
||||
builder.setUser((user));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerLaunchContext getAMContainerSpec() {
|
||||
@ -244,6 +233,28 @@ public void setMaxAppAttempts(int maxAppAttempts) {
|
||||
builder.setMaxAppAttempts(maxAppAttempts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.resource != null) {
|
||||
return this.resource;
|
||||
}
|
||||
if (!p.hasResource()) {
|
||||
return null;
|
||||
}
|
||||
this.resource = convertFromProtoFormat(p.getResource());
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(Resource resource) {
|
||||
maybeInitBuilder();
|
||||
if (resource == null) {
|
||||
builder.clearResource();
|
||||
}
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
|
||||
return new PriorityPBImpl(p);
|
||||
}
|
||||
@ -268,4 +279,12 @@ private ContainerLaunchContextPBImpl convertFromProtoFormat(
|
||||
private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) {
|
||||
return ((ContainerLaunchContextPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
||||
private ResourceProto convertToProtoFormat(Resource t) {
|
||||
return ((ResourcePBImpl)t).getProto();
|
||||
}
|
||||
}
|
||||
|
@ -26,17 +26,13 @@
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
|
||||
@ -50,8 +46,6 @@ public class ContainerLaunchContextPBImpl
|
||||
ContainerLaunchContextProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private ContainerId containerId = null;
|
||||
private Resource resource = null;
|
||||
private Map<String, LocalResource> localResources = null;
|
||||
private ByteBuffer containerTokens = null;
|
||||
private Map<String, ByteBuffer> serviceData = null;
|
||||
@ -76,16 +70,6 @@ public ContainerLaunchContextProto getProto() {
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.containerId != null &&
|
||||
!((ContainerIdPBImpl)containerId).getProto().equals(
|
||||
builder.getContainerId())) {
|
||||
builder.setContainerId(convertToProtoFormat(this.containerId));
|
||||
}
|
||||
if (this.resource != null &&
|
||||
!((ResourcePBImpl)this.resource).getProto().equals(
|
||||
builder.getResource())) {
|
||||
builder.setResource(convertToProtoFormat(this.resource));
|
||||
}
|
||||
if (this.localResources != null) {
|
||||
addLocalResourcesToProto();
|
||||
}
|
||||
@ -120,28 +104,6 @@ private void maybeInitBuilder() {
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.resource != null) {
|
||||
return this.resource;
|
||||
}
|
||||
if (!p.hasResource()) {
|
||||
return null;
|
||||
}
|
||||
this.resource = convertFromProtoFormat(p.getResource());
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(Resource resource) {
|
||||
maybeInitBuilder();
|
||||
if (resource == null)
|
||||
builder.clearResource();
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getCommands() {
|
||||
@ -197,26 +159,6 @@ public void setUser(String user) {
|
||||
}
|
||||
builder.setUser((user));
|
||||
}
|
||||
@Override
|
||||
public ContainerId getContainerId() {
|
||||
ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.containerId != null) {
|
||||
return this.containerId;
|
||||
}
|
||||
if (!p.hasContainerId()) {
|
||||
return null;
|
||||
}
|
||||
this.containerId = convertFromProtoFormat(p.getContainerId());
|
||||
return this.containerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setContainerId(ContainerId containerId) {
|
||||
maybeInitBuilder();
|
||||
if (containerId == null)
|
||||
builder.clearContainerId();
|
||||
this.containerId = containerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, LocalResource> getLocalResources() {
|
||||
@ -299,11 +241,12 @@ public ByteBuffer getContainerTokens() {
|
||||
@Override
|
||||
public void setContainerTokens(ByteBuffer containerTokens) {
|
||||
maybeInitBuilder();
|
||||
if (containerTokens == null)
|
||||
if (containerTokens == null) {
|
||||
builder.clearContainerTokens();
|
||||
}
|
||||
this.containerTokens = containerTokens;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, ByteBuffer> getServiceData() {
|
||||
initServiceData();
|
||||
@ -500,22 +443,6 @@ public void setApplicationACLs(
|
||||
this.applicationACLS.putAll(appACLs);
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
||||
private ResourceProto convertToProtoFormat(Resource t) {
|
||||
return ((ResourcePBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
||||
return new ContainerIdPBImpl(p);
|
||||
}
|
||||
|
||||
private ContainerIdProto convertToProtoFormat(ContainerId t) {
|
||||
return ((ContainerIdPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
|
||||
return new LocalResourcePBImpl(p);
|
||||
}
|
||||
|
@ -211,13 +211,13 @@ message ResourceRequestProto {
|
||||
message ApplicationSubmissionContextProto {
|
||||
optional ApplicationIdProto application_id = 1;
|
||||
optional string application_name = 2 [default = "N/A"];
|
||||
optional string user = 3;
|
||||
optional string queue = 4 [default = "default"];
|
||||
optional PriorityProto priority = 5;
|
||||
optional ContainerLaunchContextProto am_container_spec = 6;
|
||||
optional bool cancel_tokens_when_complete = 7 [default = true];
|
||||
optional bool unmanaged_am = 8 [default = false];
|
||||
optional int32 maxAppAttempts = 9 [default = 0];
|
||||
optional string queue = 3 [default = "default"];
|
||||
optional PriorityProto priority = 4;
|
||||
optional ContainerLaunchContextProto am_container_spec = 5;
|
||||
optional bool cancel_tokens_when_complete = 6 [default = true];
|
||||
optional bool unmanaged_am = 7 [default = false];
|
||||
optional int32 maxAppAttempts = 8 [default = 0];
|
||||
optional ResourceProto resource = 9;
|
||||
}
|
||||
|
||||
enum ApplicationAccessTypeProto {
|
||||
@ -264,15 +264,13 @@ message QueueUserACLInfoProto {
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
||||
message ContainerLaunchContextProto {
|
||||
optional ContainerIdProto container_id = 1;
|
||||
optional string user = 2;
|
||||
optional ResourceProto resource = 3;
|
||||
repeated StringLocalResourceMapProto localResources = 4;
|
||||
optional bytes container_tokens = 5;
|
||||
repeated StringBytesMapProto service_data = 6;
|
||||
repeated StringStringMapProto environment = 7;
|
||||
repeated string command = 8;
|
||||
repeated ApplicationACLMapProto application_ACLs = 9;
|
||||
optional string user = 1;
|
||||
repeated StringLocalResourceMapProto localResources = 2;
|
||||
optional bytes container_tokens = 3;
|
||||
repeated StringBytesMapProto service_data = 4;
|
||||
repeated StringStringMapProto environment = 5;
|
||||
repeated string command = 6;
|
||||
repeated ApplicationACLMapProto application_ACLs = 7;
|
||||
}
|
||||
|
||||
message ContainerStatusProto {
|
||||
|
@ -151,6 +151,7 @@ message GetQueueUserAclsInfoResponseProto {
|
||||
|
||||
message StartContainerRequestProto {
|
||||
optional ContainerLaunchContextProto container_launch_context = 1;
|
||||
optional ContainerProto container = 2;
|
||||
}
|
||||
|
||||
message StartContainerResponseProto {
|
||||
|
@ -679,9 +679,6 @@ public void run() {
|
||||
ContainerLaunchContext ctx = Records
|
||||
.newRecord(ContainerLaunchContext.class);
|
||||
|
||||
ctx.setContainerId(container.getId());
|
||||
ctx.setResource(container.getResource());
|
||||
|
||||
String jobUserName = System.getenv(ApplicationConstants.Environment.USER
|
||||
.key());
|
||||
ctx.setUser(jobUserName);
|
||||
@ -752,6 +749,7 @@ public void run() {
|
||||
StartContainerRequest startReq = Records
|
||||
.newRecord(StartContainerRequest.class);
|
||||
startReq.setContainerLaunchContext(ctx);
|
||||
startReq.setContainer(container);
|
||||
try {
|
||||
cm.startContainer(startReq);
|
||||
} catch (YarnRemoteException e) {
|
||||
|
@ -548,7 +548,7 @@ else if (amMemory > maxMem) {
|
||||
// For now, only memory is supported so we set memory requirements
|
||||
Resource capability = Records.newRecord(Resource.class);
|
||||
capability.setMemory(amMemory);
|
||||
amContainer.setResource(capability);
|
||||
appContext.setResource(capability);
|
||||
|
||||
// Service data is a binary blob that can be passed to the application
|
||||
// Not needed in this scenario
|
||||
@ -573,6 +573,7 @@ else if (amMemory > maxMem) {
|
||||
// Ignore the response as either a valid response object is returned on success
|
||||
// or an exception thrown to denote some form of a failure
|
||||
LOG.info("Submitting application to ASM");
|
||||
|
||||
super.submitApplication(appContext);
|
||||
|
||||
// TODO
|
||||
|
@ -284,16 +284,13 @@ public static ContainerToken newContainerToken(NodeId nodeId,
|
||||
}
|
||||
|
||||
public static ContainerLaunchContext newContainerLaunchContext(
|
||||
ContainerId containerID, String user, Resource assignedCapability,
|
||||
Map<String, LocalResource> localResources,
|
||||
String user, Map<String, LocalResource> localResources,
|
||||
Map<String, String> environment, List<String> commands,
|
||||
Map<String, ByteBuffer> serviceData, ByteBuffer containerTokens,
|
||||
Map<String, ByteBuffer> serviceData, ByteBuffer containerTokens,
|
||||
Map<ApplicationAccessType, String> acls) {
|
||||
ContainerLaunchContext container = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
container.setContainerId(containerID);
|
||||
container.setUser(user);
|
||||
container.setResource(assignedCapability);
|
||||
container.setLocalResources(localResources);
|
||||
container.setEnvironment(environment);
|
||||
container.setCommands(commands);
|
||||
|
@ -38,6 +38,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -50,6 +51,7 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
/*
|
||||
@ -101,13 +103,14 @@ private void testRPCTimeout(String rpcClass) throws Exception {
|
||||
applicationAttemptId.setAttemptId(0);
|
||||
containerId.setApplicationAttemptId(applicationAttemptId);
|
||||
containerId.setId(100);
|
||||
containerLaunchContext.setContainerId(containerId);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||
.newRecordInstance(Resource.class), null, null);
|
||||
|
||||
StartContainerRequest scRequest = recordFactory
|
||||
.newRecordInstance(StartContainerRequest.class);
|
||||
scRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
scRequest.setContainer(container);
|
||||
try {
|
||||
proxy.startContainer(scRequest);
|
||||
} catch (Exception e) {
|
||||
@ -141,7 +144,6 @@ public GetContainerStatusResponse getContainerStatus(
|
||||
@Override
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
ContainerLaunchContext container = request.getContainerLaunchContext();
|
||||
StartContainerResponse response = recordFactory
|
||||
.newRecordInstance(StartContainerResponse.class);
|
||||
status = recordFactory.newRecordInstance(ContainerStatus.class);
|
||||
@ -153,7 +155,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throw new UndeclaredThrowableException(e);
|
||||
}
|
||||
status.setState(ContainerState.RUNNING);
|
||||
status.setContainerId(container.getContainerId());
|
||||
status.setContainerId(request.getContainer().getId());
|
||||
status.setExitStatus(0);
|
||||
return response;
|
||||
}
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -52,6 +53,7 @@
|
||||
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -124,20 +126,21 @@ private void test(String rpcClass) throws Exception {
|
||||
applicationAttemptId.setAttemptId(0);
|
||||
containerId.setApplicationAttemptId(applicationAttemptId);
|
||||
containerId.setId(100);
|
||||
containerLaunchContext.setContainerId(containerId);
|
||||
containerLaunchContext.setResource(
|
||||
recordFactory.newRecordInstance(Resource.class));
|
||||
Container mockContainer =
|
||||
BuilderUtils.newContainer(containerId, null, null, recordFactory
|
||||
.newRecordInstance(Resource.class), null, null);
|
||||
// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
|
||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||
|
||||
StartContainerRequest scRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
scRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
scRequest.setContainer(mockContainer);
|
||||
proxy.startContainer(scRequest);
|
||||
|
||||
GetContainerStatusRequest gcsRequest =
|
||||
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
|
||||
gcsRequest.setContainerId(containerLaunchContext.getContainerId());
|
||||
gcsRequest.setContainerId(mockContainer.getId());
|
||||
GetContainerStatusResponse response = proxy.getContainerStatus(gcsRequest);
|
||||
ContainerStatus status = response.getStatus();
|
||||
|
||||
@ -145,7 +148,7 @@ private void test(String rpcClass) throws Exception {
|
||||
boolean exception = false;
|
||||
try {
|
||||
StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
|
||||
stopRequest.setContainerId(containerLaunchContext.getContainerId());
|
||||
stopRequest.setContainerId(mockContainer.getId());
|
||||
proxy.stopContainer(stopRequest);
|
||||
} catch (YarnRemoteException e) {
|
||||
exception = true;
|
||||
@ -179,12 +182,11 @@ public GetContainerStatusResponse getContainerStatus(
|
||||
@Override
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
ContainerLaunchContext container = request.getContainerLaunchContext();
|
||||
StartContainerResponse response =
|
||||
recordFactory.newRecordInstance(StartContainerResponse.class);
|
||||
status = recordFactory.newRecordInstance(ContainerStatus.class);
|
||||
status.setState(ContainerState.RUNNING);
|
||||
status.setContainerId(container.getContainerId());
|
||||
status.setContainerId(request.getContainer().getId());
|
||||
status.setExitStatus(0);
|
||||
return response;
|
||||
}
|
||||
|
@ -220,7 +220,7 @@ public int launchContainer(Container container,
|
||||
String containerIdStr = ConverterUtils.toString(containerId);
|
||||
|
||||
resourcesHandler.preExecute(containerId,
|
||||
container.getLaunchContext().getResource());
|
||||
container.getResource());
|
||||
String resourcesOptions = resourcesHandler.getResourcesOption(
|
||||
containerId);
|
||||
|
||||
|
@ -308,7 +308,9 @@ private ContainerTokenIdentifier selectContainerTokenIdentifier(
|
||||
* @throws YarnRemoteException
|
||||
*/
|
||||
private void authorizeRequest(String containerIDStr,
|
||||
ContainerLaunchContext launchContext, UserGroupInformation remoteUgi)
|
||||
ContainerLaunchContext launchContext,
|
||||
org.apache.hadoop.yarn.api.records.Container container,
|
||||
UserGroupInformation remoteUgi)
|
||||
throws YarnRemoteException {
|
||||
|
||||
if (!UserGroupInformation.isSecurityEnabled()) {
|
||||
@ -369,10 +371,10 @@ private void authorizeRequest(String containerIDStr,
|
||||
}
|
||||
|
||||
Resource resource = tokenId.getResource();
|
||||
if (!resource.equals(launchContext.getResource())) {
|
||||
if (!resource.equals(container.getResource())) {
|
||||
unauthorized = true;
|
||||
messageBuilder.append("\nExpected resource " + resource
|
||||
+ " but found " + launchContext.getResource());
|
||||
+ " but found " + container.getResource());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -392,12 +394,13 @@ private void authorizeRequest(String containerIDStr,
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
|
||||
|
||||
ContainerId containerID = launchContext.getContainerId();
|
||||
org.apache.hadoop.yarn.api.records.Container lauchContainer =
|
||||
request.getContainer();
|
||||
ContainerId containerID = lauchContainer.getId();
|
||||
String containerIDStr = containerID.toString();
|
||||
|
||||
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
||||
authorizeRequest(containerIDStr, launchContext, remoteUgi);
|
||||
authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
|
||||
|
||||
LOG.info("Start request for " + containerIDStr + " by user "
|
||||
+ launchContext.getUser());
|
||||
@ -424,7 +427,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
// //////////// End of parsing credentials
|
||||
|
||||
Container container = new ContainerImpl(getConfig(), this.dispatcher,
|
||||
launchContext, credentials, metrics);
|
||||
launchContext, lauchContainer, credentials, metrics);
|
||||
ApplicationId applicationID =
|
||||
containerID.getApplicationAttemptId().getApplicationId();
|
||||
if (context.getContainers().putIfAbsent(containerID, container) != null) {
|
||||
@ -469,7 +472,7 @@ public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
// TODO launchedContainer misplaced -> doesn't necessarily mean a container
|
||||
// launch. A finished Application will not launch containers.
|
||||
metrics.launchedContainer();
|
||||
metrics.allocateContainer(launchContext.getResource());
|
||||
metrics.allocateContainer(lauchContainer.getResource());
|
||||
return response;
|
||||
}
|
||||
|
||||
@ -487,7 +490,7 @@ public StopContainerResponse stopContainer(StopContainerRequest request)
|
||||
// TODO: Only the container's owner can kill containers today.
|
||||
|
||||
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
||||
authorizeRequest(containerIDStr, null, remoteUgi);
|
||||
authorizeRequest(containerIDStr, null, null, remoteUgi);
|
||||
|
||||
StopContainerResponse response =
|
||||
recordFactory.newRecordInstance(StopContainerResponse.class);
|
||||
@ -529,7 +532,7 @@ public GetContainerStatusResponse getContainerStatus(
|
||||
// TODO: Only the container's owner can get containers' status today.
|
||||
|
||||
UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
|
||||
authorizeRequest(containerIDStr, null, remoteUgi);
|
||||
authorizeRequest(containerIDStr, null, null, remoteUgi);
|
||||
|
||||
LOG.info("Getting container-status for " + containerIDStr);
|
||||
Container container = this.context.getContainers().get(containerID);
|
||||
|
@ -25,6 +25,7 @@
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
||||
public interface Container extends EventHandler<ContainerEvent> {
|
||||
@ -44,4 +45,6 @@ public interface Container extends EventHandler<ContainerEvent> {
|
||||
ContainerStatus cloneAndGetContainerStatus();
|
||||
|
||||
String toString();
|
||||
|
||||
Resource getResource();
|
||||
}
|
||||
|
@ -41,6 +41,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
@ -76,6 +77,7 @@ public class ContainerImpl implements Container {
|
||||
private final Credentials credentials;
|
||||
private final NodeManagerMetrics metrics;
|
||||
private final ContainerLaunchContext launchContext;
|
||||
private final org.apache.hadoop.yarn.api.records.Container container;
|
||||
private int exitCode = YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS;
|
||||
private final StringBuilder diagnostics;
|
||||
|
||||
@ -96,12 +98,13 @@ public class ContainerImpl implements Container {
|
||||
new ArrayList<LocalResourceRequest>();
|
||||
|
||||
public ContainerImpl(Configuration conf,
|
||||
Dispatcher dispatcher,
|
||||
ContainerLaunchContext launchContext, Credentials creds,
|
||||
NodeManagerMetrics metrics) {
|
||||
Dispatcher dispatcher, ContainerLaunchContext launchContext,
|
||||
org.apache.hadoop.yarn.api.records.Container container,
|
||||
Credentials creds, NodeManagerMetrics metrics) {
|
||||
this.daemonConf = conf;
|
||||
this.dispatcher = dispatcher;
|
||||
this.launchContext = launchContext;
|
||||
this.container = container;
|
||||
this.diagnostics = new StringBuilder();
|
||||
this.credentials = creds;
|
||||
this.metrics = metrics;
|
||||
@ -312,7 +315,7 @@ private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
|
||||
public ContainerId getContainerID() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.launchContext.getContainerId();
|
||||
return this.container.getId();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
@ -373,50 +376,61 @@ public ContainerLaunchContext getLaunchContext() {
|
||||
public ContainerStatus cloneAndGetContainerStatus() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return BuilderUtils.newContainerStatus(this.getContainerID(),
|
||||
return BuilderUtils.newContainerStatus(this.container.getId(),
|
||||
getCurrentState(), diagnostics.toString(), exitCode);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.container.getResource();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings({"fallthrough", "unchecked"})
|
||||
private void finished() {
|
||||
ContainerId containerID = this.container.getId();
|
||||
String user = this.launchContext.getUser();
|
||||
switch (getContainerState()) {
|
||||
case EXITED_WITH_SUCCESS:
|
||||
metrics.endRunningContainer();
|
||||
metrics.completedContainer();
|
||||
NMAuditLogger.logSuccess(getUser(),
|
||||
NMAuditLogger.logSuccess(user,
|
||||
AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
|
||||
getContainerID().getApplicationAttemptId().getApplicationId(),
|
||||
getContainerID());
|
||||
containerID.getApplicationAttemptId()
|
||||
.getApplicationId(), containerID);
|
||||
break;
|
||||
case EXITED_WITH_FAILURE:
|
||||
metrics.endRunningContainer();
|
||||
// fall through
|
||||
case LOCALIZATION_FAILED:
|
||||
metrics.failedContainer();
|
||||
NMAuditLogger.logFailure(getUser(),
|
||||
NMAuditLogger.logFailure(user,
|
||||
AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
|
||||
"Container failed with state: " + getContainerState(),
|
||||
getContainerID().getApplicationAttemptId().getApplicationId(),
|
||||
getContainerID());
|
||||
containerID.getApplicationAttemptId()
|
||||
.getApplicationId(), containerID);
|
||||
break;
|
||||
case CONTAINER_CLEANEDUP_AFTER_KILL:
|
||||
metrics.endRunningContainer();
|
||||
// fall through
|
||||
case NEW:
|
||||
metrics.killedContainer();
|
||||
NMAuditLogger.logSuccess(getUser(),
|
||||
NMAuditLogger.logSuccess(user,
|
||||
AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
|
||||
getContainerID().getApplicationAttemptId().getApplicationId(),
|
||||
getContainerID());
|
||||
containerID.getApplicationAttemptId().getApplicationId(),
|
||||
containerID);
|
||||
}
|
||||
|
||||
metrics.releaseContainer(getLaunchContext().getResource());
|
||||
metrics.releaseContainer(this.container.getResource());
|
||||
|
||||
// Inform the application
|
||||
ContainerId containerID = getContainerID();
|
||||
@SuppressWarnings("rawtypes")
|
||||
EventHandler eventHandler = dispatcher.getEventHandler();
|
||||
eventHandler.handle(new ApplicationContainerFinishedEvent(containerID));
|
||||
@ -475,7 +489,7 @@ static class RequestResourcesTransition implements
|
||||
@Override
|
||||
public ContainerState transition(ContainerImpl container,
|
||||
ContainerEvent event) {
|
||||
final ContainerLaunchContext ctxt = container.getLaunchContext();
|
||||
final ContainerLaunchContext ctxt = container.launchContext;
|
||||
container.metrics.initingContainer();
|
||||
|
||||
// Inform the AuxServices about the opaque serviceData
|
||||
@ -486,9 +500,9 @@ public ContainerState transition(ContainerImpl container,
|
||||
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
|
||||
ctxt.getUser(),
|
||||
ctxt.getContainerId().getApplicationAttemptId().getApplicationId(),
|
||||
service.getKey().toString(), service.getValue()));
|
||||
ctxt.getUser(), container.container.getId()
|
||||
.getApplicationAttemptId().getApplicationId(),
|
||||
service.getKey().toString(), service.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -571,7 +585,7 @@ public ContainerState transition(ContainerImpl container,
|
||||
container.pendingResources.remove(rsrcEvent.getResource());
|
||||
if (null == syms) {
|
||||
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
|
||||
" for container " + container.getContainerID());
|
||||
" for container " + container.container.getId());
|
||||
assert false;
|
||||
// fail container?
|
||||
return ContainerState.LOCALIZING;
|
||||
@ -599,14 +613,14 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
// Inform the ContainersMonitor to start monitoring the container's
|
||||
// resource usage.
|
||||
long pmemBytes =
|
||||
container.getLaunchContext().getResource().getMemory() * 1024 * 1024L;
|
||||
container.container.getResource().getMemory() * 1024 * 1024L;
|
||||
float pmemRatio = container.daemonConf.getFloat(
|
||||
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||
long vmemBytes = (long) (pmemRatio * pmemBytes);
|
||||
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainerStartMonitoringEvent(container.getContainerID(),
|
||||
new ContainerStartMonitoringEvent(container.container.getId(),
|
||||
vmemBytes, pmemBytes));
|
||||
container.metrics.runningContainer();
|
||||
}
|
||||
@ -740,7 +754,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
container.pendingResources.remove(rsrcEvent.getResource());
|
||||
if (null == syms) {
|
||||
LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
|
||||
" for container " + container.getContainerID());
|
||||
" for container " + container.container.getId());
|
||||
assert false;
|
||||
// fail container?
|
||||
return;
|
||||
@ -845,10 +859,9 @@ public void handle(ContainerEvent event) {
|
||||
public String toString() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return ConverterUtils.toString(launchContext.getContainerId());
|
||||
return ConverterUtils.toString(container.getId());
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -249,9 +249,8 @@ public Integer call() {
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Failed to launch container.", e);
|
||||
dispatcher.getEventHandler().handle(new ContainerExitEvent(
|
||||
launchContext.getContainerId(),
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
|
||||
e.getMessage()));
|
||||
containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
|
||||
e.getMessage()));
|
||||
return ret;
|
||||
} finally {
|
||||
completed.set(true);
|
||||
@ -267,7 +266,7 @@ public Integer call() {
|
||||
// If the process was killed, Send container_cleanedup_after_kill and
|
||||
// just break out of this method.
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerExitEvent(launchContext.getContainerId(),
|
||||
new ContainerExitEvent(containerID,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret,
|
||||
"Container exited with a non-zero exit code " + ret));
|
||||
return ret;
|
||||
@ -276,15 +275,15 @@ public Integer call() {
|
||||
if (ret != 0) {
|
||||
LOG.warn("Container exited with a non-zero exit code " + ret);
|
||||
this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
|
||||
launchContext.getContainerId(),
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
|
||||
"Container exited with a non-zero exit code " + ret));
|
||||
containerID,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
|
||||
"Container exited with a non-zero exit code " + ret));
|
||||
return ret;
|
||||
}
|
||||
|
||||
LOG.info("Container " + containerIdStr + " succeeded ");
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerEvent(launchContext.getContainerId(),
|
||||
new ContainerEvent(containerID,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
|
||||
return 0;
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ public ContainerInfo(final Context nmContext, final Container container,
|
||||
}
|
||||
|
||||
this.user = container.getUser();
|
||||
Resource res = container.getLaunchContext().getResource();
|
||||
Resource res = container.getResource();
|
||||
if (res != null) {
|
||||
this.totalMemoryNeededMB = res.getMemory();
|
||||
}
|
||||
|
@ -27,6 +27,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -43,6 +44,8 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
|
||||
public class TestEventFlow {
|
||||
|
||||
@ -117,12 +120,15 @@ protected void startStatusUpdater() {
|
||||
applicationAttemptId.setApplicationId(applicationId);
|
||||
applicationAttemptId.setAttemptId(0);
|
||||
cID.setApplicationAttemptId(applicationAttemptId);
|
||||
launchContext.setContainerId(cID);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cID);
|
||||
when(mockContainer.getResource()).thenReturn(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
launchContext.setUser("testing");
|
||||
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
|
||||
StartContainerRequest request =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(launchContext);
|
||||
request.setContainer(mockContainer);
|
||||
containerManager.startContainer(request);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
|
||||
|
@ -99,7 +99,9 @@ public void testClearLocalDirWhenNodeReboot() throws IOException {
|
||||
Records.newRecord(ContainerLaunchContext.class);
|
||||
// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
@ -122,12 +124,13 @@ public void testClearLocalDirWhenNodeReboot() throws IOException {
|
||||
containerLaunchContext.setUser(containerLaunchContext.getUser());
|
||||
List<String> commands = new ArrayList<String>();
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(Records
|
||||
.newRecord(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(1024);
|
||||
Resource resource = Records.newRecord(Resource.class);
|
||||
resource.setMemory(1024);
|
||||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
StartContainerRequest startRequest =
|
||||
Records.newRecord(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
GetContainerStatusRequest request =
|
||||
|
@ -18,6 +18,9 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
@ -41,6 +44,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -56,8 +60,8 @@
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
@ -157,10 +161,10 @@ private void startContainers(NodeManager nm) throws IOException {
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
Container mockContainer = mock(Container.class);
|
||||
// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
@ -184,12 +188,12 @@ private void startContainers(NodeManager nm) throws IOException {
|
||||
commands.add("/bin/bash");
|
||||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(1024);
|
||||
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
GetContainerStatusRequest request =
|
||||
@ -287,7 +291,7 @@ protected void registerWithRM() throws YarnRemoteException {
|
||||
|
||||
@Override
|
||||
protected void rebootNodeStatusUpdater() {
|
||||
ConcurrentMap<ContainerId, Container> containers =
|
||||
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container> containers =
|
||||
getNMContext().getContainers();
|
||||
// ensure that containers are empty before restart nodeStatusUpdater
|
||||
Assert.assertTrue(containers.isEmpty());
|
||||
|
@ -176,6 +176,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
nodeStatus.setResponseId(heartBeatID++);
|
||||
Map<ApplicationId, List<ContainerStatus>> appToContainers =
|
||||
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
if (heartBeatID == 1) {
|
||||
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
|
||||
|
||||
@ -186,11 +188,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
firstContainerID.setId(heartBeatID);
|
||||
ContainerLaunchContext launchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
launchContext.setContainerId(firstContainerID);
|
||||
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
|
||||
launchContext.getResource().setMemory(2);
|
||||
Container container = new ContainerImpl(conf , mockDispatcher,
|
||||
launchContext, null, mockMetrics);
|
||||
when(mockContainer.getId()).thenReturn(firstContainerID);
|
||||
Resource resource = BuilderUtils.newResource(2, 1);
|
||||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
Container container =
|
||||
new ContainerImpl(conf, mockDispatcher, launchContext,
|
||||
mockContainer, null, mockMetrics);
|
||||
this.context.getContainers().put(firstContainerID, container);
|
||||
} else if (heartBeatID == 2) {
|
||||
// Checks on the RM end
|
||||
@ -211,11 +214,12 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
secondContainerID.setId(heartBeatID);
|
||||
ContainerLaunchContext launchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
launchContext.setContainerId(secondContainerID);
|
||||
launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
|
||||
launchContext.getResource().setMemory(3);
|
||||
Container container = new ContainerImpl(conf, mockDispatcher,
|
||||
launchContext, null, mockMetrics);
|
||||
when(mockContainer.getId()).thenReturn(secondContainerID);
|
||||
Resource resource = BuilderUtils.newResource(3, 1);
|
||||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
Container container =
|
||||
new ContainerImpl(conf, mockDispatcher, launchContext,
|
||||
mockContainer, null, mockMetrics);
|
||||
this.context.getContainers().put(secondContainerID, container);
|
||||
} else if (heartBeatID == 3) {
|
||||
// Checks on the RM end
|
||||
|
@ -40,6 +40,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -47,7 +48,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
@ -58,8 +58,10 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class TestContainerManager extends BaseContainerManagerTest {
|
||||
|
||||
@ -121,7 +123,6 @@ public void testContainerSetup() throws IOException, InterruptedException {
|
||||
|
||||
// ////// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
container.setContainerId(cId);
|
||||
|
||||
container.setUser(user);
|
||||
|
||||
@ -143,14 +144,16 @@ public void testContainerSetup() throws IOException, InterruptedException {
|
||||
localResources.put(destinationFile, rsrc_alpha);
|
||||
containerLaunchContext.setLocalResources(localResources);
|
||||
containerLaunchContext.setUser(container.getUser());
|
||||
containerLaunchContext.setContainerId(container.getContainerId());
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(512, 1));
|
||||
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
|
||||
startRequest.setContainer(mockContainer);
|
||||
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
||||
@ -208,7 +211,6 @@ public void testContainerLaunchAndStop() throws IOException,
|
||||
|
||||
// ////// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
@ -232,11 +234,13 @@ public void testContainerLaunchAndStop() throws IOException,
|
||||
commands.add("/bin/bash");
|
||||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(100 * 1024 * 1024);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(100 * 1024 * 1024, 1));
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
@ -310,7 +314,6 @@ private void testContainerLaunchAndExit(int exitCode) throws IOException, Interr
|
||||
|
||||
// ////// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
@ -334,12 +337,14 @@ private void testContainerLaunchAndExit(int exitCode) throws IOException, Interr
|
||||
commands.add("/bin/bash");
|
||||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(100 * 1024 * 1024);
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(100 * 1024 * 1024, 1));
|
||||
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
||||
@ -402,7 +407,6 @@ public void testLocalFilesCleanup() throws InterruptedException,
|
||||
// ////// Construct the Container-id
|
||||
ContainerId cId = createContainerId();
|
||||
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
|
||||
container.setContainerId(cId);
|
||||
|
||||
container.setUser(user);
|
||||
|
||||
@ -425,14 +429,16 @@ public void testLocalFilesCleanup() throws InterruptedException,
|
||||
localResources.put(destinationFile, rsrc_alpha);
|
||||
containerLaunchContext.setLocalResources(localResources);
|
||||
containerLaunchContext.setUser(container.getUser());
|
||||
containerLaunchContext.setContainerId(container.getContainerId());
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
Container mockContainer = mock(Container.class);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(100, 1));
|
||||
|
||||
// containerLaunchContext.command = new ArrayList<CharSequence>();
|
||||
|
||||
StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(containerLaunchContext);
|
||||
request.setContainer(mockContainer);
|
||||
containerManager.startContainer(request);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
||||
|
@ -525,8 +525,9 @@ private static Map<String,ByteBuffer> createServiceData(Random r) {
|
||||
return serviceData;
|
||||
}
|
||||
|
||||
private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
|
||||
return new ContainerImpl(conf, disp, ctx, null, metrics);
|
||||
private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx,
|
||||
org.apache.hadoop.yarn.api.records.Container container) {
|
||||
return new ContainerImpl(conf, disp, ctx, container, null, metrics);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -570,12 +571,14 @@ private class WrappedContainer {
|
||||
this.user = user;
|
||||
|
||||
ctxt = mock(ContainerLaunchContext.class);
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
cId = BuilderUtils.newContainerId(appId, 1, timestamp, id);
|
||||
when(ctxt.getUser()).thenReturn(this.user);
|
||||
when(ctxt.getContainerId()).thenReturn(cId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||
when(ctxt.getResource()).thenReturn(resource);
|
||||
when(mockContainer.getResource()).thenReturn(resource);
|
||||
|
||||
if (withLocalRes) {
|
||||
Random r = new Random();
|
||||
@ -599,7 +602,7 @@ private class WrappedContainer {
|
||||
}
|
||||
when(ctxt.getServiceData()).thenReturn(serviceData);
|
||||
|
||||
c = newContainer(dispatcher, ctxt);
|
||||
c = newContainer(dispatcher, ctxt, mockContainer);
|
||||
dispatcher.start();
|
||||
}
|
||||
|
||||
|
@ -43,6 +43,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -57,11 +58,14 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
||||
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
public class TestContainerLaunch extends BaseContainerManagerTest {
|
||||
@ -184,6 +188,7 @@ public void testContainerEnvVariables() throws Exception {
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
||||
appId.setClusterTimestamp(0);
|
||||
@ -195,7 +200,7 @@ public void testContainerEnvVariables() throws Exception {
|
||||
ContainerId cId =
|
||||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
cId.setApplicationAttemptId(appAttemptId);
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
@ -222,11 +227,11 @@ public void testContainerEnvVariables() throws Exception {
|
||||
commands.add("/bin/bash");
|
||||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(1024);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(1024, 1));
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
@ -301,7 +306,7 @@ public void testDelayedKill() throws Exception {
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
|
||||
appId.setClusterTimestamp(1);
|
||||
@ -313,7 +318,7 @@ public void testDelayedKill() throws Exception {
|
||||
ContainerId cId =
|
||||
recordFactory.newRecordInstance(ContainerId.class);
|
||||
cId.setApplicationAttemptId(appAttemptId);
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
@ -339,11 +344,11 @@ public void testDelayedKill() throws Exception {
|
||||
List<String> commands = new ArrayList<String>();
|
||||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(1024);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(1024, 1));
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
|
@ -56,13 +56,13 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
@ -91,6 +91,7 @@
|
||||
import org.mortbay.util.MultiException;
|
||||
|
||||
|
||||
|
||||
//@Ignore
|
||||
public class TestLogAggregationService extends BaseContainerManagerTest {
|
||||
|
||||
@ -679,7 +680,7 @@ public void testLogAggregationForRealContainerLaunch() throws IOException,
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId =
|
||||
recordFactory.newRecordInstance(ApplicationId.class);
|
||||
@ -689,7 +690,7 @@ public void testLogAggregationForRealContainerLaunch() throws IOException,
|
||||
BuilderUtils.newApplicationAttemptId(appId, 1);
|
||||
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
|
||||
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
containerLaunchContext.setUser(this.user);
|
||||
|
||||
@ -713,12 +714,12 @@ public void testLogAggregationForRealContainerLaunch() throws IOException,
|
||||
commands.add("/bin/bash");
|
||||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(100 * 1024 * 1024);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(100 * 1024 * 1024, 1));
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
this.containerManager.startContainer(startRequest);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(this.containerManager,
|
||||
|
@ -21,7 +21,7 @@
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
@ -44,6 +44,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -51,7 +52,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
@ -60,6 +60,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
||||
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
|
||||
@ -197,7 +198,7 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
Container mockContainer = mock(Container.class);
|
||||
// ////// Construct the Container-id
|
||||
ApplicationId appId =
|
||||
recordFactory.newRecordInstance(ApplicationId.class);
|
||||
@ -210,7 +211,7 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
|
||||
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
|
||||
cId.setId(0);
|
||||
cId.setApplicationAttemptId(appAttemptId);
|
||||
containerLaunchContext.setContainerId(cId);
|
||||
when(mockContainer.getId()).thenReturn(cId);
|
||||
|
||||
containerLaunchContext.setUser(user);
|
||||
|
||||
@ -234,12 +235,12 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
|
||||
commands.add("/bin/bash");
|
||||
commands.add(scriptFile.getAbsolutePath());
|
||||
containerLaunchContext.setCommands(commands);
|
||||
containerLaunchContext.setResource(recordFactory
|
||||
.newRecordInstance(Resource.class));
|
||||
containerLaunchContext.getResource().setMemory(8 * 1024 * 1024);
|
||||
when(mockContainer.getResource()).thenReturn(
|
||||
BuilderUtils.newResource(8 * 1024 * 1024, 1));
|
||||
StartContainerRequest startRequest =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(containerLaunchContext);
|
||||
startRequest.setContainer(mockContainer);
|
||||
containerManager.startContainer(startRequest);
|
||||
|
||||
int timeoutSecs = 0;
|
||||
|
@ -30,6 +30,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
@ -58,7 +59,6 @@ public MockContainer(ApplicationAttemptId appAttemptId,
|
||||
uniqId);
|
||||
this.launchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
launchContext.setContainerId(id);
|
||||
launchContext.setUser(user);
|
||||
this.state = ContainerState.NEW;
|
||||
|
||||
@ -104,7 +104,6 @@ public ContainerStatus cloneAndGetContainerStatus() {
|
||||
.newRecordInstance(ContainerStatus.class);
|
||||
containerStatus
|
||||
.setState(org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
|
||||
containerStatus.setContainerId(this.launchContext.getContainerId());
|
||||
containerStatus.setDiagnostics("testing");
|
||||
containerStatus.setExitStatus(0);
|
||||
return containerStatus;
|
||||
@ -119,4 +118,9 @@ public String toString() {
|
||||
public void handle(ContainerEvent event) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -179,15 +179,19 @@ public boolean isPmemCheckEnabled() {
|
||||
// TODO: Use builder utils
|
||||
ContainerLaunchContext launchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
launchContext.setContainerId(containerId);
|
||||
org.apache.hadoop.yarn.api.records.Container mockContainer =
|
||||
mock(org.apache.hadoop.yarn.api.records.Container.class);
|
||||
when(mockContainer.getId()).thenReturn(containerId);
|
||||
launchContext.setUser(user);
|
||||
Container container =
|
||||
new ContainerImpl(conf, dispatcher, launchContext, null, metrics) {
|
||||
@Override
|
||||
public ContainerState getContainerState() {
|
||||
return ContainerState.RUNNING;
|
||||
};
|
||||
};
|
||||
new ContainerImpl(conf, dispatcher, launchContext, mockContainer,
|
||||
null, metrics) {
|
||||
|
||||
@Override
|
||||
public ContainerState getContainerState() {
|
||||
return ContainerState.RUNNING;
|
||||
};
|
||||
};
|
||||
nmContext.getContainers().put(containerId, container);
|
||||
//TODO: Gross hack. Fix in code.
|
||||
ApplicationId applicationId =
|
||||
|
@ -266,7 +266,7 @@ public SubmitApplicationResponse submitApplication(
|
||||
ApplicationSubmissionContext submissionContext = request
|
||||
.getApplicationSubmissionContext();
|
||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||
String user = submissionContext.getUser();
|
||||
String user = submissionContext.getAMContainerSpec().getUser();
|
||||
try {
|
||||
user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
if (rmContext.getRMApps().get(applicationId) != null) {
|
||||
@ -275,13 +275,13 @@ public SubmitApplicationResponse submitApplication(
|
||||
}
|
||||
|
||||
// Safety
|
||||
submissionContext.setUser(user);
|
||||
submissionContext.getAMContainerSpec().setUser(user);
|
||||
|
||||
// Check whether AM resource requirements are within required limits
|
||||
if (!submissionContext.getUnmanagedAM()) {
|
||||
ResourceRequest amReq = BuilderUtils.newResourceRequest(
|
||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||
submissionContext.getAMContainerSpec().getResource(), 1);
|
||||
submissionContext.getResource(), 1);
|
||||
try {
|
||||
SchedulerUtils.validateResourceRequest(amReq,
|
||||
scheduler.getMaximumResourceCapability());
|
||||
|
@ -249,10 +249,11 @@ protected void submitApplication(
|
||||
// Create RMApp
|
||||
application =
|
||||
new RMAppImpl(applicationId, rmContext, this.conf,
|
||||
submissionContext.getApplicationName(),
|
||||
submissionContext.getUser(), submissionContext.getQueue(),
|
||||
submissionContext, this.scheduler, this.masterService,
|
||||
submitTime);
|
||||
submissionContext.getApplicationName(),
|
||||
submissionContext.getAMContainerSpec().getUser(),
|
||||
submissionContext.getQueue(),
|
||||
submissionContext, this.scheduler, this.masterService,
|
||||
submitTime);
|
||||
|
||||
// Sanity check - duplicate?
|
||||
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
|
||||
|
@ -77,6 +77,7 @@ public class AMLauncher implements Runnable {
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
private final AMLauncherEventType eventType;
|
||||
private final RMContext rmContext;
|
||||
private final Container masterContainer;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private final EventHandler handler;
|
||||
@ -88,34 +89,36 @@ public AMLauncher(RMContext rmContext, RMAppAttempt application,
|
||||
this.eventType = eventType;
|
||||
this.rmContext = rmContext;
|
||||
this.handler = rmContext.getDispatcher().getEventHandler();
|
||||
this.masterContainer = application.getMasterContainer();
|
||||
}
|
||||
|
||||
private void connect() throws IOException {
|
||||
ContainerId masterContainerID = application.getMasterContainer().getId();
|
||||
ContainerId masterContainerID = masterContainer.getId();
|
||||
|
||||
containerMgrProxy = getContainerMgrProxy(masterContainerID);
|
||||
}
|
||||
|
||||
private void launch() throws IOException {
|
||||
connect();
|
||||
ContainerId masterContainerID = application.getMasterContainer().getId();
|
||||
ContainerId masterContainerID = masterContainer.getId();
|
||||
ApplicationSubmissionContext applicationContext =
|
||||
application.getSubmissionContext();
|
||||
LOG.info("Setting up container " + application.getMasterContainer()
|
||||
LOG.info("Setting up container " + masterContainer
|
||||
+ " for AM " + application.getAppAttemptId());
|
||||
ContainerLaunchContext launchContext =
|
||||
createAMContainerLaunchContext(applicationContext, masterContainerID);
|
||||
StartContainerRequest request =
|
||||
recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(launchContext);
|
||||
request.setContainer(masterContainer);
|
||||
containerMgrProxy.startContainer(request);
|
||||
LOG.info("Done launching container " + application.getMasterContainer()
|
||||
LOG.info("Done launching container " + masterContainer
|
||||
+ " for AM " + application.getAppAttemptId());
|
||||
}
|
||||
|
||||
private void cleanup() throws IOException {
|
||||
connect();
|
||||
ContainerId containerId = application.getMasterContainer().getId();
|
||||
ContainerId containerId = masterContainer.getId();
|
||||
StopContainerRequest stopRequest =
|
||||
recordFactory.newRecordInstance(StopContainerRequest.class);
|
||||
stopRequest.setContainerId(containerId);
|
||||
@ -126,9 +129,7 @@ private void cleanup() throws IOException {
|
||||
protected ContainerManager getContainerMgrProxy(
|
||||
final ContainerId containerId) {
|
||||
|
||||
Container container = application.getMasterContainer();
|
||||
|
||||
final NodeId node = container.getNodeId();
|
||||
final NodeId node = masterContainer.getNodeId();
|
||||
final InetSocketAddress containerManagerBindAddress =
|
||||
NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
|
||||
|
||||
@ -138,8 +139,8 @@ protected ContainerManager getContainerMgrProxy(
|
||||
.createRemoteUser(containerId.toString());
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
Token<ContainerTokenIdentifier> token =
|
||||
ProtoUtils.convertFromProtoFormat(container.getContainerToken(),
|
||||
containerManagerBindAddress);
|
||||
ProtoUtils.convertFromProtoFormat(masterContainer
|
||||
.getContainerToken(), containerManagerBindAddress);
|
||||
currentUser.addToken(token);
|
||||
}
|
||||
return currentUser.doAs(new PrivilegedAction<ContainerManager>() {
|
||||
@ -165,30 +166,28 @@ private ContainerLaunchContext createAMContainerLaunchContext(
|
||||
new String[0])));
|
||||
|
||||
// Finalize the container
|
||||
container.setContainerId(containerID);
|
||||
container.setUser(applicationMasterContext.getUser());
|
||||
setupTokensAndEnv(container);
|
||||
container.setUser(applicationMasterContext.getAMContainerSpec().getUser());
|
||||
setupTokensAndEnv(container, containerID);
|
||||
|
||||
return container;
|
||||
}
|
||||
|
||||
private void setupTokensAndEnv(
|
||||
ContainerLaunchContext container)
|
||||
ContainerLaunchContext container, ContainerId containerID)
|
||||
throws IOException {
|
||||
Map<String, String> environment = container.getEnvironment();
|
||||
|
||||
environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV,
|
||||
application.getWebProxyBase());
|
||||
// Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be
|
||||
// consumable by the AM.
|
||||
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV, container
|
||||
.getContainerId().toString());
|
||||
environment.put(ApplicationConstants.NM_HOST_ENV, application
|
||||
.getMasterContainer().getNodeId().getHost());
|
||||
environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV,
|
||||
containerID.toString());
|
||||
environment.put(ApplicationConstants.NM_HOST_ENV, masterContainer
|
||||
.getNodeId().getHost());
|
||||
environment.put(ApplicationConstants.NM_PORT_ENV,
|
||||
String.valueOf(application.getMasterContainer().getNodeId().getPort()));
|
||||
String.valueOf(masterContainer.getNodeId().getPort()));
|
||||
String parts[] =
|
||||
application.getMasterContainer().getNodeHttpAddress().split(":");
|
||||
masterContainer.getNodeHttpAddress().split(":");
|
||||
environment.put(ApplicationConstants.NM_HTTP_PORT_ENV, parts[1]);
|
||||
ApplicationId applicationId =
|
||||
application.getAppAttemptId().getApplicationId();
|
||||
|
@ -690,7 +690,7 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
appAttempt.eventHandler.handle(
|
||||
new AppAddedSchedulerEvent(appAttempt.applicationAttemptId,
|
||||
appAttempt.submissionContext.getQueue(),
|
||||
appAttempt.submissionContext.getUser()));
|
||||
appAttempt.submissionContext.getAMContainerSpec().getUser()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -736,9 +736,10 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppEventType.APP_ACCEPTED));
|
||||
|
||||
// Request a container for the AM.
|
||||
ResourceRequest request = BuilderUtils.newResourceRequest(
|
||||
AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt.submissionContext
|
||||
.getAMContainerSpec().getResource(), 1);
|
||||
ResourceRequest request =
|
||||
BuilderUtils.newResourceRequest(
|
||||
AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
|
||||
.getSubmissionContext().getResource(), 1);
|
||||
|
||||
// SchedulerUtils.validateResourceRequests is not necessary because
|
||||
// AM resource has been checked when submission
|
||||
@ -773,12 +774,8 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
// Set the masterContainer
|
||||
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
|
||||
0));
|
||||
// Updating CLC's resource is no longer necessary once YARN-486 is
|
||||
// completed, because nothing from Container to CLC will be copied into
|
||||
// CLC then.
|
||||
appAttempt.getSubmissionContext().getAMContainerSpec().setResource(
|
||||
appAttempt.getSubmissionContext().setResource(
|
||||
appAttempt.getMasterContainer().getResource());
|
||||
|
||||
RMStateStore store = appAttempt.rmContext.getStateStore();
|
||||
appAttempt.storeAttempt(store);
|
||||
}
|
||||
|
@ -59,8 +59,8 @@ public AppAttemptInfo(RMAppAttempt attempt) {
|
||||
this.logsLink = join(HttpConfig.getSchemePrefix(),
|
||||
masterContainer.getNodeHttpAddress(),
|
||||
"/node", "/containerlogs/",
|
||||
ConverterUtils.toString(masterContainer.getId()),
|
||||
"/", attempt.getSubmissionContext().getUser());
|
||||
ConverterUtils.toString(masterContainer.getId()), "/",
|
||||
attempt.getSubmissionContext().getAMContainerSpec().getUser());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ public Resource getUsedResources() {
|
||||
public synchronized void submit() throws IOException {
|
||||
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
context.setApplicationId(this.applicationId);
|
||||
context.setUser(this.user);
|
||||
context.getAMContainerSpec().setUser(this.user);
|
||||
context.setQueue(this.queue);
|
||||
SubmitApplicationRequest request = recordFactory
|
||||
.newRecordInstance(SubmitApplicationRequest.class);
|
||||
@ -340,7 +340,8 @@ private synchronized void assign(Priority priority, NodeType type,
|
||||
|
||||
// Launch the container
|
||||
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
||||
startRequest.setContainerLaunchContext(createCLC(container));
|
||||
startRequest.setContainerLaunchContext(createCLC());
|
||||
startRequest.setContainer(container);
|
||||
nodeManager.startContainer(startRequest);
|
||||
break;
|
||||
}
|
||||
@ -396,11 +397,9 @@ private void updateResourceRequest(ResourceRequest request) {
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerLaunchContext createCLC(Container container) {
|
||||
private ContainerLaunchContext createCLC() {
|
||||
ContainerLaunchContext clc = recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
clc.setContainerId(container.getId());
|
||||
clc.setUser(this.user);
|
||||
clc.setResource(container.getResource());
|
||||
return clc;
|
||||
}
|
||||
}
|
||||
|
@ -161,7 +161,6 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
||||
.newRecord(ApplicationSubmissionContext.class);
|
||||
sub.setApplicationId(appId);
|
||||
sub.setApplicationName(name);
|
||||
sub.setUser(user);
|
||||
sub.setMaxAppAttempts(maxAppAttempts);
|
||||
if(unmanaged) {
|
||||
sub.setUnmanagedAM(true);
|
||||
@ -171,13 +170,13 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
||||
}
|
||||
ContainerLaunchContext clc = Records
|
||||
.newRecord(ContainerLaunchContext.class);
|
||||
Resource capability = Records.newRecord(Resource.class);
|
||||
final Resource capability = Records.newRecord(Resource.class);
|
||||
capability.setMemory(masterMemory);
|
||||
clc.setResource(capability);
|
||||
sub.setResource(capability);
|
||||
clc.setApplicationACLs(acls);
|
||||
clc.setUser(user);
|
||||
sub.setAMContainerSpec(clc);
|
||||
req.setApplicationSubmissionContext(sub);
|
||||
|
||||
UserGroupInformation fakeUser =
|
||||
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
|
||||
PrivilegedAction<SubmitApplicationResponse> action =
|
||||
|
@ -40,7 +40,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
@ -162,11 +161,10 @@ public void heartbeat() throws IOException {
|
||||
synchronized public StartContainerResponse startContainer(
|
||||
StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
request.getContainerLaunchContext();
|
||||
|
||||
Container requestContainer = request.getContainer();
|
||||
|
||||
ApplicationId applicationId =
|
||||
containerLaunchContext.getContainerId().getApplicationAttemptId().
|
||||
requestContainer.getId().getApplicationAttemptId().
|
||||
getApplicationId();
|
||||
|
||||
List<Container> applicationContainers = containers.get(applicationId);
|
||||
@ -177,18 +175,18 @@ synchronized public StartContainerResponse startContainer(
|
||||
|
||||
// Sanity check
|
||||
for (Container container : applicationContainers) {
|
||||
if (container.getId().compareTo(containerLaunchContext.getContainerId())
|
||||
if (container.getId().compareTo(requestContainer.getId())
|
||||
== 0) {
|
||||
throw new IllegalStateException(
|
||||
"Container " + containerLaunchContext.getContainerId() +
|
||||
"Container " + requestContainer.getId() +
|
||||
" already setup on node " + containerManagerAddress);
|
||||
}
|
||||
}
|
||||
|
||||
Container container =
|
||||
BuilderUtils.newContainer(containerLaunchContext.getContainerId(),
|
||||
BuilderUtils.newContainer(requestContainer.getId(),
|
||||
this.nodeId, nodeHttpAddress,
|
||||
containerLaunchContext.getResource(),
|
||||
requestContainer.getResource(),
|
||||
null, null // DKDC - Doesn't matter
|
||||
);
|
||||
|
||||
@ -197,8 +195,8 @@ synchronized public StartContainerResponse startContainer(
|
||||
"", -1000);
|
||||
applicationContainers.add(container);
|
||||
containerStatusMap.put(container, containerStatus);
|
||||
Resources.subtractFrom(available, containerLaunchContext.getResource());
|
||||
Resources.addTo(used, containerLaunchContext.getResource());
|
||||
Resources.subtractFrom(available, requestContainer.getResource());
|
||||
Resources.addTo(used, requestContainer.getResource());
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("startContainer:" + " node=" + containerManagerAddress
|
||||
|
@ -50,6 +50,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.service.Service;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
@ -503,6 +504,10 @@ public void testRMAppSubmitError() throws Exception {
|
||||
RMApp appOrig = rmContext.getRMApps().get(appID);
|
||||
Assert.assertTrue("app name matches but shouldn't", "testApp1" != appOrig.getName());
|
||||
|
||||
ContainerLaunchContext clc =
|
||||
BuilderUtils.newContainerLaunchContext(null, null, null, null, null,
|
||||
null, null);
|
||||
context.setAMContainerSpec(clc);
|
||||
// our testApp1 should be rejected and original app with same id should be left in place
|
||||
appMonitor.submitApplication(context);
|
||||
|
||||
|
@ -169,7 +169,7 @@ private ApplicationId submitAppAndGetAppId(AccessControlList viewACL,
|
||||
ContainerLaunchContext amContainer = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
Resource resource = BuilderUtils.newResource(1024, 1);
|
||||
amContainer.setResource(resource);
|
||||
context.setResource(resource);
|
||||
amContainer.setApplicationACLs(acls);
|
||||
context.setAMContainerSpec(amContainer);
|
||||
submitRequest.setApplicationSubmissionContext(context);
|
||||
|
@ -134,7 +134,7 @@ public void testAMLaunchAndCleanup() throws Exception {
|
||||
Assert.assertEquals(app.getSubmitTime(),
|
||||
containerManager.submitTimeAtContainerManager);
|
||||
Assert.assertEquals(app.getRMAppAttempt(appAttemptId)
|
||||
.getSubmissionContext().getAMContainerSpec().getContainerId()
|
||||
.getMasterContainer().getId()
|
||||
.toString(), containerManager.containerIdAtContainerManager);
|
||||
Assert.assertEquals(nm1.getNodeId().getHost(),
|
||||
containerManager.nmHostAtContainerManager);
|
||||
|
@ -340,7 +340,7 @@ public void testInvalidResourceRequestWhenSubmittingApplication()
|
||||
final SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId);
|
||||
Resource resource = Resources.createResource(
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1);
|
||||
when(submitRequest.getApplicationSubmissionContext().getAMContainerSpec()
|
||||
when(submitRequest.getApplicationSubmissionContext()
|
||||
.getResource()).thenReturn(resource);
|
||||
|
||||
final ClientRMService rmService =
|
||||
@ -364,16 +364,17 @@ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId) {
|
||||
String queue = MockApps.newQueue();
|
||||
|
||||
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
||||
when(amContainerSpec.getResource()).thenReturn(resource);
|
||||
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUser()).thenReturn(user);
|
||||
when(submissionContext.getQueue()).thenReturn(queue);
|
||||
when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
|
||||
when(submissionContext.getAMContainerSpec().getUser()).thenReturn(user);
|
||||
when(submissionContext.getQueue()).thenReturn(queue);
|
||||
when(submissionContext.getApplicationId()).thenReturn(appId);
|
||||
|
||||
when(submissionContext.getResource()).thenReturn(resource);
|
||||
|
||||
SubmitApplicationRequest submitRequest =
|
||||
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
|
||||
submitRequest.setApplicationSubmissionContext(submissionContext);
|
||||
|
@ -200,14 +200,14 @@ null, new ApplicationTokenSecretManager(conf),
|
||||
final String user = MockApps.newUserName();
|
||||
final String queue = MockApps.newQueue();
|
||||
submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUser()).thenReturn(user);
|
||||
when(submissionContext.getQueue()).thenReturn(queue);
|
||||
Resource resource = BuilderUtils.newResource(1536, 1);
|
||||
ContainerLaunchContext amContainerSpec =
|
||||
BuilderUtils.newContainerLaunchContext(null, user, resource, null, null,
|
||||
BuilderUtils.newContainerLaunchContext(user, null, null,
|
||||
null, null, null, null);
|
||||
when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
|
||||
|
||||
when(submissionContext.getResource()).thenReturn(resource);
|
||||
|
||||
unmanagedAM = false;
|
||||
|
||||
application = mock(RMApp.class);
|
||||
@ -494,9 +494,6 @@ private Container allocateApplicationAttempt() {
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptStoredEvent(
|
||||
applicationAttempt.getAppAttemptId(), null));
|
||||
assertEquals(resource,
|
||||
applicationAttempt.getSubmissionContext()
|
||||
.getAMContainerSpec().getResource());
|
||||
|
||||
testAppAttemptAllocatedState(container);
|
||||
|
||||
|
@ -45,6 +45,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
@ -72,6 +73,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
|
||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -1406,6 +1408,10 @@ public void testNotAllowSubmitApplication() throws Exception {
|
||||
ApplicationMasterService masterService =
|
||||
new ApplicationMasterService(resourceManager.getRMContext(), scheduler);
|
||||
ApplicationSubmissionContext submissionContext = new ApplicationSubmissionContextPBImpl();
|
||||
ContainerLaunchContext clc =
|
||||
BuilderUtils.newContainerLaunchContext(user, null, null, null, null,
|
||||
null, null);
|
||||
submissionContext.setAMContainerSpec(clc);
|
||||
RMApp application =
|
||||
new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user,
|
||||
queue, submissionContext, scheduler, masterService,
|
||||
|
@ -1079,8 +1079,9 @@ public void verifyAppAttemptInfoGeneric(RMAppAttempt appAttempt, int id,
|
||||
.getMasterContainer().getNodeId().toString(), nodeId);
|
||||
assertTrue("logsLink doesn't match",
|
||||
logsLink.startsWith("http://"));
|
||||
assertTrue("logsLink doesn't contain user info",
|
||||
logsLink.endsWith("/" + appAttempt.getSubmissionContext().getUser()));
|
||||
assertTrue(
|
||||
"logsLink doesn't contain user info", logsLink.endsWith("/"
|
||||
+ appAttempt.getSubmissionContext().getAMContainerSpec().getUser()));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -357,8 +357,13 @@ public Void run() {
|
||||
|
||||
LOG.info("Going to contact NM with expired token");
|
||||
ContainerLaunchContext context = createContainerLaunchContextForTest(newTokenId);
|
||||
Container container =
|
||||
BuilderUtils.newContainer(newTokenId.getContainerID(), null, null,
|
||||
BuilderUtils.newResource(newTokenId.getResource().getMemory(),
|
||||
newTokenId.getResource().getVirtualCores()), null, null);
|
||||
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
|
||||
request.setContainerLaunchContext(context);
|
||||
request.setContainer(container);
|
||||
|
||||
//Calling startContainer with an expired token.
|
||||
try {
|
||||
@ -402,18 +407,19 @@ private AMRMProtocol submitAndRegisterApplication(
|
||||
Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul") :
|
||||
Arrays.asList("sleep", "100");
|
||||
|
||||
ContainerLaunchContext amContainer = BuilderUtils
|
||||
.newContainerLaunchContext(null, "testUser", BuilderUtils
|
||||
.newResource(1024, 1), Collections.<String, LocalResource>emptyMap(),
|
||||
new HashMap<String, String>(), cmd,
|
||||
new HashMap<String, ByteBuffer>(), null,
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
ContainerLaunchContext amContainer =
|
||||
BuilderUtils.newContainerLaunchContext("testUser",
|
||||
Collections.<String, LocalResource> emptyMap(),
|
||||
new HashMap<String, String>(), cmd,
|
||||
new HashMap<String, ByteBuffer>(), null,
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
|
||||
ApplicationSubmissionContext appSubmissionContext = recordFactory
|
||||
.newRecordInstance(ApplicationSubmissionContext.class);
|
||||
appSubmissionContext.setApplicationId(appID);
|
||||
appSubmissionContext.setUser("testUser");
|
||||
appSubmissionContext.setAMContainerSpec(amContainer);
|
||||
appSubmissionContext.getAMContainerSpec().setUser("testUser");
|
||||
appSubmissionContext.setResource(BuilderUtils.newResource(1024, 1));
|
||||
|
||||
SubmitApplicationRequest submitRequest = recordFactory
|
||||
.newRecordInstance(SubmitApplicationRequest.class);
|
||||
@ -539,8 +545,11 @@ void callWithIllegalResource(ContainerManager client,
|
||||
// Authenticated but unauthorized, due to wrong resource
|
||||
ContainerLaunchContext context =
|
||||
createContainerLaunchContextForTest(tokenId);
|
||||
context.getResource().setMemory(2048); // Set a different resource size.
|
||||
Container container =
|
||||
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
|
||||
BuilderUtils.newResource(2048, 1), null, null);
|
||||
request.setContainerLaunchContext(context);
|
||||
request.setContainer(container);
|
||||
try {
|
||||
client.startContainer(request);
|
||||
fail("Connection initiation with unauthorized "
|
||||
@ -551,7 +560,7 @@ void callWithIllegalResource(ContainerManager client,
|
||||
"Unauthorized request to start container. "));
|
||||
Assert.assertTrue(e.getMessage().contains(
|
||||
"\nExpected resource " + tokenId.getResource().toString()
|
||||
+ " but found " + context.getResource().toString()));
|
||||
+ " but found " + container.getResource().toString()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -563,7 +572,12 @@ void callWithIllegalUserName(ContainerManager client,
|
||||
ContainerLaunchContext context =
|
||||
createContainerLaunchContextForTest(tokenId);
|
||||
context.setUser("Saruman"); // Set a different user-name.
|
||||
Container container =
|
||||
BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
|
||||
BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId
|
||||
.getResource().getVirtualCores()), null, null);
|
||||
request.setContainerLaunchContext(context);
|
||||
request.setContainer(container);
|
||||
try {
|
||||
client.startContainer(request);
|
||||
fail("Connection initiation with unauthorized "
|
||||
@ -581,12 +595,8 @@ void callWithIllegalUserName(ContainerManager client,
|
||||
private ContainerLaunchContext createContainerLaunchContextForTest(
|
||||
ContainerTokenIdentifier tokenId) {
|
||||
ContainerLaunchContext context =
|
||||
BuilderUtils.newContainerLaunchContext(tokenId.getContainerID(),
|
||||
"testUser",
|
||||
BuilderUtils.newResource(
|
||||
tokenId.getResource().getMemory(),
|
||||
tokenId.getResource().getVirtualCores()),
|
||||
new HashMap<String, LocalResource>(),
|
||||
BuilderUtils.newContainerLaunchContext(
|
||||
"testUser", new HashMap<String, LocalResource>(),
|
||||
new HashMap<String, String>(), new ArrayList<String>(),
|
||||
new HashMap<String, ByteBuffer>(), null,
|
||||
new HashMap<ApplicationAccessType, String>());
|
||||
|
Loading…
Reference in New Issue
Block a user