YARN-2882. Add an OPPORTUNISTIC ExecutionType. (Konstantinos Karanasos and Inigo Goiri via kasha)

This commit is contained in:
Karthik Kambatla 2015-12-26 20:22:16 -08:00
parent 8cfd672397
commit fb00794368
10 changed files with 187 additions and 3 deletions

View File

@ -6,6 +6,9 @@ Trunk - Unreleased
NEW FEATURES NEW FEATURES
YARN-2882. Add an OPPORTUNISTIC ExecutionType.
(Konstantinos Karanasos and Inigo Goiri via kasha)
IMPROVEMENTS IMPROVEMENTS
YARN-2438. yarn-env.sh cleanup (aw) YARN-2438. yarn-env.sh cleanup (aw)

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -31,6 +32,7 @@
* It provides details such as: * It provides details such as:
* <ul> * <ul>
* <li>{@code ContainerId} of the container.</li> * <li>{@code ContainerId} of the container.</li>
* <li>{@code ExecutionType} of the container.</li>
* <li>{@code ContainerState} of the container.</li> * <li>{@code ContainerState} of the container.</li>
* <li><em>Exit status</em> of a completed container.</li> * <li><em>Exit status</em> of a completed container.</li>
* <li><em>Diagnostic</em> message for a failed container.</li> * <li><em>Diagnostic</em> message for a failed container.</li>
@ -45,7 +47,17 @@ public abstract class ContainerStatus {
@Unstable @Unstable
public static ContainerStatus newInstance(ContainerId containerId, public static ContainerStatus newInstance(ContainerId containerId,
ContainerState containerState, String diagnostics, int exitStatus) { ContainerState containerState, String diagnostics, int exitStatus) {
return newInstance(containerId, ExecutionType.GUARANTEED, containerState,
diagnostics, exitStatus);
}
@Private
@Unstable
public static ContainerStatus newInstance(ContainerId containerId,
ExecutionType executionType, ContainerState containerState,
String diagnostics, int exitStatus) {
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class); ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
containerStatus.setExecutionType(executionType);
containerStatus.setState(containerState); containerStatus.setState(containerState);
containerStatus.setContainerId(containerId); containerStatus.setContainerId(containerId);
containerStatus.setDiagnostics(diagnostics); containerStatus.setDiagnostics(diagnostics);
@ -65,6 +77,18 @@ public static ContainerStatus newInstance(ContainerId containerId,
@Unstable @Unstable
public abstract void setContainerId(ContainerId containerId); public abstract void setContainerId(ContainerId containerId);
/**
* Get the <code>ExecutionType</code> of the container.
* @return <code>ExecutionType</code> of the container
*/
@Public
@Evolving
public abstract ExecutionType getExecutionType();
@Private
@Unstable
public abstract void setExecutionType(ExecutionType executionType);
/** /**
* Get the <code>ContainerState</code> of the container. * Get the <code>ContainerState</code> of the container.
* @return <code>ContainerState</code> of the container * @return <code>ContainerState</code> of the container

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* Container property encoding execution semantics.
*
* <p>
* The execution types are the following:
* <ul>
* <li>{@link #GUARANTEED} - this container is guaranteed to start its
* execution, once the corresponding start container request is received by
* an NM.
* <li>{@link #OPPORTUNISTIC} - the execution of this container may not start
* immediately at the NM that receives the corresponding start container
* request (depending on the NM's available resources). Moreover, it may be
* preempted if it blocks a GUARANTEED container from being executed.
* </ul>
*/
@Public
@Evolving
public enum ExecutionType {
GUARANTEED, OPPORTUNISTIC
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
/** /**
@ -36,6 +37,7 @@ public class ContainerContext {
private final ContainerId containerId; private final ContainerId containerId;
private final Resource resource; private final Resource resource;
private final ContainerType containerType; private final ContainerType containerType;
private final ExecutionType executionType;
@Private @Private
@Unstable @Unstable
@ -48,10 +50,20 @@ public ContainerContext(String user, ContainerId containerId,
@Unstable @Unstable
public ContainerContext(String user, ContainerId containerId, public ContainerContext(String user, ContainerId containerId,
Resource resource, ContainerType containerType) { Resource resource, ContainerType containerType) {
this(user, containerId, resource, containerType,
ExecutionType.GUARANTEED);
}
@Private
@Unstable
public ContainerContext(String user, ContainerId containerId,
Resource resource, ContainerType containerType,
ExecutionType executionType) {
this.user = user; this.user = user;
this.containerId = containerId; this.containerId = containerId;
this.resource = resource; this.resource = resource;
this.containerType = containerType; this.containerType = containerType;
this.executionType = executionType;
} }
/** /**
@ -91,4 +103,14 @@ public Resource getResource() {
public ContainerType getContainerType() { public ContainerType getContainerType() {
return containerType; return containerType;
} }
/**
* Get {@link ExecutionType} the execution type of the container
* being initialized or stopped.
*
* @return the execution type of the container
*/
public ExecutionType getExecutionType() {
return executionType;
}
} }

View File

@ -284,6 +284,11 @@ enum ContainerTypeProto {
TASK = 2; TASK = 2;
} }
enum ExecutionTypeProto {
GUARANTEED = 1;
OPPORTUNISTIC = 2;
}
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol ///////////////////////////////////////////// ////// From AM_RM_Protocol /////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
@ -490,6 +495,7 @@ message ContainerStatusProto {
optional string diagnostics = 3 [default = "N/A"]; optional string diagnostics = 3 [default = "N/A"];
optional int32 exit_status = 4 [default = -1000]; optional int32 exit_status = 4 [default = -1000];
optional ResourceProto capability = 5; optional ResourceProto capability = 5;
optional ExecutionTypeProto executionType = 6 [default = GUARANTEED];
} }
enum ContainerExitStatusProto { enum ContainerExitStatusProto {

View File

@ -24,9 +24,11 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProtoOrBuilder;
@ -79,6 +81,7 @@ public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("ContainerStatus: ["); sb.append("ContainerStatus: [");
sb.append("ContainerId: ").append(getContainerId()).append(", "); sb.append("ContainerId: ").append(getContainerId()).append(", ");
sb.append("ExecutionType: ").append(getExecutionType()).append(", ");
sb.append("State: ").append(getState()).append(", "); sb.append("State: ").append(getState()).append(", ");
sb.append("Capability: ").append(getCapability()).append(", "); sb.append("Capability: ").append(getCapability()).append(", ");
sb.append("Diagnostics: ").append(getDiagnostics()).append(", "); sb.append("Diagnostics: ").append(getDiagnostics()).append(", ");
@ -107,7 +110,25 @@ private synchronized void maybeInitBuilder() {
} }
viaProto = false; viaProto = false;
} }
@Override
public synchronized ExecutionType getExecutionType() {
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasExecutionType()) {
return null;
}
return convertFromProtoFormat(p.getExecutionType());
}
@Override
public synchronized void setExecutionType(ExecutionType executionType) {
maybeInitBuilder();
if (executionType == null) {
builder.clearExecutionType();
return;
}
builder.setExecutionType(convertToProtoFormat(executionType));
}
@Override @Override
public synchronized ContainerState getState() { public synchronized ContainerState getState() {
@ -206,6 +227,14 @@ private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto(); return ((ContainerIdPBImpl)t).getProto();
} }
private ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
return ProtoUtils.convertFromProtoFormat(e);
}
private ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
return ProtoUtils.convertToProtoFormat(e);
}
private ResourceProto convertToProtoFormat(Resource e) { private ResourceProto convertToProtoFormat(Resource e) {
return ((ResourcePBImpl)e).getProto(); return ((ResourcePBImpl)e).getProto();
} }

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -54,6 +55,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.ContainerType;
@ -282,4 +284,14 @@ public static ContainerTypeProto convertToProtoFormat(ContainerType e) {
public static ContainerType convertFromProtoFormat(ContainerTypeProto e) { public static ContainerType convertFromProtoFormat(ContainerTypeProto e) {
return ContainerType.valueOf(e.name()); return ContainerType.valueOf(e.name());
} }
/*
* ExecutionType
*/
public static ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
return ExecutionTypeProto.valueOf(e.name());
}
public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
return ExecutionType.valueOf(e.name());
}
} }

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -43,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.ContainerType;
@ -85,6 +87,16 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName,
long rmIdentifier, Priority priority, long creationTime, long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression, LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType) { ContainerType containerType) {
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
rmIdentifier, priority, creationTime, logAggregationContext,
nodeLabelExpression, containerType, ExecutionType.GUARANTEED);
}
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType, ExecutionType executionType) {
ContainerTokenIdentifierProto.Builder builder = ContainerTokenIdentifierProto.Builder builder =
ContainerTokenIdentifierProto.newBuilder(); ContainerTokenIdentifierProto.newBuilder();
if (containerID != null) { if (containerID != null) {
@ -112,6 +124,7 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName,
builder.setNodeLabelExpression(nodeLabelExpression); builder.setNodeLabelExpression(nodeLabelExpression);
} }
builder.setContainerType(convertToProtoFormat(containerType)); builder.setContainerType(convertToProtoFormat(containerType));
builder.setExecutionType(convertToProtoFormat(executionType));
proto = builder.build(); proto = builder.build();
} }
@ -163,7 +176,7 @@ public long getCreationTime() {
return proto.getCreationTime(); return proto.getCreationTime();
} }
/** /**
* Get the RMIdentifier of RM in which containers are allocated * Get the RMIdentifier of RM in which containers are allocated.
* @return RMIdentifier * @return RMIdentifier
*/ */
public long getRMIdentifier() { public long getRMIdentifier() {
@ -181,6 +194,17 @@ public ContainerType getContainerType(){
return convertFromProtoFormat(proto.getContainerType()); return convertFromProtoFormat(proto.getContainerType());
} }
/**
* Get the ExecutionType of container to allocate
* @return ExecutionType
*/
public ExecutionType getExecutionType(){
if (!proto.hasExecutionType()) {
return null;
}
return convertFromProtoFormat(proto.getExecutionType());
}
public ContainerTokenIdentifierProto getProto() { public ContainerTokenIdentifierProto getProto() {
return proto; return proto;
} }
@ -265,4 +289,13 @@ private ContainerType convertFromProtoFormat(
ContainerTypeProto containerType) { ContainerTypeProto containerType) {
return ProtoUtils.convertFromProtoFormat(containerType); return ProtoUtils.convertFromProtoFormat(containerType);
} }
private ExecutionTypeProto convertToProtoFormat(ExecutionType executionType) {
return ProtoUtils.convertToProtoFormat(executionType);
}
private ExecutionType convertFromProtoFormat(
ExecutionTypeProto executionType) {
return ProtoUtils.convertFromProtoFormat(executionType);
}
} }

