From 1b89a3e173f8e905074ed6714a7be5c003c0e2c4 Mon Sep 17 00:00:00 2001 From: Jian He Date: Wed, 15 Apr 2015 13:57:06 -0700 Subject: [PATCH] YARN-3354. Add node label expression in ContainerTokenIdentifier to support RM recovery. Contributed by Wangda Tan --- hadoop-yarn-project/CHANGES.txt | 3 + .../security/ContainerTokenIdentifier.java | 21 +- .../proto/server/yarn_security_token.proto | 1 + .../protocolrecords/NMContainerStatus.java | 22 +- .../impl/pb/NMContainerStatusPBImpl.java | 21 +- .../yarn_server_common_service_protos.proto | 1 + .../container/ContainerImpl.java | 7 +- .../TestContainerManager.java | 2 +- .../rmcontainer/RMContainer.java | 2 + .../rmcontainer/RMContainerImpl.java | 26 +- .../scheduler/AbstractYarnScheduler.java | 2 +- .../SchedulerApplicationAttempt.java | 13 +- .../common/fica/FiCaSchedulerApp.java | 7 +- .../RMContainerTokenSecretManager.java | 7 +- .../server/resourcemanager/TestRMRestart.java | 13 +- .../capacity/TestContainerAllocation.java | 5 +- ...stWorkPreservingRMRestartForNodeLabel.java | 282 ++++++++++++++++++ 17 files changed, 408 insertions(+), 27 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7ec4b50377..ecbdd3c9e9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -81,6 +81,9 @@ Release 2.8.0 - UNRELEASED YARN-3326. Support RESTful API for getLabelsToNodes. (Naganarasimha G R via ozawa) + YARN-3354. Add node label expression in ContainerTokenIdentifier to support + RM recovery. (Wangda Tan via jianhe) + IMPROVEMENTS YARN-1880. Cleanup TestApplicationClientProtocolOnHA diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 593bfc37ae..9a60d018d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; import com.google.protobuf.TextFormat; @@ -64,13 +65,14 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, long rmIdentifier, Priority priority, long creationTime) { this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, - rmIdentifier, priority, creationTime, null); + rmIdentifier, priority, creationTime, null, + CommonNodeLabelsManager.NO_LABEL); } public ContainerTokenIdentifier(ContainerId containerID, String hostName, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, long rmIdentifier, Priority priority, long creationTime, - LogAggregationContext logAggregationContext) { + LogAggregationContext logAggregationContext, String nodeLabelExpression) { ContainerTokenIdentifierProto.Builder builder = ContainerTokenIdentifierProto.newBuilder(); if (containerID != null) { @@ -93,6 +95,11 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName, builder.setLogAggregationContext( ((LogAggregationContextPBImpl)logAggregationContext).getProto()); } + + if (nodeLabelExpression != null) { + builder.setNodeLabelExpression(nodeLabelExpression); + } + proto = builder.build(); } @@ -186,6 +193,16 @@ public UserGroupInformation getUser() { return UserGroupInformation.createRemoteUser( containerId); } + + /** + * Get the node-label-expression in the original ResourceRequest + */ + public String getNodeLabelExpression() { + if (proto.hasNodeLabelExpression()) { + return proto.getNodeLabelExpression(); + } + return CommonNodeLabelsManager.NO_LABEL; + } // TODO: Needed? @InterfaceAudience.Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto index 317032d56b..d1bef21657 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto @@ -49,6 +49,7 @@ message ContainerTokenIdentifierProto { optional PriorityProto priority = 8; optional int64 creationTime = 9; optional LogAggregationContextProto logAggregationContext = 10; + optional string nodeLabelExpression = 11; } message ClientToAMTokenIdentifierProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java index 2f8f92de45..4067c114be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.util.Records; /** @@ -31,11 +32,21 @@ * inside YARN and by end-users. */ public abstract class NMContainerStatus { - + + // Used by tests only public static NMContainerStatus newInstance(ContainerId containerId, ContainerState containerState, Resource allocatedResource, String diagnostics, int containerExitStatus, Priority priority, long creationTime) { + return newInstance(containerId, containerState, allocatedResource, + diagnostics, containerExitStatus, priority, creationTime, + CommonNodeLabelsManager.NO_LABEL); + } + + public static NMContainerStatus newInstance(ContainerId containerId, + ContainerState containerState, Resource allocatedResource, + String diagnostics, int containerExitStatus, Priority priority, + long creationTime, String nodeLabelExpression) { NMContainerStatus status = Records.newRecord(NMContainerStatus.class); status.setContainerId(containerId); @@ -45,6 +56,7 @@ public static NMContainerStatus newInstance(ContainerId containerId, status.setContainerExitStatus(containerExitStatus); status.setPriority(priority); status.setCreationTime(creationTime); + status.setNodeLabelExpression(nodeLabelExpression); return status; } @@ -105,4 +117,12 @@ public static NMContainerStatus newInstance(ContainerId containerId, public abstract long getCreationTime(); public abstract void setCreationTime(long creationTime); + + /** + * Get the node-label-expression in the original ResourceRequest + */ + public abstract String getNodeLabelExpression(); + + public abstract void setNodeLabelExpression( + String nodeLabelExpression); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java index 86e1d971aa..624b89bffb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; @@ -207,6 +208,25 @@ public void setCreationTime(long creationTime) { maybeInitBuilder(); builder.setCreationTime(creationTime); } + + @Override + public String getNodeLabelExpression() { + NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder; + if (p.hasNodeLabelExpression()) { + return p.getNodeLabelExpression(); + } + return CommonNodeLabelsManager.NO_LABEL; + } + + @Override + public void setNodeLabelExpression(String nodeLabelExpression) { + maybeInitBuilder(); + if (nodeLabelExpression == null) { + builder.clearNodeLabelExpression(); + return; + } + builder.setNodeLabelExpression(nodeLabelExpression); + } private void mergeLocalToBuilder() { if (this.containerId != null @@ -274,5 +294,4 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 3103582195..d34c9f7e51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -92,6 +92,7 @@ message NMContainerStatusProto { optional string diagnostics = 5 [default = "N/A"]; optional int32 container_exit_status = 6; optional int64 creation_time = 7; + optional string nodeLabelExpression = 8; } message SCMUploaderNotifyRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 131d439b7a..c9874a63bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -432,9 +432,10 @@ public NMContainerStatus getNMContainerStatus() { this.readLock.lock(); try { return NMContainerStatus.newInstance(this.containerId, getCurrentState(), - getResource(), diagnostics.toString(), exitCode, - containerTokenIdentifier.getPriority(), - containerTokenIdentifier.getCreationTime()); + getResource(), diagnostics.toString(), exitCode, + containerTokenIdentifier.getPriority(), + containerTokenIdentifier.getCreationTime(), + containerTokenIdentifier.getNodeLabelExpression()); } finally { this.readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 86cc4dcede..34495a2427 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -809,7 +809,7 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier(cId, nodeId.toString(), user, r, System.currentTimeMillis() + 100000L, 123, rmIdentifier, - Priority.newInstance(0), 0, logAggregationContext); + Priority.newInstance(0), 0, logAggregationContext, null); Token containerToken = BuilderUtils .newContainerToken(nodeId, containerTokenSecretManager diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index 20087f5fc0..21d79ee9d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -80,4 +80,6 @@ public interface RMContainer extends EventHandler { List getResourceRequests(); String getNodeHttpAddress(); + + String getNodeLabelExpression(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 38a03aef14..2750d4e4af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; @@ -153,6 +154,7 @@ RMContainerEventType.RELEASED, new KillTransition()) private final EventHandler eventHandler; private final ContainerAllocationExpirer containerAllocationExpirer; private final String user; + private final String nodeLabelExpression; private Resource reservedResource; private NodeId reservedNode; @@ -162,17 +164,24 @@ RMContainerEventType.RELEASED, new KillTransition()) private ContainerStatus finishedStatus; private boolean isAMContainer; private List resourceRequests; - + public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { this(container, appAttemptId, nodeId, user, rmContext, System - .currentTimeMillis()); + .currentTimeMillis(), ""); } public RMContainerImpl(Container container, - ApplicationAttemptId appAttemptId, NodeId nodeId, - String user, RMContext rmContext, long creationTime) { + ApplicationAttemptId appAttemptId, NodeId nodeId, String user, + RMContext rmContext, String nodeLabelExpression) { + this(container, appAttemptId, nodeId, user, rmContext, System + .currentTimeMillis(), nodeLabelExpression); + } + + public RMContainerImpl(Container container, + ApplicationAttemptId appAttemptId, NodeId nodeId, String user, + RMContext rmContext, long creationTime, String nodeLabelExpression) { this.stateMachine = stateMachineFactory.make(this); this.containerId = container.getId(); this.nodeId = nodeId; @@ -185,6 +194,7 @@ public RMContainerImpl(Container container, this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); this.isAMContainer = false; this.resourceRequests = null; + this.nodeLabelExpression = nodeLabelExpression; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); @@ -597,4 +607,12 @@ public String getNodeHttpAddress() { readLock.unlock(); } } + + @Override + public String getNodeLabelExpression() { + if (nodeLabelExpression == null) { + return RMNodeLabelsManager.NO_LABEL; + } + return nodeLabelExpression; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index e1f94cf40c..6699b05588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -408,7 +408,7 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, RMContainer rmContainer = new RMContainerImpl(container, attemptId, node.getNodeID(), applications.get(attemptId.getApplicationId()).getUser(), rmContext, - status.getCreationTime()); + status.getCreationTime(), status.getNodeLabelExpression()); return rmContainer; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fccf7661a2..48233903bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -466,9 +467,10 @@ public List getNMTokenList() { try { // create container token and NMToken altogether. container.setContainerToken(rmContext.getContainerTokenSecretManager() - .createContainerToken(container.getId(), container.getNodeId(), - getUser(), container.getResource(), container.getPriority(), - rmContainer.getCreationTime(), this.logAggregationContext)); + .createContainerToken(container.getId(), container.getNodeId(), + getUser(), container.getResource(), container.getPriority(), + rmContainer.getCreationTime(), this.logAggregationContext, + rmContainer.getNodeLabelExpression())); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); @@ -703,4 +705,9 @@ public boolean hasPendingResourceRequest(ResourceCalculator rc, this.attemptResourceUsage, nodePartition, cluster, schedulingMode); } + + @VisibleForTesting + public ResourceUsage getAppAttemptResourceUsage() { + return this.attemptResourceUsage; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index e0413895d4..3085d93ab2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -146,9 +146,10 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, } // Create RMContainer - RMContainer rmContainer = new RMContainerImpl(container, this - .getApplicationAttemptId(), node.getNodeID(), - appSchedulingInfo.getUser(), this.rmContext); + RMContainer rmContainer = + new RMContainerImpl(container, this.getApplicationAttemptId(), + node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, + request.getNodeLabelExpression()); // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 1595d17926..1c0533d937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -179,7 +179,7 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime) { return createContainerToken(containerId, nodeId, appSubmitter, capability, - priority, createTime, null); + priority, createTime, null, null); } /** @@ -196,7 +196,8 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, */ public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, - long createTime, LogAggregationContext logAggregationContext) { + long createTime, LogAggregationContext logAggregationContext, + String nodeLabelExpression) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -210,7 +211,7 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, appSubmitter, capability, expiryTimeStamp, this.currentMasterKey .getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp(), priority, createTime, - logAggregationContext); + logAggregationContext, nodeLabelExpression); password = this.createPassword(tokenIdentifier); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index a0b67f6be9..06a883042e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1984,14 +1984,21 @@ private void writeToHostsFile(String... hosts) throws IOException { } } } - + public static NMContainerStatus createNMContainerStatus( ApplicationAttemptId appAttemptId, int id, ContainerState containerState) { + return createNMContainerStatus(appAttemptId, id, containerState, + RMNodeLabelsManager.NO_LABEL); + } + + public static NMContainerStatus createNMContainerStatus( + ApplicationAttemptId appAttemptId, int id, ContainerState containerState, + String nodeLabelExpression) { ContainerId containerId = ContainerId.newContainerId(appAttemptId, id); NMContainerStatus containerReport = NMContainerStatus.newInstance(containerId, containerState, - Resource.newInstance(1024, 1), "recover container", 0, - Priority.newInstance(0), 0); + Resource.newInstance(1024, 1), "recover container", 0, + Priority.newInstance(0), 0, nodeLabelExpression); return containerReport; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 54ba61724f..47398e3544 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -293,10 +293,11 @@ protected RMContainerTokenSecretManager createContainerTokenSecretManager( public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime, - LogAggregationContext logAggregationContext) { + LogAggregationContext logAggregationContext, String nodeLabelExp) { numRetries++; return super.createContainerToken(containerId, nodeId, appSubmitter, - capability, priority, createTime, logAggregationContext); + capability, priority, createTime, logAggregationContext, + nodeLabelExp); } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java new file mode 100644 index 0000000000..fc9e14ac09 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TestWorkPreservingRMRestartForNodeLabel { + private Configuration conf; + private static final int GB = 1024; // 1024 MB + + RMNodeLabelsManager mgr; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + } + + @SuppressWarnings("unchecked") + private Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + private void checkRMContainerLabelExpression(ContainerId containerId, + MockRM rm, String labelExpression) { + RMContainer container = + rm.getRMContext().getScheduler().getRMContainer(containerId); + Assert.assertNotNull("Cannot find RMContainer=" + containerId, container); + Assert.assertEquals(labelExpression, + container.getNodeLabelExpression()); + } + + @SuppressWarnings("rawtypes") + public static void waitForNumContainersToRecover(int num, MockRM rm, + ApplicationAttemptId attemptId) throws Exception { + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + SchedulerApplicationAttempt attempt = + scheduler.getApplicationAttempt(attemptId); + while (attempt == null) { + System.out.println("Wait for scheduler attempt " + attemptId + + " to be created"); + Thread.sleep(200); + attempt = scheduler.getApplicationAttempt(attemptId); + } + while (attempt.getLiveContainers().size() < num) { + System.out.println("Wait for " + num + + " containers to recover. currently: " + + attempt.getLiveContainers().size()); + Thread.sleep(200); + } + } + + private void checkAppResourceUsage(String partition, ApplicationId appId, + MockRM rm, int expectedMemUsage) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + FiCaSchedulerApp app = + cs.getSchedulerApplications().get(appId).getCurrentAppAttempt(); + Assert.assertEquals(expectedMemUsage, app.getAppAttemptResourceUsage() + .getUsed(partition).getMemory()); + } + + private void checkQueueResourceUsage(String partition, String queueName, MockRM rm, int expectedMemUsage) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = cs.getQueue(queueName); + Assert.assertEquals(expectedMemUsage, queue.getQueueResourceUsage() + .getUsed(partition).getMemory()); + } + + @Test + public void testWorkPreservingRestartForNodeLabel() throws Exception { + // This test is pretty much similar to testContainerAllocateWithLabel. + // Difference is, this test doesn't specify label expression in ResourceRequest, + // instead, it uses default queue label expression + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("y"))); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + conf = TestUtils.getConfigurationWithDefaultQueueLabels(conf); + + // inject node label manager + MockRM rm1 = + new MockRM(conf, + memStore) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y + MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = + + ContainerId containerId; + + // launch an app to queue a1 (label = x), and check all container will + // be allocated in h1 + RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // request a container. + am1.allocate("*", 1024, 1, new ArrayList()); + containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm1.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am1.getApplicationAttemptId(), 1), rm1, "x"); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am1.getApplicationAttemptId(), 2), rm1, "x"); + + // launch an app to queue b1 (label = y), and check all container will + // be allocated in h2 + RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // request a container. + am2.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); + Assert.assertTrue(rm1.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1), rm1, "y"); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am2.getApplicationAttemptId(), 2), rm1, "y"); + + // launch an app to queue c1 (label = ""), and check all container will + // be allocated in h3 + RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); + + // request a container. + am3.allocate("*", 1024, 1, new ArrayList()); + containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); + Assert.assertTrue(rm1.waitForState(nm3, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am3.getApplicationAttemptId(), 1), rm1, ""); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am3.getApplicationAttemptId(), 2), rm1, ""); + + // Re-start RM + mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), + NodeId.newInstance("h2", 0), toSet("y"))); + MockRM rm2 = + new MockRM(conf, + memStore) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + nm3.setResourceTrackerService(rm2.getResourceTrackerService()); + + // recover app + NMContainerStatus app1c1 = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1, + ContainerState.RUNNING, "x"); + NMContainerStatus app1c2 = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING, "x"); + nm1.registerNode(Arrays.asList(app1c1, app1c2), null); + waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am1.getApplicationAttemptId(), 1), rm1, "x"); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am1.getApplicationAttemptId(), 2), rm1, "x"); + + NMContainerStatus app2c1 = + TestRMRestart.createNMContainerStatus(am2.getApplicationAttemptId(), 1, + ContainerState.RUNNING, "y"); + NMContainerStatus app2c2 = + TestRMRestart.createNMContainerStatus(am2.getApplicationAttemptId(), 2, + ContainerState.RUNNING, "y"); + nm2.registerNode(Arrays.asList(app2c1, app2c2), null); + waitForNumContainersToRecover(2, rm2, am2.getApplicationAttemptId()); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am2.getApplicationAttemptId(), 1), rm1, "y"); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am2.getApplicationAttemptId(), 2), rm1, "y"); + + NMContainerStatus app3c1 = + TestRMRestart.createNMContainerStatus(am3.getApplicationAttemptId(), 1, + ContainerState.RUNNING, ""); + NMContainerStatus app3c2 = + TestRMRestart.createNMContainerStatus(am3.getApplicationAttemptId(), 2, + ContainerState.RUNNING, ""); + nm3.registerNode(Arrays.asList(app3c1, app3c2), null); + waitForNumContainersToRecover(2, rm2, am3.getApplicationAttemptId()); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am3.getApplicationAttemptId(), 1), rm1, ""); + checkRMContainerLabelExpression(ContainerId.newContainerId( + am3.getApplicationAttemptId(), 2), rm1, ""); + + // Check recovered resource usage + checkAppResourceUsage("x", app1.getApplicationId(), rm1, 2 * GB); + checkAppResourceUsage("y", app2.getApplicationId(), rm1, 2 * GB); + checkAppResourceUsage("", app3.getApplicationId(), rm1, 2 * GB); + checkQueueResourceUsage("x", "a1", rm1, 2 * GB); + checkQueueResourceUsage("y", "b1", rm1, 2 * GB); + checkQueueResourceUsage("", "c1", rm1, 2 * GB); + checkQueueResourceUsage("x", "a", rm1, 2 * GB); + checkQueueResourceUsage("y", "b", rm1, 2 * GB); + checkQueueResourceUsage("", "c", rm1, 2 * GB); + checkQueueResourceUsage("x", "root", rm1, 2 * GB); + checkQueueResourceUsage("y", "root", rm1, 2 * GB); + checkQueueResourceUsage("", "root", rm1, 2 * GB); + + + rm1.close(); + rm2.close(); + } +} \ No newline at end of file