YARN-3116. RM notifies NM whether a container is an AM container or normal task container. Contributed by Giovanni Matteo Fumarola.
This commit is contained in:
parent
47f4c54106
commit
1ea36299a4
@ -134,6 +134,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-1012. Report NM aggregated container resource utilization in heartbeat.
|
||||
(Inigo Goiri via kasha)
|
||||
|
||||
YARN-3116. RM notifies NM whether a container is an AM container or normal
|
||||
task container. (Giovanni Matteo Fumarola via zjshen)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
@ -35,14 +35,23 @@ public class ContainerContext {
|
||||
private final String user;
|
||||
private final ContainerId containerId;
|
||||
private final Resource resource;
|
||||
private final ContainerType containerType;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerContext(String user, ContainerId containerId,
|
||||
Resource resource) {
|
||||
this(user, containerId, resource, ContainerType.TASK);
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerContext(String user, ContainerId containerId,
|
||||
Resource resource, ContainerType containerType) {
|
||||
this.user = user;
|
||||
this.containerId = containerId;
|
||||
this.resource = resource;
|
||||
this.containerType = containerType;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -72,4 +81,14 @@ public ContainerId getContainerId() {
|
||||
public Resource getResource() {
|
||||
return resource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get {@link ContainerType} the type of the container
|
||||
* being initialized or stopped.
|
||||
*
|
||||
* @return the type of the container
|
||||
*/
|
||||
public ContainerType getContainerType() {
|
||||
return containerType;
|
||||
}
|
||||
}
|
||||
|
@ -41,4 +41,11 @@ public ContainerInitializationContext(String user, ContainerId containerId,
|
||||
super(user, containerId, resource);
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerInitializationContext(String user, ContainerId containerId,
|
||||
Resource resource, ContainerType containerType) {
|
||||
super(user, containerId, resource, containerType);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -41,4 +41,11 @@ public ContainerTerminationContext(String user, ContainerId containerId,
|
||||
super(user, containerId, resource);
|
||||
}
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public ContainerTerminationContext(String user, ContainerId containerId,
|
||||
Resource resource, ContainerType containerType) {
|
||||
super(user, containerId, resource, containerType);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,34 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api;
|
||||
|
||||
/**
|
||||
* Container property encoding allocation and execution semantics.
|
||||
*
|
||||
* <p>
|
||||
* The container types are the following:
|
||||
* <ul>
|
||||
* <li>{@link #APPLICATION_MASTER}
|
||||
* <li>{@link #TASK}
|
||||
* </ul>
|
||||
* </p>
|
||||
*/
|
||||
public enum ContainerType {
|
||||
APPLICATION_MASTER, TASK
|
||||
}
|
@ -264,6 +264,11 @@ message NodeLabelProto {
|
||||
optional bool isExclusive = 2 [default = true];
|
||||
}
|
||||
|
||||
enum ContainerTypeProto {
|
||||
APPLICATION_MASTER = 1;
|
||||
TASK = 2;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
////// From AM_RM_Protocol /////////////////////////////////////////////
|
||||
////////////////////////////////////////////////////////////////////////
|
||||
|
@ -53,7 +53,9 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
@ -270,4 +272,14 @@ public static LogAggregationStatus convertFromProtoFormat(
|
||||
return LogAggregationStatus.valueOf(e.name().replace(
|
||||
LOG_AGGREGATION_STATUS_PREFIX, ""));
|
||||
}
|
||||
|
||||
/*
|
||||
* ContainerType
|
||||
*/
|
||||
public static ContainerTypeProto convertToProtoFormat(ContainerType e) {
|
||||
return ContainerTypeProto.valueOf(e.name());
|
||||
}
|
||||
public static ContainerType convertFromProtoFormat(ContainerTypeProto e) {
|
||||
return ContainerType.valueOf(e.name());
|
||||
}
|
||||
}
|
||||
|
@ -39,13 +39,15 @@
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
|
||||
import com.google.protobuf.TextFormat;
|
||||
|
||||
|
||||
/**
|
||||
* TokenIdentifier for a container. Encodes {@link ContainerId},
|
||||
* {@link Resource} needed by the container and the target NMs host-address.
|
||||
@ -66,14 +68,24 @@ public ContainerTokenIdentifier(ContainerId containerID,
|
||||
int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
|
||||
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
|
||||
rmIdentifier, priority, creationTime, null,
|
||||
CommonNodeLabelsManager.NO_LABEL);
|
||||
CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK);
|
||||
}
|
||||
|
||||
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
||||
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
|
||||
long rmIdentifier, Priority priority, long creationTime,
|
||||
LogAggregationContext logAggregationContext, String nodeLabelExpression) {
|
||||
ContainerTokenIdentifierProto.Builder builder =
|
||||
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
|
||||
rmIdentifier, priority, creationTime, logAggregationContext,
|
||||
nodeLabelExpression, ContainerType.TASK);
|
||||
}
|
||||
|
||||
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
||||
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
|
||||
long rmIdentifier, Priority priority, long creationTime,
|
||||
LogAggregationContext logAggregationContext, String nodeLabelExpression,
|
||||
ContainerType containerType) {
|
||||
ContainerTokenIdentifierProto.Builder builder =
|
||||
ContainerTokenIdentifierProto.newBuilder();
|
||||
if (containerID != null) {
|
||||
builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
|
||||
@ -99,7 +111,8 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName,
|
||||
if (nodeLabelExpression != null) {
|
||||
builder.setNodeLabelExpression(nodeLabelExpression);
|
||||
}
|
||||
|
||||
builder.setContainerType(convertToProtoFormat(containerType));
|
||||
|
||||
proto = builder.build();
|
||||
}
|
||||
|
||||
@ -156,7 +169,18 @@ public long getCreationTime() {
|
||||
public long getRMIdentifier() {
|
||||
return proto.getRmIdentifier();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the ContainerType of container to allocate
|
||||
* @return ContainerType
|
||||
*/
|
||||
public ContainerType getContainerType(){
|
||||
if (!proto.hasContainerType()) {
|
||||
return null;
|
||||
}
|
||||
return convertFromProtoFormat(proto.getContainerType());
|
||||
}
|
||||
|
||||
public ContainerTokenIdentifierProto getProto() {
|
||||
return proto;
|
||||
}
|
||||
@ -232,4 +256,13 @@ public boolean equals(Object other) {
|
||||
public String toString() {
|
||||
return TextFormat.shortDebugString(getProto());
|
||||
}
|
||||
|
||||
private ContainerTypeProto convertToProtoFormat(ContainerType containerType) {
|
||||
return ProtoUtils.convertToProtoFormat(containerType);
|
||||
}
|
||||
|
||||
private ContainerType convertFromProtoFormat(
|
||||
ContainerTypeProto containerType) {
|
||||
return ProtoUtils.convertFromProtoFormat(containerType);
|
||||
}
|
||||
}
|
||||
|
@ -50,6 +50,7 @@ message ContainerTokenIdentifierProto {
|
||||
optional int64 creationTime = 9;
|
||||
optional LogAggregationContextProto logAggregationContext = 10;
|
||||
optional string nodeLabelExpression = 11;
|
||||
optional ContainerTypeProto containerType = 12;
|
||||
}
|
||||
|
||||
message ClientToAMTokenIdentifierProto {
|
||||
|
@ -33,10 +33,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -201,6 +203,12 @@ public void testContainerTokenIdentifier() throws IOException {
|
||||
anotherToken.getCreationTime(), creationTime);
|
||||
|
||||
Assert.assertNull(anotherToken.getLogAggregationContext());
|
||||
|
||||
Assert.assertEquals(CommonNodeLabelsManager.NO_LABEL,
|
||||
anotherToken.getNodeLabelExpression());
|
||||
|
||||
Assert.assertEquals(ContainerType.TASK,
|
||||
anotherToken.getContainerType());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -347,4 +355,49 @@ public void testParseTimelineDelegationTokenIdentifierRenewer() throws IOExcepti
|
||||
Assert.assertEquals(new Text("yarn"), token.getRenewer());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAMContainerTokenIdentifier() throws IOException {
|
||||
ContainerId containerID = ContainerId.newContainerId(
|
||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
|
||||
1, 1), 1), 1);
|
||||
String hostName = "host0";
|
||||
String appSubmitter = "usr0";
|
||||
Resource r = Resource.newInstance(1024, 1);
|
||||
long expiryTimeStamp = 1000;
|
||||
int masterKeyId = 1;
|
||||
long rmIdentifier = 1;
|
||||
Priority priority = Priority.newInstance(1);
|
||||
long creationTime = 1000;
|
||||
|
||||
ContainerTokenIdentifier token =
|
||||
new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r,
|
||||
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
|
||||
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.APPLICATION_MASTER);
|
||||
|
||||
ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier();
|
||||
|
||||
byte[] tokenContent = token.getBytes();
|
||||
DataInputBuffer dib = new DataInputBuffer();
|
||||
dib.reset(tokenContent, tokenContent.length);
|
||||
anotherToken.readFields(dib);
|
||||
|
||||
Assert.assertEquals(ContainerType.APPLICATION_MASTER,
|
||||
anotherToken.getContainerType());
|
||||
|
||||
token =
|
||||
new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r,
|
||||
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
|
||||
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK);
|
||||
|
||||
anotherToken = new ContainerTokenIdentifier();
|
||||
|
||||
tokenContent = token.getBytes();
|
||||
dib = new DataInputBuffer();
|
||||
dib.reset(tokenContent, tokenContent.length);
|
||||
anotherToken.readFields(dib);
|
||||
|
||||
Assert.assertEquals(ContainerType.TASK,
|
||||
anotherToken.getContainerType());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -225,7 +225,8 @@ public void handle(AuxServicesEvent event) {
|
||||
try {
|
||||
serv.initializeContainer(new ContainerInitializationContext(
|
||||
event.getUser(), event.getContainer().getContainerId(),
|
||||
event.getContainer().getResource()));
|
||||
event.getContainer().getResource(), event.getContainer()
|
||||
.getContainerTokenIdentifier().getContainerType()));
|
||||
} catch (Throwable th) {
|
||||
logWarningWhenAuxServiceThrowExceptions(serv,
|
||||
AuxServicesEventType.CONTAINER_INIT, th);
|
||||
@ -237,7 +238,8 @@ public void handle(AuxServicesEvent event) {
|
||||
try {
|
||||
serv.stopContainer(new ContainerTerminationContext(
|
||||
event.getUser(), event.getContainer().getContainerId(),
|
||||
event.getContainer().getResource()));
|
||||
event.getContainer().getResource(), event.getContainer()
|
||||
.getContainerTokenIdentifier().getContainerType()));
|
||||
} catch (Throwable th) {
|
||||
logWarningWhenAuxServiceThrowExceptions(serv,
|
||||
AuxServicesEventType.CONTAINER_STOP, th);
|
||||
|
@ -45,6 +45,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
@ -61,7 +62,6 @@
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
|
||||
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.HashMultiset;
|
||||
@ -467,13 +467,26 @@ public List<NMToken> getNMTokenList() {
|
||||
.hasNext();) {
|
||||
RMContainer rmContainer = i.next();
|
||||
Container container = rmContainer.getContainer();
|
||||
ContainerType containerType = ContainerType.TASK;
|
||||
// The working knowledge is that masterContainer for AM is null as it
|
||||
// itself is the master container.
|
||||
RMAppAttempt appAttempt =
|
||||
rmContext
|
||||
.getRMApps()
|
||||
.get(
|
||||
container.getId().getApplicationAttemptId()
|
||||
.getApplicationId()).getCurrentAppAttempt();
|
||||
if (appAttempt.getMasterContainer() == null
|
||||
&& appAttempt.getSubmissionContext().getUnmanagedAM() == false) {
|
||||
containerType = ContainerType.APPLICATION_MASTER;
|
||||
}
|
||||
try {
|
||||
// create container token and NMToken altogether.
|
||||
container.setContainerToken(rmContext.getContainerTokenSecretManager()
|
||||
.createContainerToken(container.getId(), container.getNodeId(),
|
||||
getUser(), container.getResource(), container.getPriority(),
|
||||
rmContainer.getCreationTime(), this.logAggregationContext,
|
||||
rmContainer.getNodeLabelExpression()));
|
||||
rmContainer.getNodeLabelExpression(), containerType));
|
||||
NMToken nmToken =
|
||||
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
|
||||
getApplicationAttemptId(), container);
|
||||
|
@ -829,7 +829,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
|
||||
RMAppAttempt rmAppAttempt =
|
||||
csContext.getRMContext().getRMApps()
|
||||
.get(application.getApplicationId()).getCurrentAppAttempt();
|
||||
if (null == rmAppAttempt.getMasterContainer()) {
|
||||
if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
|
||||
&& null == rmAppAttempt.getMasterContainer()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skip allocating AM container to app_attempt="
|
||||
+ application.getApplicationAttemptId()
|
||||
|
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||
@ -166,7 +167,7 @@ public void run() {
|
||||
|
||||
/**
|
||||
* Helper function for creating ContainerTokens
|
||||
*
|
||||
*
|
||||
* @param containerId
|
||||
* @param nodeId
|
||||
* @param appSubmitter
|
||||
@ -179,12 +180,12 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId,
|
||||
String appSubmitter, Resource capability, Priority priority,
|
||||
long createTime) {
|
||||
return createContainerToken(containerId, nodeId, appSubmitter, capability,
|
||||
priority, createTime, null, null);
|
||||
priority, createTime, null, null, ContainerType.TASK);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function for creating ContainerTokens
|
||||
*
|
||||
*
|
||||
* @param containerId
|
||||
* @param nodeId
|
||||
* @param appSubmitter
|
||||
@ -192,12 +193,14 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId,
|
||||
* @param priority
|
||||
* @param createTime
|
||||
* @param logAggregationContext
|
||||
* @param nodeLabelExpression
|
||||
* @param containerType
|
||||
* @return the container-token
|
||||
*/
|
||||
public Token createContainerToken(ContainerId containerId, NodeId nodeId,
|
||||
String appSubmitter, Resource capability, Priority priority,
|
||||
long createTime, LogAggregationContext logAggregationContext,
|
||||
String nodeLabelExpression) {
|
||||
String nodeLabelExpression, ContainerType containerType) {
|
||||
byte[] password;
|
||||
ContainerTokenIdentifier tokenIdentifier;
|
||||
long expiryTimeStamp =
|
||||
@ -211,7 +214,7 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId,
|
||||
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
|
||||
.getMasterKey().getKeyId(),
|
||||
ResourceManager.getClusterTimeStamp(), priority, createTime,
|
||||
logAggregationContext, nodeLabelExpression);
|
||||
logAggregationContext, nodeLabelExpression, containerType);
|
||||
password = this.createPassword(tokenIdentifier);
|
||||
|
||||
} finally {
|
||||
|
@ -55,6 +55,8 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
@ -168,6 +170,16 @@ public synchronized void submit() throws IOException, YarnException {
|
||||
|
||||
resourceManager.getClientRMService().submitApplication(request);
|
||||
|
||||
RMAppEvent event =
|
||||
new RMAppEvent(this.applicationId, RMAppEventType.START);
|
||||
resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
|
||||
event =
|
||||
new RMAppEvent(this.applicationId, RMAppEventType.APP_NEW_SAVED);
|
||||
resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
|
||||
event =
|
||||
new RMAppEvent(this.applicationId, RMAppEventType.APP_ACCEPTED);
|
||||
resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
|
||||
|
||||
// Notify scheduler
|
||||
AppAddedSchedulerEvent addAppEvent =
|
||||
new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
|
||||
|
@ -38,6 +38,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
@ -286,11 +287,11 @@ protected RMContainerTokenSecretManager createContainerTokenSecretManager(
|
||||
public Token createContainerToken(ContainerId containerId,
|
||||
NodeId nodeId, String appSubmitter, Resource capability,
|
||||
Priority priority, long createTime,
|
||||
LogAggregationContext logAggregationContext, String nodeLabelExp) {
|
||||
LogAggregationContext logAggregationContext, String nodeLabelExp, ContainerType containerType) {
|
||||
numRetries++;
|
||||
return super.createContainerToken(containerId, nodeId, appSubmitter,
|
||||
capability, priority, createTime, logAggregationContext,
|
||||
nodeLabelExp);
|
||||
nodeLabelExp, containerType);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -38,6 +39,8 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
@ -152,6 +155,11 @@ protected ApplicationAttemptId createSchedulingRequest(
|
||||
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
|
||||
new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUnmanagedAM()).thenReturn(false);
|
||||
when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
Container container = mock(Container.class);
|
||||
when(rmAppAttempt.getMasterContainer()).thenReturn(container);
|
||||
resourceManager.getRMContext().getRMApps()
|
||||
.put(id.getApplicationId(), rmApp);
|
||||
|
||||
@ -175,6 +183,9 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId,
|
||||
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
|
||||
new RMAppAttemptMetrics(id,resourceManager.getRMContext()));
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUnmanagedAM()).thenReturn(false);
|
||||
when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
resourceManager.getRMContext().getRMApps()
|
||||
.put(id.getApplicationId(), rmApp);
|
||||
|
||||
@ -206,13 +217,20 @@ protected void createSchedulingRequestExistingApplication(
|
||||
protected void createApplicationWithAMResource(ApplicationAttemptId attId,
|
||||
String queue, String user, Resource amResource) {
|
||||
RMContext rmContext = resourceManager.getRMContext();
|
||||
RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
|
||||
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
|
||||
null, null, null, false, false, 0, amResource, null), null, null,
|
||||
ApplicationId appId = attId.getApplicationId();
|
||||
RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
|
||||
null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
|
||||
queue, null, null, false, false, 0, amResource, null), null, null,
|
||||
0, null, null, null);
|
||||
rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
|
||||
rmContext.getRMApps().put(appId, rmApp);
|
||||
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
|
||||
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
|
||||
event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED);
|
||||
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
|
||||
event = new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED);
|
||||
resourceManager.getRMContext().getRMApps().get(appId).handle(event);
|
||||
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
|
||||
attId.getApplicationId(), queue, user);
|
||||
appId, queue, user);
|
||||
scheduler.handle(appAddedEvent);
|
||||
AppAttemptAddedSchedulerEvent attempAddedEvent =
|
||||
new AppAttemptAddedSchedulerEvent(attId, false);
|
||||
@ -227,6 +245,9 @@ protected RMApp createMockRMApp(ApplicationAttemptId attemptId) {
|
||||
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
|
||||
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
|
||||
when(app.getCurrentAppAttempt()).thenReturn(attempt);
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUnmanagedAM()).thenReturn(false);
|
||||
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
resourceManager.getRMContext().getRMApps()
|
||||
.put(attemptId.getApplicationId(), app);
|
||||
return app;
|
||||
|
@ -39,6 +39,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
@ -1184,6 +1185,9 @@ private RMAppImpl createMockRMApp(ApplicationAttemptId attemptId,
|
||||
RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
|
||||
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
|
||||
when(app.getCurrentAppAttempt()).thenReturn(attempt);
|
||||
ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
|
||||
when(submissionContext.getUnmanagedAM()).thenReturn(false);
|
||||
when(attempt.getSubmissionContext()).thenReturn(submissionContext);
|
||||
context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app);
|
||||
return app;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user