YARN-7696. Add container tags to ContainerTokenIdentifier, api.Container and NMContainerStatus to handle all recovery cases. (asuresh)

This commit is contained in:
Arun Suresh 2018-01-12 14:37:06 -08:00
parent 4eda58c136
commit a5c1fc881e
14 changed files with 194 additions and 32 deletions

View File

@ -27,6 +27,9 @@
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.util.Records;
import java.util.Collections;
import java.util.Set;
/**
* {@code Container} represents an allocated resource in the cluster.
* <p>
@ -256,4 +259,16 @@ public int getVersion() {
public void setVersion(int version) {
throw new UnsupportedOperationException();
}
@Private
@Unstable
public Set<String> getAllocationTags() {
return Collections.EMPTY_SET;
}
@Private
@Unstable
public void setAllocationTags(Set<String> allocationTags) {
}
}

View File

@ -152,6 +152,7 @@ message ContainerProto {
optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED];
optional int64 allocation_request_id = 8 [default = -1];
optional int32 version = 9 [default = 0];
repeated string allocation_tags = 10;
}
message ContainerReportProto {

View File

@ -36,6 +36,9 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import java.util.HashSet;
import java.util.Set;
@Private
@Unstable
public class ContainerPBImpl extends Container {
@ -49,6 +52,7 @@ public class ContainerPBImpl extends Container {
private Resource resource = null;
private Priority priority = null;
private Token containerToken = null;
private Set<String> allocationTags = null;
public ContainerPBImpl() {
builder = ContainerProto.newBuilder();
@ -106,6 +110,10 @@ private void mergeLocalToBuilder() {
builder.getContainerToken())) {
builder.setContainerToken(convertToProtoFormat(this.containerToken));
}
if (this.allocationTags != null) {
builder.clearAllocationTags();
builder.addAllAllocationTags(this.allocationTags);
}
}
private void mergeLocalToProto() {
@ -284,6 +292,29 @@ public void setVersion(int version) {
builder.setVersion(version);
}
private void initAllocationTags() {
if (this.allocationTags != null) {
return;
}
ContainerProtoOrBuilder p = viaProto ? proto : builder;
this.allocationTags = new HashSet<>();
this.allocationTags.addAll(p.getAllocationTagsList());
}
@Override
public Set<String> getAllocationTags() {
initAllocationTags();
return this.allocationTags;
}
@Override
public void setAllocationTags(Set<String> allocationTags) {
maybeInitBuilder();
builder.clearAllocationTags();
this.allocationTags = allocationTags;
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}

View File

@ -22,6 +22,9 @@
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -115,7 +118,7 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName,
this(containerID, 0, hostName, appSubmitter, r, expiryTimeStamp,
masterKeyId, rmIdentifier, priority, creationTime,
logAggregationContext, nodeLabelExpression, containerType,
ExecutionType.GUARANTEED, -1);
ExecutionType.GUARANTEED, -1, null);
}
public ContainerTokenIdentifier(ContainerId containerID, int containerVersion,
@ -127,15 +130,66 @@ public ContainerTokenIdentifier(ContainerId containerID, int containerVersion,
this(containerID, containerVersion, hostName, appSubmitter, r,
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
logAggregationContext, nodeLabelExpression, containerType,
executionType, -1);
executionType, -1, null);
}
/**
* Convenience Constructor for existing clients.
*
* @param containerID containerID
* @param containerVersion containerVersion
* @param hostName hostName
* @param appSubmitter appSubmitter
* @param r resource
* @param expiryTimeStamp expiryTimeStamp
* @param masterKeyId masterKeyId
* @param rmIdentifier rmIdentifier
* @param priority priority
* @param creationTime creationTime
* @param logAggregationContext logAggregationContext
* @param nodeLabelExpression nodeLabelExpression
* @param containerType containerType
* @param executionType executionType
* @param allocationRequestId allocationRequestId
*/
public ContainerTokenIdentifier(ContainerId containerID, int containerVersion,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType, ExecutionType executionType,
long allocationRequestId) {
this(containerID, containerVersion, hostName, appSubmitter, r,
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
logAggregationContext, nodeLabelExpression, containerType,
executionType, allocationRequestId, null);
}
/**
* Create a Container Token Identifier.
*
* @param containerID containerID
* @param containerVersion containerVersion
* @param hostName hostName
* @param appSubmitter appSubmitter
* @param r resource
* @param expiryTimeStamp expiryTimeStamp
* @param masterKeyId masterKeyId
* @param rmIdentifier rmIdentifier
* @param priority priority
* @param creationTime creationTime
* @param logAggregationContext logAggregationContext
* @param nodeLabelExpression nodeLabelExpression
* @param containerType containerType
* @param executionType executionType
* @param allocationRequestId allocationRequestId
* @param allocationTags Set of allocation Tags.
*/
public ContainerTokenIdentifier(ContainerId containerID, int containerVersion,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType, ExecutionType executionType,
long allocationRequestId, Set<String> allocationTags) {
ContainerTokenIdentifierProto.Builder builder =
ContainerTokenIdentifierProto.newBuilder();
if (containerID != null) {
@ -166,7 +220,9 @@ public ContainerTokenIdentifier(ContainerId containerID, int containerVersion,
builder.setContainerType(convertToProtoFormat(containerType));
builder.setExecutionType(convertToProtoFormat(executionType));
builder.setAllocationRequestId(allocationRequestId);
if (allocationTags != null) {
builder.addAllAllocationTags(allocationTags);
}
proto = builder.build();
}
@ -308,6 +364,13 @@ public String getNodeLabelExpression() {
return CommonNodeLabelsManager.NO_LABEL;
}
public Set<String> getAllcationTags() {
if (proto.getAllocationTagsList() != null) {
return new HashSet<>(proto.getAllocationTagsList());
}
return Collections.EMPTY_SET;
}
// TODO: Needed?
@InterfaceAudience.Private
public static class Renewer extends Token.TrivialRenewer {

View File

@ -54,6 +54,7 @@ message ContainerTokenIdentifierProto {
optional ExecutionTypeProto executionType = 13 [default = GUARANTEED];
optional int32 version = 14 [default = 0];
optional int64 allocation_request_id = 15 [default = -1];
repeated string allocation_tags = 16;
}
message ClientToAMTokenIdentifierProto {

View File

@ -27,6 +27,9 @@
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.util.Records;
import java.util.Collections;
import java.util.Set;
/**
* NMContainerStatus includes the current information of a container. This
* record is used by YARN only, whereas {@link ContainerStatus} is used both
@ -161,4 +164,15 @@ public ExecutionType getExecutionType() {
}
public void setExecutionType(ExecutionType executionType) { }
/**
* Get and set the Allocation tags associated with the container.
*/
public Set<String> getAllocationTags() {
return Collections.EMPTY_SET;
}
public void setAllocationTags(Set<String> allocationTags) {
}
}

View File

@ -37,6 +37,9 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import java.util.HashSet;
import java.util.Set;
public class NMContainerStatusPBImpl extends NMContainerStatus {
NMContainerStatusProto proto = NMContainerStatusProto
@ -47,6 +50,7 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
private ContainerId containerId = null;
private Resource resource = null;
private Priority priority = null;
private Set<String> allocationTags = null;
public NMContainerStatusPBImpl() {
builder = NMContainerStatusProto.newBuilder();
@ -91,8 +95,11 @@ public String toString() {
.append("Diagnostics: ").append(getDiagnostics()).append(", ")
.append("ExitStatus: ").append(getContainerExitStatus()).append(", ")
.append("NodeLabelExpression: ").append(getNodeLabelExpression())
.append(", ")
.append("Priority: ").append(getPriority()).append(", ")
.append("AllocationRequestId: ").append(getAllocationRequestId())
.append(", ")
.append("AllocationTags: ").append(getAllocationTags()).append(", ")
.append("]");
return sb.toString();
}
@ -283,6 +290,28 @@ public void setAllocationRequestId(long allocationRequestId) {
builder.setAllocationRequestId(allocationRequestId);
}
private void initAllocationTags() {
if (this.allocationTags != null) {
return;
}
NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
this.allocationTags = new HashSet<>();
this.allocationTags.addAll(p.getAllocationTagsList());
}
@Override
public Set<String> getAllocationTags() {
initAllocationTags();
return this.allocationTags;
}
@Override
public void setAllocationTags(Set<String> allocationTags) {
maybeInitBuilder();
builder.clearAllocationTags();
this.allocationTags = allocationTags;
}
private void mergeLocalToBuilder() {
if (this.containerId != null
&& !((ContainerIdPBImpl) containerId).getProto().equals(
@ -297,6 +326,10 @@ private void mergeLocalToBuilder() {
if (this.priority != null) {
builder.setPriority(convertToProtoFormat(this.priority));
}
if (this.allocationTags != null) {
builder.clearAllocationTags();
builder.addAllAllocationTags(this.allocationTags);
}
}
private void mergeLocalToProto() {

View File

@ -177,6 +177,7 @@ message NMContainerStatusProto {
optional int32 version = 9;
optional ExecutionTypeProto executionType = 10 [default = GUARANTEED];
optional int64 allocation_request_id = 11 [default = -1];
repeated string allocation_tags = 12;
}
message SCMUploaderNotifyRequestProto {

View File

@ -451,7 +451,8 @@ private void recoverContainer(RecoveredContainerState rcs)
originalToken.getLogAggregationContext(),
originalToken.getNodeLabelExpression(),
originalToken.getContainerType(), originalToken.getExecutionType(),
originalToken.getAllocationRequestId());
originalToken.getAllocationRequestId(),
originalToken.getAllcationTags());
} else {
token = BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());

View File

@ -849,7 +849,8 @@ public ContainerStatus cloneAndGetContainerStatus() {
public NMContainerStatus getNMContainerStatus() {
this.readLock.lock();
try {
return NMContainerStatus.newInstance(this.containerId,
NMContainerStatus status =
NMContainerStatus.newInstance(this.containerId,
this.version, getCurrentState(), getResource(),
diagnostics.toString(), exitCode,
containerTokenIdentifier.getPriority(),
@ -857,6 +858,8 @@ this.version, getCurrentState(), getResource(),
containerTokenIdentifier.getNodeLabelExpression(),
containerTokenIdentifier.getExecutionType(),
containerTokenIdentifier.getAllocationRequestId());
status.setAllocationTags(containerTokenIdentifier.getAllcationTags());
return status;
} finally {
this.readLock.unlock();
}

View File

@ -530,12 +530,18 @@ public RMContainerState transition(RMContainerImpl container,
RMContainerEvent event) {
NMContainerStatus report =
((RMContainerRecoverEvent) event).getContainerReport();
// Set the allocation tags from the
container.setAllocationTags(report.getAllocationTags());
// Notify AllocationTagsManager
container.rmContext.getAllocationTagsManager().addContainer(
container.getNodeId(), container.getContainerId(),
container.getAllocationTags());
if (report.getContainerState().equals(ContainerState.COMPLETE)) {
ContainerStatus status =
ContainerStatus.newInstance(report.getContainerId(),
report.getContainerState(), report.getDiagnostics(),
report.getContainerExitStatus());
new FinishedTransition().transition(container,
new RMContainerFinishedEvent(container.getContainerId(), status,
RMContainerEventType.FINISHED));
@ -577,7 +583,7 @@ private static final class ContainerStartedTransition extends
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Notify placementManager
// Notify AllocationTagsManager
container.rmContext.getAllocationTagsManager().addContainer(
container.getNodeId(), container.getContainerId(),
container.getAllocationTags());

View File

@ -670,7 +670,8 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
container.getPriority(), rmContainer.getCreationTime(),
this.logAggregationContext, rmContainer.getNodeLabelExpression(),
containerType, container.getExecutionType(),
container.getAllocationRequestId()));
container.getAllocationRequestId(),
rmContainer.getAllocationTags()));
updateNMToken(container);
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -166,25 +168,14 @@ public void run() {
}
}
/**
* Helper function for creating ContainerTokens.
*
* @param containerId Container Id
* @param containerVersion Container Version
* @param nodeId Node Id
* @param appSubmitter App Submitter
* @param capability Capability
* @param priority Priority
* @param createTime Create Time
* @return the container-token
*/
@VisibleForTesting
public Token createContainerToken(ContainerId containerId,
int containerVersion, NodeId nodeId, String appSubmitter,
Resource capability, Priority priority, long createTime) {
return createContainerToken(containerId, containerVersion, nodeId,
appSubmitter, capability, priority, createTime,
null, null, ContainerType.TASK,
ExecutionType.GUARANTEED, -1);
ExecutionType.GUARANTEED, -1, null);
}
/**
@ -209,7 +200,7 @@ public Token createContainerToken(ContainerId containerId,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType, ExecutionType execType,
long allocationRequestId) {
long allocationRequestId, Set<String> allocationTags) {
byte[] password;
ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp =
@ -224,7 +215,7 @@ public Token createContainerToken(ContainerId containerId,
this.currentMasterKey.getMasterKey().getKeyId(),
ResourceManager.getClusterTimeStamp(), priority, createTime,
logAggregationContext, nodeLabelExpression, containerType,
execType, allocationRequestId);
execType, allocationRequestId, allocationTags);
password = this.createPassword(tokenIdentifier);
} finally {

View File

@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -296,12 +297,12 @@ public Token createContainerToken(ContainerId containerId,
Resource capability, Priority priority, long createTime,
LogAggregationContext logAggregationContext, String nodeLabelExp,
ContainerType containerType, ExecutionType executionType,
long allocationRequestId) {
long allocationRequestId, Set<String> allocationTags) {
numRetries++;
return super.createContainerToken(containerId, containerVersion,
nodeId, appSubmitter, capability, priority, createTime,
logAggregationContext, nodeLabelExp, containerType,
executionType, allocationRequestId);
executionType, allocationRequestId, allocationTags);
}
};
}