View File

@ -51,6 +51,7 @@ message ContainerTokenIdentifierProto {
optional LogAggregationContextProto logAggregationContext = 10; optional LogAggregationContextProto logAggregationContext = 10;
optional string nodeLabelExpression = 11; optional string nodeLabelExpression = 11;
optional ContainerTypeProto containerType = 12; optional ContainerTypeProto containerType = 12;
optional ExecutionTypeProto executionType = 13 [default = GUARANTEED];
} }
message ClientToAMTokenIdentifierProto { message ClientToAMTokenIdentifierProto {

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -209,6 +210,9 @@ public void testContainerTokenIdentifier() throws IOException {
Assert.assertEquals(ContainerType.TASK, Assert.assertEquals(ContainerType.TASK,
anotherToken.getContainerType()); anotherToken.getContainerType());
Assert.assertEquals(ExecutionType.GUARANTEED,
anotherToken.getExecutionType());
} }
@Test @Test
@ -384,10 +388,14 @@ public void testAMContainerTokenIdentifier() throws IOException {
Assert.assertEquals(ContainerType.APPLICATION_MASTER, Assert.assertEquals(ContainerType.APPLICATION_MASTER,
anotherToken.getContainerType()); anotherToken.getContainerType());
Assert.assertEquals(ExecutionType.GUARANTEED,
anotherToken.getExecutionType());
token = token =
new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r, new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r,
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime, expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK); null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
ExecutionType.OPPORTUNISTIC);
anotherToken = new ContainerTokenIdentifier(); anotherToken = new ContainerTokenIdentifier();
@ -398,6 +406,9 @@ public void testAMContainerTokenIdentifier() throws IOException {
Assert.assertEquals(ContainerType.TASK, Assert.assertEquals(ContainerType.TASK,
anotherToken.getContainerType()); anotherToken.getContainerType());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
anotherToken.getExecutionType());
} }
} }