From 1ea36299a47af302379ae0750b571ec021eb54ad Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 10 Jul 2015 18:58:10 -0700 Subject: [PATCH] YARN-3116. RM notifies NM whether a container is an AM container or normal task container. Contributed by Giovanni Matteo Fumarola. --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../yarn/server/api/ContainerContext.java | 19 +++++++ .../api/ContainerInitializationContext.java | 7 +++ .../api/ContainerTerminationContext.java | 7 +++ .../hadoop/yarn/server/api/ContainerType.java | 34 ++++++++++++ .../src/main/proto/yarn_protos.proto | 5 ++ .../yarn/api/records/impl/pb/ProtoUtils.java | 12 +++++ .../security/ContainerTokenIdentifier.java | 43 +++++++++++++-- .../proto/server/yarn_security_token.proto | 1 + .../security/TestYARNTokenIdentifier.java | 53 +++++++++++++++++++ .../containermanager/AuxServices.java | 6 ++- .../SchedulerApplicationAttempt.java | 17 +++++- .../scheduler/capacity/LeafQueue.java | 3 +- .../RMContainerTokenSecretManager.java | 13 +++-- .../server/resourcemanager/Application.java | 12 +++++ .../capacity/TestContainerAllocation.java | 5 +- .../scheduler/fair/FairSchedulerTestBase.java | 31 +++++++++-- .../scheduler/fifo/TestFifoScheduler.java | 4 ++ 18 files changed, 253 insertions(+), 22 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerType.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index db000d7e88..f78bbfa154 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -134,6 +134,9 @@ Release 2.8.0 - UNRELEASED YARN-1012. Report NM aggregated container resource utilization in heartbeat. (Inigo Goiri via kasha) + YARN-3116. RM notifies NM whether a container is an AM container or normal + task container. (Giovanni Matteo Fumarola via zjshen) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java index d13159b308..f7a9b02c2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java @@ -35,14 +35,23 @@ public class ContainerContext { private final String user; private final ContainerId containerId; private final Resource resource; + private final ContainerType containerType; @Private @Unstable public ContainerContext(String user, ContainerId containerId, Resource resource) { + this(user, containerId, resource, ContainerType.TASK); + } + + @Private + @Unstable + public ContainerContext(String user, ContainerId containerId, + Resource resource, ContainerType containerType) { this.user = user; this.containerId = containerId; this.resource = resource; + this.containerType = containerType; } /** @@ -72,4 +81,14 @@ public ContainerId getContainerId() { public Resource getResource() { return resource; } + + /** + * Get {@link ContainerType} the type of the container + * being initialized or stopped. + * + * @return the type of the container + */ + public ContainerType getContainerType() { + return containerType; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java index 5b5bbda0c0..44428f9765 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java @@ -41,4 +41,11 @@ public ContainerInitializationContext(String user, ContainerId containerId, super(user, containerId, resource); } + @Private + @Unstable + public ContainerInitializationContext(String user, ContainerId containerId, + Resource resource, ContainerType containerType) { + super(user, containerId, resource, containerType); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java index 34ba73e221..2e4ad3bab5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java @@ -41,4 +41,11 @@ public ContainerTerminationContext(String user, ContainerId containerId, super(user, containerId, resource); } + @Private + @Unstable + public ContainerTerminationContext(String user, ContainerId containerId, + Resource resource, ContainerType containerType) { + super(user, containerId, resource, containerType); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerType.java new file mode 100644 index 0000000000..ffae811226 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerType.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api; + +/** + * Container property encoding allocation and execution semantics. + * + *

+ * The container types are the following: + *

+ *

+ */ +public enum ContainerType { + APPLICATION_MASTER, TASK +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index f8014093d1..33cc2551bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -264,6 +264,11 @@ message NodeLabelProto { optional bool isExclusive = 2 [default = true]; } +enum ContainerTypeProto { + APPLICATION_MASTER = 1; + TASK = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From AM_RM_Protocol ///////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 4e8a19c65d..e742f4c402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -53,7 +53,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; +import org.apache.hadoop.yarn.server.api.ContainerType; import com.google.protobuf.ByteString; @@ -270,4 +272,14 @@ public static LogAggregationStatus convertFromProtoFormat( return LogAggregationStatus.valueOf(e.name().replace( LOG_AGGREGATION_STATUS_PREFIX, "")); } + + /* + * ContainerType + */ + public static ContainerTypeProto convertToProtoFormat(ContainerType e) { + return ContainerTypeProto.valueOf(e.name()); + } + public static ContainerType convertFromProtoFormat(ContainerTypeProto e) { + return ContainerType.valueOf(e.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 9a60d018d0..106e6d5d7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -39,13 +39,15 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; +import org.apache.hadoop.yarn.server.api.ContainerType; import com.google.protobuf.TextFormat; - /** * TokenIdentifier for a container. Encodes {@link ContainerId}, * {@link Resource} needed by the container and the target NMs host-address. @@ -66,14 +68,24 @@ public ContainerTokenIdentifier(ContainerId containerID, int masterKeyId, long rmIdentifier, Priority priority, long creationTime) { this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime, null, - CommonNodeLabelsManager.NO_LABEL); + CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK); } public ContainerTokenIdentifier(ContainerId containerID, String hostName, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, long rmIdentifier, Priority priority, long creationTime, LogAggregationContext logAggregationContext, String nodeLabelExpression) { - ContainerTokenIdentifierProto.Builder builder = + this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, + rmIdentifier, priority, creationTime, logAggregationContext, + nodeLabelExpression, ContainerType.TASK); + } + + public ContainerTokenIdentifier(ContainerId containerID, String hostName, + String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, + long rmIdentifier, Priority priority, long creationTime, + LogAggregationContext logAggregationContext, String nodeLabelExpression, + ContainerType containerType) { + ContainerTokenIdentifierProto.Builder builder = ContainerTokenIdentifierProto.newBuilder(); if (containerID != null) { builder.setContainerId(((ContainerIdPBImpl)containerID).getProto()); @@ -99,7 +111,8 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName, if (nodeLabelExpression != null) { builder.setNodeLabelExpression(nodeLabelExpression); } - + builder.setContainerType(convertToProtoFormat(containerType)); + proto = builder.build(); } @@ -156,7 +169,18 @@ public long getCreationTime() { public long getRMIdentifier() { return proto.getRmIdentifier(); } - + + /** + * Get the ContainerType of container to allocate + * @return ContainerType + */ + public ContainerType getContainerType(){ + if (!proto.hasContainerType()) { + return null; + } + return convertFromProtoFormat(proto.getContainerType()); + } + public ContainerTokenIdentifierProto getProto() { return proto; } @@ -232,4 +256,13 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private ContainerTypeProto convertToProtoFormat(ContainerType containerType) { + return ProtoUtils.convertToProtoFormat(containerType); + } + + private ContainerType convertFromProtoFormat( + ContainerTypeProto containerType) { + return ProtoUtils.convertFromProtoFormat(containerType); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto index d1bef21657..339e99ecdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto @@ -50,6 +50,7 @@ message ContainerTokenIdentifierProto { optional int64 creationTime = 9; optional LogAggregationContextProto logAggregationContext = 10; optional string nodeLabelExpression = 11; + optional ContainerTypeProto containerType = 12; } message ClientToAMTokenIdentifierProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java index 5fe75bc12c..68f0b9df83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java @@ -33,10 +33,12 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.junit.Assert; import org.junit.Test; @@ -201,6 +203,12 @@ public void testContainerTokenIdentifier() throws IOException { anotherToken.getCreationTime(), creationTime); Assert.assertNull(anotherToken.getLogAggregationContext()); + + Assert.assertEquals(CommonNodeLabelsManager.NO_LABEL, + anotherToken.getNodeLabelExpression()); + + Assert.assertEquals(ContainerType.TASK, + anotherToken.getContainerType()); } @Test @@ -347,4 +355,49 @@ public void testParseTimelineDelegationTokenIdentifierRenewer() throws IOExcepti Assert.assertEquals(new Text("yarn"), token.getRenewer()); } + @Test + public void testAMContainerTokenIdentifier() throws IOException { + ContainerId containerID = ContainerId.newContainerId( + ApplicationAttemptId.newInstance(ApplicationId.newInstance( + 1, 1), 1), 1); + String hostName = "host0"; + String appSubmitter = "usr0"; + Resource r = Resource.newInstance(1024, 1); + long expiryTimeStamp = 1000; + int masterKeyId = 1; + long rmIdentifier = 1; + Priority priority = Priority.newInstance(1); + long creationTime = 1000; + + ContainerTokenIdentifier token = + new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r, + expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime, + null, CommonNodeLabelsManager.NO_LABEL, ContainerType.APPLICATION_MASTER); + + ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier(); + + byte[] tokenContent = token.getBytes(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(tokenContent, tokenContent.length); + anotherToken.readFields(dib); + + Assert.assertEquals(ContainerType.APPLICATION_MASTER, + anotherToken.getContainerType()); + + token = + new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r, + expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime, + null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK); + + anotherToken = new ContainerTokenIdentifier(); + + tokenContent = token.getBytes(); + dib = new DataInputBuffer(); + dib.reset(tokenContent, tokenContent.length); + anotherToken.readFields(dib); + + Assert.assertEquals(ContainerType.TASK, + anotherToken.getContainerType()); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index fb6f79b7c3..cd5ed88738 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -225,7 +225,8 @@ public void handle(AuxServicesEvent event) { try { serv.initializeContainer(new ContainerInitializationContext( event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); + event.getContainer().getResource(), event.getContainer() + .getContainerTokenIdentifier().getContainerType())); } catch (Throwable th) { logWarningWhenAuxServiceThrowExceptions(serv, AuxServicesEventType.CONTAINER_INIT, th); @@ -237,7 +238,8 @@ public void handle(AuxServicesEvent event) { try { serv.stopContainer(new ContainerTerminationContext( event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); + event.getContainer().getResource(), event.getContainer() + .getContainerTokenIdentifier().getContainerType())); } catch (Throwable th) { logWarningWhenAuxServiceThrowExceptions(serv, AuxServicesEventType.CONTAINER_STOP, th); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index dbc3cb5713..475f2c7fe3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; @@ -467,13 +467,26 @@ public List getNMTokenList() { .hasNext();) { RMContainer rmContainer = i.next(); Container container = rmContainer.getContainer(); + ContainerType containerType = ContainerType.TASK; + // The working knowledge is that masterContainer for AM is null as it + // itself is the master container. + RMAppAttempt appAttempt = + rmContext + .getRMApps() + .get( + container.getId().getApplicationAttemptId() + .getApplicationId()).getCurrentAppAttempt(); + if (appAttempt.getMasterContainer() == null + && appAttempt.getSubmissionContext().getUnmanagedAM() == false) { + containerType = ContainerType.APPLICATION_MASTER; + } try { // create container token and NMToken altogether. container.setContainerToken(rmContext.getContainerTokenSecretManager() .createContainerToken(container.getId(), container.getNodeId(), getUser(), container.getResource(), container.getPriority(), rmContainer.getCreationTime(), this.logAggregationContext, - rmContainer.getNodeLabelExpression())); + rmContainer.getNodeLabelExpression(), containerType)); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index e5b44a6a0d..598f279063 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -829,7 +829,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, RMAppAttempt rmAppAttempt = csContext.getRMContext().getRMApps() .get(application.getApplicationId()).getCurrentAppAttempt(); - if (null == rmAppAttempt.getMasterContainer()) { + if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false + && null == rmAppAttempt.getMasterContainer()) { if (LOG.isDebugEnabled()) { LOG.debug("Skip allocating AM container to app_attempt=" + application.getApplicationAttemptId() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 1c0533d937..6f00615378 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; @@ -166,7 +167,7 @@ public void run() { /** * Helper function for creating ContainerTokens - * + * * @param containerId * @param nodeId * @param appSubmitter @@ -179,12 +180,12 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime) { return createContainerToken(containerId, nodeId, appSubmitter, capability, - priority, createTime, null, null); + priority, createTime, null, null, ContainerType.TASK); } /** * Helper function for creating ContainerTokens - * + * * @param containerId * @param nodeId * @param appSubmitter @@ -192,12 +193,14 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, * @param priority * @param createTime * @param logAggregationContext + * @param nodeLabelExpression + * @param containerType * @return the container-token */ public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime, LogAggregationContext logAggregationContext, - String nodeLabelExpression) { + String nodeLabelExpression, ContainerType containerType) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -211,7 +214,7 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, appSubmitter, capability, expiryTimeStamp, this.currentMasterKey .getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp(), priority, createTime, - logAggregationContext, nodeLabelExpression); + logAggregationContext, nodeLabelExpression, containerType); password = this.createPassword(tokenIdentifier); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index 9b7eb840db..e62f7d7057 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.Task.State; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -168,6 +170,16 @@ public synchronized void submit() throws IOException, YarnException { resourceManager.getClientRMService().submitApplication(request); + RMAppEvent event = + new RMAppEvent(this.applicationId, RMAppEventType.START); + resourceManager.getRMContext().getRMApps().get(applicationId).handle(event); + event = + new RMAppEvent(this.applicationId, RMAppEventType.APP_NEW_SAVED); + resourceManager.getRMContext().getRMApps().get(applicationId).handle(event); + event = + new RMAppEvent(this.applicationId, RMAppEventType.APP_ACCEPTED); + resourceManager.getRMContext().getRMApps().get(applicationId).handle(event); + // Notify scheduler AppAddedSchedulerEvent addAppEvent = new AppAddedSchedulerEvent(this.applicationId, this.queue, "user"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 0ea993fdaf..6183bf675b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -286,11 +287,11 @@ protected RMContainerTokenSecretManager createContainerTokenSecretManager( public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime, - LogAggregationContext logAggregationContext, String nodeLabelExp) { + LogAggregationContext logAggregationContext, String nodeLabelExp, ContainerType containerType) { numRetries++; return super.createContainerToken(containerId, nodeId, appSubmitter, capability, priority, createTime, logAggregationContext, - nodeLabelExp); + nodeLabelExp, containerType); } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 23b708add2..403c8ea313 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -152,6 +155,11 @@ protected ApplicationAttemptId createSchedulingRequest( when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id, resourceManager.getRMContext())); + ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + when(submissionContext.getUnmanagedAM()).thenReturn(false); + when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); + Container container = mock(Container.class); + when(rmAppAttempt.getMasterContainer()).thenReturn(container); resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); @@ -175,6 +183,9 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId, when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id,resourceManager.getRMContext())); + ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + when(submissionContext.getUnmanagedAM()).thenReturn(false); + when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); @@ -206,13 +217,20 @@ protected void createSchedulingRequestExistingApplication( protected void createApplicationWithAMResource(ApplicationAttemptId attId, String queue, String user, Resource amResource) { RMContext rmContext = resourceManager.getRMContext(); - RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf, - null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, amResource, null), null, null, + ApplicationId appId = attId.getApplicationId(); + RMApp rmApp = new RMAppImpl(appId, rmContext, conf, + null, user, null, ApplicationSubmissionContext.newInstance(appId, null, + queue, null, null, false, false, 0, amResource, null), null, null, 0, null, null, null); - rmContext.getRMApps().put(attId.getApplicationId(), rmApp); + rmContext.getRMApps().put(appId, rmApp); + RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START); + resourceManager.getRMContext().getRMApps().get(appId).handle(event); + event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED); + resourceManager.getRMContext().getRMApps().get(appId).handle(event); + event = new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED); + resourceManager.getRMContext().getRMApps().get(appId).handle(event); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( - attId.getApplicationId(), queue, user); + appId, queue, user); scheduler.handle(appAddedEvent); AppAttemptAddedSchedulerEvent attempAddedEvent = new AppAttemptAddedSchedulerEvent(attId, false); @@ -227,6 +245,9 @@ protected RMApp createMockRMApp(ApplicationAttemptId attemptId) { RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class); when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); when(app.getCurrentAppAttempt()).thenReturn(attempt); + ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + when(submissionContext.getUnmanagedAM()).thenReturn(false); + when(attempt.getSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(attemptId.getApplicationId(), app); return app; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index e4583d1d49..a454801ca3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -1184,6 +1185,9 @@ private RMAppImpl createMockRMApp(ApplicationAttemptId attemptId, RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class); when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); when(app.getCurrentAppAttempt()).thenReturn(attempt); + ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + when(submissionContext.getUnmanagedAM()).thenReturn(false); + when(attempt.getSubmissionContext()).thenReturn(submissionContext); context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app); return app; }