diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 323d31d4c6..582389fa11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -34,5 +34,8 @@ public enum ContainerState {
RUNNING,
/** Completed container */
- COMPLETE
+ COMPLETE,
+
+ /** Queued at the NM. */
+ QUEUED
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8acee579ff..10efee120c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -659,6 +659,11 @@ public static boolean isAclEnabled(Configuration conf) {
/** Prefix for all node manager configs.*/
public static final String NM_PREFIX = "yarn.nodemanager.";
+ /** Enable Queuing of OPPORTUNISTIC
containers. */
+ public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX
+ + "container-queuing-enabled";
+ public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false;
+
/** Environment variables that will be sent to containers.*/
public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env";
public static final String DEFAULT_NM_ADMIN_USER_ENV = "MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 9392efd4ef..2fe4edae5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -82,6 +82,7 @@ enum ContainerStateProto {
C_NEW = 1;
C_RUNNING = 2;
C_COMPLETE = 3;
+ C_QUEUED = 4;
}
message ContainerProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 506cf3d9fc..560d5481e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -964,6 +964,13 @@
4
+
+ Enable Queuing of OPPORTUNISTIC containers on the
+ nodemanager.
+ yarn.nodemanager.container-queuing-enabled
+ false
+
+
Number of seconds after an application finishes before the nodemanager's
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 4fdd43c789..a70d143c47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -215,6 +216,13 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
public static ContainerStatus newContainerStatus(ContainerId containerId,
ContainerState containerState, String diagnostics, int exitStatus,
Resource capability) {
+ return newContainerStatus(containerId, containerState, diagnostics,
+ exitStatus, capability, ExecutionType.GUARANTEED);
+ }
+
+ public static ContainerStatus newContainerStatus(ContainerId containerId,
+ ContainerState containerState, String diagnostics, int exitStatus,
+ Resource capability, ExecutionType executionType) {
ContainerStatus containerStatus = recordFactory
.newRecordInstance(ContainerStatus.class);
containerStatus.setState(containerState);
@@ -222,6 +230,7 @@ public static ContainerStatus newContainerStatus(ContainerId containerId,
containerStatus.setDiagnostics(diagnostics);
containerStatus.setExitStatus(exitStatus);
containerStatus.setCapability(capability);
+ containerStatus.setExecutionType(executionType);
return containerStatus;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index d3251ae46a..205e47572b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -42,6 +43,15 @@
*/
public interface Context {
+ /**
+ * Interface exposing methods related to the queuing of containers in the NM.
+ */
+ interface QueuingContext {
+ ConcurrentMap getQueuedContainers();
+
+ ConcurrentMap getKilledQueuedContainers();
+ }
+
/**
* Return the nodeId. Usable only when the ContainerManager is started.
*
@@ -89,4 +99,11 @@ public interface Context {
getLogAggregationStatusForApps();
NodeStatusUpdater getNodeStatusUpdater();
+
+ /**
+ * Returns a QueuingContext
that provides information about the
+ * number of Containers Queued as well as the number of Containers that were
+ * queued and killed.
+ */
+ QueuingContext getQueuingContext();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 7c104d5ee6..e4036ea6f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -57,11 +57,13 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
@@ -170,8 +172,14 @@ protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
- return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
- metrics, dirsHandler);
+ if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
+ YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
+ return new QueuingContainerManagerImpl(context, exec, del,
+ nodeStatusUpdater, metrics, dirsHandler);
+ } else {
+ return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+ metrics, dirsHandler);
+ }
}
protected WebServer createWebServer(Context nmContext,
@@ -461,6 +469,8 @@ public static class NMContext implements Context {
logAggregationReportForApps;
private NodeStatusUpdater nodeStatusUpdater;
+ private final QueuingContext queuingContext;
+
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -475,6 +485,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
this.stateStore = stateStore;
this.logAggregationReportForApps = new ConcurrentLinkedQueue<
LogAggregationReport>();
+ this.queuingContext = new QueuingNMContext();
}
/**
@@ -595,8 +606,35 @@ public NodeStatusUpdater getNodeStatusUpdater() {
public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
this.nodeStatusUpdater = nodeStatusUpdater;
}
+
+ @Override
+ public QueuingContext getQueuingContext() {
+ return this.queuingContext;
+ }
}
+ /**
+ * Class that keeps the context for containers queued at the NM.
+ */
+ public static class QueuingNMContext implements Context.QueuingContext {
+ protected final ConcurrentMap
+ queuedContainers = new ConcurrentSkipListMap<>();
+
+ protected final ConcurrentMap
+ killedQueuedContainers = new ConcurrentHashMap<>();
+
+ @Override
+ public ConcurrentMap
+ getQueuedContainers() {
+ return this.queuedContainers;
+ }
+
+ @Override
+ public ConcurrentMap
+ getKilledQueuedContainers() {
+ return this.killedQueuedContainers;
+ }
+ }
/**
* @return the node health checker
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index b8cca28e82..29ab7f997a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -160,11 +160,11 @@ public class ContainerManagerImpl extends CompositeService implements
private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
- static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
+ public static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
static final String INVALID_CONTAINERTOKEN_MSG =
"Invalid ContainerToken";
- final Context context;
+ protected final Context context;
private final ContainersMonitor containersMonitor;
private Server server;
private final ResourceLocalizationService rsrcLocalizationSrvc;
@@ -172,7 +172,7 @@ public class ContainerManagerImpl extends CompositeService implements
private final AuxServices auxiliaryServices;
private final NodeManagerMetrics metrics;
- private final NodeStatusUpdater nodeStatusUpdater;
+ protected final NodeStatusUpdater nodeStatusUpdater;
protected LocalDirsHandlerService dirsHandler;
protected final AsyncDispatcher dispatcher;
@@ -213,14 +213,13 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices);
- this.containersMonitor =
- new ContainersMonitorImpl(exec, dispatcher, this.context);
+ this.containersMonitor = createContainersMonitor(exec);
addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class,
- new ApplicationEventDispatcher());
+ createApplicationEventDispatcher());
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
@@ -235,6 +234,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
@Override
public void serviceInit(Configuration conf) throws Exception {
+
LogHandler logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
@@ -276,6 +276,10 @@ protected void createAMRMProxyService(Configuration conf) {
}
}
+ protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
+ return new ContainersMonitorImpl(exec, dispatcher, this.context);
+ }
+
@SuppressWarnings("unchecked")
private void recover() throws IOException, URISyntaxException {
NMStateStoreService stateStore = context.getNMStateStore();
@@ -418,6 +422,10 @@ protected ContainersLauncher createContainersLauncher(Context context,
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
}
+ protected EventHandler createApplicationEventDispatcher() {
+ return new ApplicationEventDispatcher();
+ }
+
@Override
protected void serviceStart() throws Exception {
@@ -802,7 +810,8 @@ public StartContainersResponse startContainers(
.equals(ContainerType.APPLICATION_MASTER)) {
this.getAMRMProxyService().processApplicationStartRequest(request);
}
-
+ performContainerPreStartChecks(nmTokenIdentifier, request,
+ containerTokenIdentifier);
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
request);
succeededContainers.add(containerId);
@@ -822,6 +831,42 @@ public StartContainersResponse startContainers(
}
}
+ private void performContainerPreStartChecks(
+ NMTokenIdentifier nmTokenIdentifier, StartContainerRequest request,
+ ContainerTokenIdentifier containerTokenIdentifier)
+ throws YarnException, InvalidToken {
+ /*
+ * 1) It should save the NMToken into NMTokenSecretManager. This is done
+ * here instead of RPC layer because at the time of opening/authenticating
+ * the connection it doesn't know what all RPC calls user will make on it.
+ * Also new NMToken is issued only at startContainer (once it gets
+ * renewed).
+ *
+ * 2) It should validate containerToken. Need to check below things. a) It
+ * is signed by correct master key (part of retrieve password). b) It
+ * belongs to correct Node Manager (part of retrieve password). c) It has
+ * correct RMIdentifier. d) It is not expired.
+ */
+ authorizeStartAndResourceIncreaseRequest(
+ nmTokenIdentifier, containerTokenIdentifier, true);
+ // update NMToken
+ updateNMTokenIdentifier(nmTokenIdentifier);
+
+ ContainerLaunchContext launchContext = request.getContainerLaunchContext();
+
+ Map serviceData = getAuxServiceMetaData();
+ if (launchContext.getServiceData()!=null &&
+ !launchContext.getServiceData().isEmpty()) {
+ for (Entry meta : launchContext.getServiceData()
+ .entrySet()) {
+ if (null == serviceData.get(meta.getKey())) {
+ throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+ + " does not exist");
+ }
+ }
+ }
+ }
+
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
Map appAcls,
@@ -864,26 +909,10 @@ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
}
@SuppressWarnings("unchecked")
- private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+ protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
StartContainerRequest request) throws YarnException, IOException {
- /*
- * 1) It should save the NMToken into NMTokenSecretManager. This is done
- * here instead of RPC layer because at the time of opening/authenticating
- * the connection it doesn't know what all RPC calls user will make on it.
- * Also new NMToken is issued only at startContainer (once it gets renewed).
- *
- * 2) It should validate containerToken. Need to check below things. a) It
- * is signed by correct master key (part of retrieve password). b) It
- * belongs to correct Node Manager (part of retrieve password). c) It has
- * correct RMIdentifier. d) It is not expired.
- */
- authorizeStartAndResourceIncreaseRequest(
- nmTokenIdentifier, containerTokenIdentifier, true);
- // update NMToken
- updateNMTokenIdentifier(nmTokenIdentifier);
-
ContainerId containerId = containerTokenIdentifier.getContainerID();
String containerIdStr = containerId.toString();
String user = containerTokenIdentifier.getApplicationSubmitter();
@@ -892,18 +921,6 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
- Map serviceData = getAuxServiceMetaData();
- if (launchContext.getServiceData()!=null &&
- !launchContext.getServiceData().isEmpty()) {
- for (Map.Entry meta : launchContext.getServiceData()
- .entrySet()) {
- if (null == serviceData.get(meta.getKey())) {
- throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
- + " does not exist");
- }
- }
- }
-
Credentials credentials =
YarnServerSecurityUtils.parseCredentials(launchContext);
@@ -923,13 +940,14 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
this.readLock.lock();
try {
- if (!serviceStopped) {
+ if (!isServiceStopped()) {
// Create the application
- Application application =
- new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
+ Application application = new ApplicationImpl(dispatcher, user,
+ applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
- LOG.info("Creating a new application reference for app " + applicationID);
+ LOG.info("Creating a new application reference for app "
+ + applicationID);
LogAggregationContext logAggregationContext =
containerTokenIdentifier.getLogAggregationContext();
Map appAcls =
@@ -1147,7 +1165,9 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
}
for (ContainerId id : requests.getContainerIds()) {
try {
- stopContainerInternal(identifier, id);
+ Container container = this.context.getContainers().get(id);
+ authorizeGetAndStopContainerRequest(id, container, true, identifier);
+ stopContainerInternal(id);
succeededRequests.add(id);
} catch (YarnException e) {
failedRequests.put(id, SerializedException.newInstance(e));
@@ -1158,13 +1178,11 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
}
@SuppressWarnings("unchecked")
- private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
- ContainerId containerID) throws YarnException, IOException {
+ protected void stopContainerInternal(ContainerId containerID)
+ throws YarnException, IOException {
String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID);
LOG.info("Stopping container with container Id: " + containerIDStr);
- authorizeGetAndStopContainerRequest(containerID, container, true,
- nmTokenIdentifier);
if (container == null) {
if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
@@ -1211,7 +1229,7 @@ public GetContainerStatusesResponse getContainerStatuses(
failedRequests);
}
- private ContainerStatus getContainerStatusInternal(ContainerId containerID,
+ protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
NMTokenIdentifier nmTokenIdentifier) throws YarnException {
String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID);
@@ -1407,4 +1425,7 @@ protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
this.amrmProxyService = amrmProxyService;
}
+ protected boolean isServiceStopped() {
+ return serviceStopped;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index a43a005880..676721435c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -453,7 +453,8 @@ public ContainerStatus cloneAndGetContainerStatus() {
this.readLock.lock();
try {
return BuilderUtils.newContainerStatus(this.containerId,
- getCurrentState(), diagnostics.toString(), exitCode, getResource());
+ getCurrentState(), diagnostics.toString(), exitCode, getResource(),
+ this.containerTokenIdentifier.getExecutionType());
} finally {
this.readLock.unlock();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 4d69dbfbc2..1069b4fbdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -22,8 +22,26 @@
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
public interface ContainersMonitor extends Service,
EventHandler, ResourceView {
public ResourceUtilization getContainersUtilization();
+
+ ResourceUtilization getContainersAllocation();
+
+ boolean hasResourcesAvailable(ProcessTreeInfo pti);
+
+ void increaseContainersAllocation(ProcessTreeInfo pti);
+
+ void decreaseContainersAllocation(ProcessTreeInfo pti);
+
+ void increaseResourceUtilization(ResourceUtilization resourceUtil,
+ ProcessTreeInfo pti);
+
+ void decreaseResourceUtilization(ResourceUtilization resourceUtil,
+ ProcessTreeInfo pti);
+
+ void subtractNodeResourcesFromResourceUtilization(
+ ResourceUtilization resourceUtil);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 446e7a1270..0feac3bff3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -63,7 +63,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher;
- private final Context context;
+ protected final Context context;
private ResourceCalculatorPlugin resourceCalculatorPlugin;
private Configuration conf;
private static float vmemRatio;
@@ -82,6 +82,9 @@ public class ContainersMonitorImpl extends AbstractService implements
private int nodeCpuPercentageForYARN;
private ResourceUtilization containersUtilization;
+ // Tracks the aggregated allocation of the currently allocated containers
+ // when queuing of containers at the NMs is enabled.
+ private ResourceUtilization containersAllocation;
private volatile boolean stopped = false;
@@ -96,6 +99,7 @@ public ContainersMonitorImpl(ContainerExecutor exec,
this.monitoringThread = new MonitoringThread();
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
+ this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
}
@Override
@@ -132,10 +136,11 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
long configuredPMemForContainers =
- NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
+ NodeManagerHardwareUtils.getContainerMemoryMB(
+ this.resourceCalculatorPlugin, conf) * 1024 * 1024L;
long configuredVCoresForContainers =
- NodeManagerHardwareUtils.getVCores(conf);
+ NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, conf);
// Setting these irrespective of whether checks are enabled. Required in
// the UI.
@@ -233,8 +238,7 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
- @VisibleForTesting
- static class ProcessTreeInfo {
+ public static class ProcessTreeInfo {
private ContainerId containerId;
private String pid;
private ResourceCalculatorProcessTree pTree;
@@ -697,6 +701,82 @@ public void setContainersUtilization(ResourceUtilization utilization) {
this.containersUtilization = utilization;
}
+ public ResourceUtilization getContainersAllocation() {
+ return this.containersAllocation;
+ }
+
+ /**
+ * @return true if there are available allocated resources for the given
+ * container to start.
+ */
+ @Override
+ public boolean hasResourcesAvailable(ProcessTreeInfo pti) {
+ synchronized (this.containersAllocation) {
+ // Check physical memory.
+ if (this.containersAllocation.getPhysicalMemory() +
+ (int) (pti.getPmemLimit() >> 20) >
+ (int) (getPmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+ // Check virtual memory.
+ if (isVmemCheckEnabled() &&
+ this.containersAllocation.getVirtualMemory() +
+ (int) (pti.getVmemLimit() >> 20) >
+ (int) (getVmemAllocatedForContainers() >> 20)) {
+ return false;
+ }
+ // Check CPU.
+ if (this.containersAllocation.getCPU()
+ + allocatedCpuUsage(pti) > 1.0f) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void increaseContainersAllocation(ProcessTreeInfo pti) {
+ synchronized (this.containersAllocation) {
+ increaseResourceUtilization(this.containersAllocation, pti);
+ }
+ }
+
+ @Override
+ public void decreaseContainersAllocation(ProcessTreeInfo pti) {
+ synchronized (this.containersAllocation) {
+ decreaseResourceUtilization(this.containersAllocation, pti);
+ }
+ }
+
+ @Override
+ public void increaseResourceUtilization(ResourceUtilization resourceUtil,
+ ProcessTreeInfo pti) {
+ resourceUtil.addTo((int) (pti.getPmemLimit() >> 20),
+ (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
+ }
+
+ @Override
+ public void decreaseResourceUtilization(ResourceUtilization resourceUtil,
+ ProcessTreeInfo pti) {
+ resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20),
+ (int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
+ }
+
+ @Override
+ public void subtractNodeResourcesFromResourceUtilization(
+ ResourceUtilization resourceUtil) {
+ resourceUtil.subtractFrom((int) (getPmemAllocatedForContainers() >> 20),
+ (int) (getVmemAllocatedForContainers() >> 20), 1.0f);
+ }
+
+ private float allocatedCpuUsage(ProcessTreeInfo pti) {
+ float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f;
+ float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore
+ / resourceCalculatorPlugin.getNumProcessors();
+ return (cpuUsageTotalCoresPercentage * 1000 *
+ maxVCoresAllottedForContainers / nodeCpuPercentageForYARN) / 1000.0f;
+ }
+
@Override
@SuppressWarnings("unchecked")
public void handle(ContainersMonitorEvent monitoringEvent) {
@@ -714,40 +794,56 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER:
- ContainerStartMonitoringEvent startEvent =
- (ContainerStartMonitoringEvent) monitoringEvent;
- LOG.info("Starting resource-monitoring for " + containerId);
- updateContainerMetrics(monitoringEvent);
- trackingContainers.put(containerId,
- new ProcessTreeInfo(containerId, null, null,
- startEvent.getVmemLimit(), startEvent.getPmemLimit(),
- startEvent.getCpuVcores()));
+ onStartMonitoringContainer(monitoringEvent, containerId);
break;
case STOP_MONITORING_CONTAINER:
- LOG.info("Stopping resource-monitoring for " + containerId);
- updateContainerMetrics(monitoringEvent);
- trackingContainers.remove(containerId);
+ onStopMonitoringContainer(monitoringEvent, containerId);
break;
case CHANGE_MONITORING_CONTAINER_RESOURCE:
- ChangeMonitoringContainerResourceEvent changeEvent =
- (ChangeMonitoringContainerResourceEvent) monitoringEvent;
- ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
- if (processTreeInfo == null) {
- LOG.warn("Failed to track container "
- + containerId.toString()
- + ". It may have already completed.");
- break;
- }
- LOG.info("Changing resource-monitoring for " + containerId);
- updateContainerMetrics(monitoringEvent);
- long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
- long vmemLimit = (long) (pmemLimit * vmemRatio);
- int cpuVcores = changeEvent.getResource().getVirtualCores();
- processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
- changeContainerResource(containerId, changeEvent.getResource());
+ onChangeMonitoringContainerResource(monitoringEvent, containerId);
break;
default:
// TODO: Wrong event.
}
}
+
+ protected void onChangeMonitoringContainerResource(
+ ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+ ChangeMonitoringContainerResourceEvent changeEvent =
+ (ChangeMonitoringContainerResourceEvent) monitoringEvent;
+ ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
+ if (processTreeInfo == null) {
+ LOG.warn("Failed to track container "
+ + containerId.toString()
+ + ". It may have already completed.");
+ return;
+ }
+ LOG.info("Changing resource-monitoring for " + containerId);
+ updateContainerMetrics(monitoringEvent);
+ long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
+ long vmemLimit = (long) (pmemLimit * vmemRatio);
+ int cpuVcores = changeEvent.getResource().getVirtualCores();
+ processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
+ changeContainerResource(containerId, changeEvent.getResource());
+ }
+
+ protected void onStopMonitoringContainer(
+ ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+ LOG.info("Stopping resource-monitoring for " + containerId);
+ updateContainerMetrics(monitoringEvent);
+ trackingContainers.remove(containerId);
+ }
+
+ protected void onStartMonitoringContainer(
+ ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
+ ContainerStartMonitoringEvent startEvent =
+ (ContainerStartMonitoringEvent) monitoringEvent;
+ LOG.info("Starting resource-monitoring for " + containerId);
+ updateContainerMetrics(monitoringEvent);
+ trackingContainers.put(containerId,
+ new ProcessTreeInfo(containerId, null, null,
+ startEvent.getVmemLimit(), startEvent.getPmemLimit(),
+ startEvent.getCpuVcores()));
+ }
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
new file mode 100644
index 0000000000..ef4e57146c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class extending {@link ContainerManagerImpl} and is used when queuing at the
+ * NM is enabled.
+ */
+public class QueuingContainerManagerImpl extends ContainerManagerImpl {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(QueuingContainerManagerImpl.class);
+
+ private ConcurrentMap
+ allocatedGuaranteedContainers;
+ private ConcurrentMap
+ allocatedOpportunisticContainers;
+
+ private Queue queuedGuaranteedContainers;
+ private Queue queuedOpportunisticContainers;
+
+ private Set opportunisticContainersToKill;
+
+ public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
+ DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
+ NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
+ super(context, exec, deletionContext, nodeStatusUpdater, metrics,
+ dirsHandler);
+ this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
+ this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
+ this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
+ this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
+ this.opportunisticContainersToKill = Collections.synchronizedSet(
+ new HashSet());
+ }
+
+ @Override
+ protected EventHandler createApplicationEventDispatcher() {
+ return new QueuingApplicationEventDispatcher(
+ super.createApplicationEventDispatcher());
+ }
+
+ @Override
+ protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ StartContainerRequest request) throws YarnException, IOException {
+ this.context.getQueuingContext().getQueuedContainers().put(
+ containerTokenIdentifier.getContainerID(), containerTokenIdentifier);
+
+ AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
+ containerTokenIdentifier, nmTokenIdentifier, request,
+ containerTokenIdentifier.getExecutionType(), containerTokenIdentifier
+ .getResource(), getConfig());
+
+ // If there are already free resources for the container to start, and
+ // there are no queued containers waiting to be executed, start this
+ // container immediately.
+ if (queuedGuaranteedContainers.isEmpty() &&
+ queuedOpportunisticContainers.isEmpty() &&
+ getContainersMonitor().
+ hasResourcesAvailable(allocatedContInfo.getPti())) {
+ startAllocatedContainer(allocatedContInfo);
+ } else {
+ if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
+ queuedGuaranteedContainers.add(allocatedContInfo);
+ // Kill running opportunistic containers to make space for
+ // guaranteed container.
+ killOpportunisticContainers(allocatedContInfo);
+ } else {
+ queuedOpportunisticContainers.add(allocatedContInfo);
+ }
+ }
+ }
+
+ @Override
+ protected void stopContainerInternal(ContainerId containerID)
+ throws YarnException, IOException {
+ Container container = this.context.getContainers().get(containerID);
+ // If container is null and distributed scheduling is enabled, container
+ // might be queued. Otherwise, container might not be handled by this NM.
+ if (container == null && this.context.getQueuingContext()
+ .getQueuedContainers().containsKey(containerID)) {
+ ContainerTokenIdentifier containerTokenId = this.context
+ .getQueuingContext().getQueuedContainers().remove(containerID);
+
+ boolean foundInQueue = removeQueuedContainer(containerID,
+ containerTokenId.getExecutionType());
+
+ if (foundInQueue) {
+ this.context.getQueuingContext().getKilledQueuedContainers().put(
+ containerTokenId,
+ "Queued container request removed by ApplicationMaster.");
+ } else {
+ // The container started execution in the meanwhile.
+ try {
+ stopContainerInternalIfRunning(containerID);
+ } catch (YarnException | IOException e) {
+ LOG.error("Container did not get removed successfully.", e);
+ }
+ }
+
+ nodeStatusUpdater.sendOutofBandHeartBeat();
+ }
+ super.stopContainerInternal(containerID);
+ }
+
+ /**
+ * Start the execution of the given container. Also add it to the allocated
+ * containers, and update allocated resource utilization.
+ */
+ private void startAllocatedContainer(
+ AllocatedContainerInfo allocatedContainerInfo) {
+ ProcessTreeInfo pti = allocatedContainerInfo.getPti();
+
+ if (allocatedContainerInfo.getExecutionType() ==
+ ExecutionType.GUARANTEED) {
+ allocatedGuaranteedContainers.put(pti.getContainerId(),
+ allocatedContainerInfo);
+ } else {
+ allocatedOpportunisticContainers.put(pti.getContainerId(),
+ allocatedContainerInfo);
+ }
+
+ getContainersMonitor().increaseContainersAllocation(pti);
+
+ // Start execution of container.
+ ContainerId containerId = allocatedContainerInfo
+ .getContainerTokenIdentifier().getContainerID();
+ this.context.getQueuingContext().getQueuedContainers().remove(containerId);
+ try {
+ super.startContainerInternal(
+ allocatedContainerInfo.getNMTokenIdentifier(),
+ allocatedContainerInfo.getContainerTokenIdentifier(),
+ allocatedContainerInfo.getStartRequest());
+ } catch (YarnException | IOException e) {
+ containerFailedToStart(pti.getContainerId(),
+ allocatedContainerInfo.getContainerTokenIdentifier());
+ LOG.error("Container failed to start.", e);
+ }
+ }
+
+ private void containerFailedToStart(ContainerId containerId,
+ ContainerTokenIdentifier containerTokenId) {
+ this.context.getQueuingContext().getQueuedContainers().remove(containerId);
+
+ removeAllocatedContainer(containerId);
+
+ this.context.getQueuingContext().getKilledQueuedContainers().put(
+ containerTokenId,
+ "Container removed from queue as it failed to start.");
+ }
+
+ /**
+ * Remove the given container from the container queues.
+ *
+ * @return true if the container was found in one of the queues.
+ */
+ private boolean removeQueuedContainer(ContainerId containerId,
+ ExecutionType executionType) {
+ Queue queue =
+ (executionType == ExecutionType.GUARANTEED) ?
+ queuedGuaranteedContainers : queuedOpportunisticContainers;
+
+ boolean foundInQueue = false;
+ Iterator iter = queue.iterator();
+ while (iter.hasNext() && !foundInQueue) {
+ if (iter.next().getPti().getContainerId().equals(containerId)) {
+ iter.remove();
+ foundInQueue = true;
+ }
+ }
+
+ return foundInQueue;
+ }
+
+ /**
+ * Remove the given container from the allocated containers, and update
+ * allocated container utilization accordingly.
+ */
+ private void removeAllocatedContainer(ContainerId containerId) {
+ AllocatedContainerInfo contToRemove = null;
+
+ contToRemove = allocatedGuaranteedContainers.remove(containerId);
+
+ if (contToRemove == null) {
+ contToRemove = allocatedOpportunisticContainers.remove(containerId);
+ }
+
+ // If container was indeed running, update allocated resource utilization.
+ if (contToRemove != null) {
+ getContainersMonitor().decreaseContainersAllocation(contToRemove
+ .getPti());
+ }
+ }
+
+ /**
+ * Stop a container only if it is currently running. If queued, do not stop
+ * it.
+ */
+ private void stopContainerInternalIfRunning(ContainerId containerID)
+ throws YarnException, IOException {
+ if (this.context.getContainers().containsKey(containerID)) {
+ stopContainerInternal(containerID);
+ }
+ }
+
+ /**
+ * Kill opportunistic containers to free up resources for running the given
+ * container.
+ *
+ * @param allocatedContInfo
+ * the container whose execution needs to start by freeing up
+ * resources occupied by opportunistic containers.
+ */
+ private void killOpportunisticContainers(
+ AllocatedContainerInfo allocatedContInfo) {
+ ContainerId containerToStartId = allocatedContInfo.getPti()
+ .getContainerId();
+ List extraOpportContainersToKill =
+ pickOpportunisticContainersToKill(containerToStartId);
+
+ // Kill the opportunistic containers that were chosen.
+ for (ContainerId contIdToKill : extraOpportContainersToKill) {
+ try {
+ stopContainerInternalIfRunning(contIdToKill);
+ } catch (YarnException | IOException e) {
+ LOG.error("Container did not get removed successfully.", e);
+ }
+ LOG.info(
+ "Opportunistic container {} will be killed in order to start the "
+ + "execution of guaranteed container {}.",
+ contIdToKill, containerToStartId);
+ }
+ }
+
+ /**
+ * Choose the opportunistic containers to kill in order to free up resources
+ * for running the given container.
+ *
+ * @param containerToStartId
+ * the container whose execution needs to start by freeing up
+ * resources occupied by opportunistic containers.
+ * @return the additional opportunistic containers that need to be killed.
+ */
+ protected List pickOpportunisticContainersToKill(
+ ContainerId containerToStartId) {
+ // The additional opportunistic containers that need to be killed for the
+ // given container to start.
+ List extraOpportContainersToKill = new ArrayList<>();
+ // Track resources that need to be freed.
+ ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
+ containerToStartId);
+
+ // Go over the running opportunistic containers. Avoid containers that have
+ // already been marked for killing.
+ boolean hasSufficientResources = false;
+ for (Map.Entry runningOpportCont :
+ allocatedOpportunisticContainers.entrySet()) {
+ ContainerId runningOpportContId = runningOpportCont.getKey();
+
+ // If there are sufficient resources to execute the given container, do
+ // not kill more opportunistic containers.
+ if (resourcesToFreeUp.getPhysicalMemory() <= 0 &&
+ resourcesToFreeUp.getVirtualMemory() <= 0 &&
+ resourcesToFreeUp.getCPU() <= 0.0f) {
+ hasSufficientResources = true;
+ break;
+ }
+
+ if (!opportunisticContainersToKill.contains(runningOpportContId)) {
+ extraOpportContainersToKill.add(runningOpportContId);
+ opportunisticContainersToKill.add(runningOpportContId);
+ getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp,
+ runningOpportCont.getValue().getPti());
+ }
+ }
+
+ if (!hasSufficientResources) {
+ LOG.info(
+ "There are no sufficient resources to start guaranteed {} even after "
+ + "attempting to kill any running opportunistic containers.",
+ containerToStartId);
+ }
+
+ return extraOpportContainersToKill;
+ }
+
+ /**
+ * Calculates the amount of resources that need to be freed up (by killing
+ * opportunistic containers) in order for the given guaranteed container to
+ * start its execution. Resource allocation to be freed up =
+ * containersAllocation
-
+ * allocation of opportunisticContainersToKill
+
+ * allocation of queuedGuaranteedContainers
that will start
+ * before the given container +
+ * allocation of given container -
+ * total resources of node.
+ *
+ * @param containerToStartId
+ * the ContainerId of the guaranteed container for which we need to
+ * free resources, so that its execution can start.
+ * @return the resources that need to be freed up for the given guaranteed
+ * container to start.
+ */
+ private ResourceUtilization resourcesToFreeUp(
+ ContainerId containerToStartId) {
+ // Get allocation of currently allocated containers.
+ ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
+ .newInstance(getContainersMonitor().getContainersAllocation());
+
+ // Subtract from the allocation the allocation of the opportunistic
+ // containers that are marked for killing.
+ for (ContainerId opportContId : opportunisticContainersToKill) {
+ if (allocatedOpportunisticContainers.containsKey(opportContId)) {
+ getContainersMonitor().decreaseResourceUtilization(
+ resourceAllocationToFreeUp,
+ allocatedOpportunisticContainers.get(opportContId).getPti());
+ }
+ }
+ // Add to the allocation the allocation of the pending guaranteed
+ // containers that will start before the current container will be started.
+ for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) {
+ getContainersMonitor().increaseResourceUtilization(
+ resourceAllocationToFreeUp, guarContInfo.getPti());
+ if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) {
+ break;
+ }
+ }
+ // Subtract the overall node resources.
+ getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
+ resourceAllocationToFreeUp);
+
+ return resourceAllocationToFreeUp;
+ }
+
+ /**
+ * If there are available resources, try to start as many pending containers
+ * as possible.
+ */
+ private void startPendingContainers() {
+ // Start pending guaranteed containers, if resources available.
+ boolean resourcesAvailable =
+ startContainersFromQueue(queuedGuaranteedContainers);
+
+ // Start opportunistic containers, if resources available.
+ if (resourcesAvailable) {
+ startContainersFromQueue(queuedOpportunisticContainers);
+ }
+ }
+
+ private boolean startContainersFromQueue(
+ Queue queuedContainers) {
+ Iterator guarIter = queuedContainers.iterator();
+ boolean resourcesAvailable = true;
+
+ while (guarIter.hasNext() && resourcesAvailable) {
+ AllocatedContainerInfo allocatedContInfo = guarIter.next();
+
+ if (getContainersMonitor().hasResourcesAvailable(
+ allocatedContInfo.getPti())) {
+ startAllocatedContainer(allocatedContInfo);
+ guarIter.remove();
+ } else {
+ resourcesAvailable = false;
+ }
+ }
+ return resourcesAvailable;
+ }
+
+ @Override
+ protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
+ NMTokenIdentifier nmTokenIdentifier) throws YarnException {
+ Container container = this.context.getContainers().get(containerID);
+ if (container == null) {
+ ContainerTokenIdentifier containerTokenId = this.context
+ .getQueuingContext().getQueuedContainers().get(containerID);
+ if (containerTokenId != null) {
+ ExecutionType executionType = this.context.getQueuingContext()
+ .getQueuedContainers().get(containerID).getExecutionType();
+ return BuilderUtils.newContainerStatus(containerID,
+ org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "",
+ ContainerExitStatus.INVALID, this.context.getQueuingContext()
+ .getQueuedContainers().get(containerID).getResource(),
+ executionType);
+ }
+ }
+ return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
+ }
+
+ @VisibleForTesting
+ public int getNumAllocatedGuaranteedContainers() {
+ return allocatedGuaranteedContainers.size();
+ }
+
+ @VisibleForTesting
+ public int getNumAllocatedOpportunisticContainers() {
+ return allocatedOpportunisticContainers.size();
+ }
+
+ class QueuingApplicationEventDispatcher implements
+ EventHandler {
+ private EventHandler applicationEventDispatcher;
+
+ public QueuingApplicationEventDispatcher(
+ EventHandler applicationEventDispatcher) {
+ this.applicationEventDispatcher = applicationEventDispatcher;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void handle(ApplicationEvent event) {
+ if (event.getType() ==
+ ApplicationEventType.APPLICATION_CONTAINER_FINISHED) {
+ if (!(event instanceof ApplicationContainerFinishedEvent)) {
+ throw new RuntimeException("Unexpected event type: " + event);
+ }
+ ApplicationContainerFinishedEvent finishEvent =
+ (ApplicationContainerFinishedEvent) event;
+ // Remove finished container from the allocated containers, and
+ // attempt to start new containers.
+ ContainerId contIdToRemove = finishEvent.getContainerID();
+ removeAllocatedContainer(contIdToRemove);
+ opportunisticContainersToKill.remove(contIdToRemove);
+ startPendingContainers();
+ }
+ this.applicationEventDispatcher.handle(event);
+ }
+ }
+
+ static class AllocatedContainerInfo {
+ private final ContainerTokenIdentifier containerTokenIdentifier;
+ private final NMTokenIdentifier nmTokenIdentifier;
+ private final StartContainerRequest startRequest;
+ private final ExecutionType executionType;
+ private final ProcessTreeInfo pti;
+
+ AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier,
+ NMTokenIdentifier nmTokenIdentifier, StartContainerRequest startRequest,
+ ExecutionType executionType, Resource resource, Configuration conf) {
+ this.containerTokenIdentifier = containerTokenIdentifier;
+ this.nmTokenIdentifier = nmTokenIdentifier;
+ this.startRequest = startRequest;
+ this.executionType = executionType;
+ this.pti = createProcessTreeInfo(containerTokenIdentifier
+ .getContainerID(), resource, conf);
+ }
+
+ private ContainerTokenIdentifier getContainerTokenIdentifier() {
+ return this.containerTokenIdentifier;
+ }
+
+ private NMTokenIdentifier getNMTokenIdentifier() {
+ return this.nmTokenIdentifier;
+ }
+
+ private StartContainerRequest getStartRequest() {
+ return this.startRequest;
+ }
+
+ private ExecutionType getExecutionType() {
+ return this.executionType;
+ }
+
+ protected ProcessTreeInfo getPti() {
+ return this.pti;
+ }
+
+ private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
+ Resource resource, Configuration conf) {
+ long pmemBytes = resource.getMemory() * 1024 * 1024L;
+ float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ long vmemBytes = (long) (pmemRatio * pmemBytes);
+ int cpuVcores = resource.getVirtualCores();
+
+ return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes,
+ cpuVcores);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ boolean equal = false;
+ if (obj instanceof AllocatedContainerInfo) {
+ AllocatedContainerInfo otherContInfo = (AllocatedContainerInfo) obj;
+ equal = this.getPti().getContainerId()
+ .equals(otherContInfo.getPti().getContainerId());
+ }
+ return equal;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getPti().getContainerId().hashCode();
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
new file mode 100644
index 0000000000..0250807bc0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This package contains classes related to the queuing of containers at
+ * the NM.
+ *
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index fa8d131c6b..2fcce1d38e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -41,7 +41,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 1aea9d2763..6c904ebef3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -683,5 +683,10 @@ public NodeResourceMonitor getNodeResourceMonitor() {
public NodeStatusUpdater getNodeStatusUpdater() {
return null;
}
+
+ @Override
+ public QueuingContext getQueuingContext() {
+ return null;
+ }
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 787778ef14..f37129e40d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -280,21 +280,22 @@ public static void waitForContainerState(ContainerManagementProtocol containerMa
list.add(containerID);
GetContainerStatusesRequest request =
GetContainerStatusesRequest.newInstance(list);
- ContainerStatus containerStatus =
- containerManager.getContainerStatuses(request).getContainerStatuses()
- .get(0);
+ ContainerStatus containerStatus = null;
int timeoutSecs = 0;
- while (!containerStatus.getState().equals(finalState)
- && timeoutSecs++ < timeOutMax) {
- Thread.sleep(1000);
- LOG.info("Waiting for container to get into state " + finalState
- + ". Current state is " + containerStatus.getState());
- containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
- }
- LOG.info("Container state is " + containerStatus.getState());
- Assert.assertEquals("ContainerState is not correct (timedout)",
- finalState, containerStatus.getState());
- }
+ do {
+ Thread.sleep(2000);
+ containerStatus =
+ containerManager.getContainerStatuses(request)
+ .getContainerStatuses().get(0);
+ LOG.info("Waiting for container to get into state " + finalState
+ + ". Current state is " + containerStatus.getState());
+ timeoutSecs += 2;
+ } while (!containerStatus.getState().equals(finalState)
+ && timeoutSecs < timeOutMax);
+ LOG.info("Container state is " + containerStatus.getState());
+ Assert.assertEquals("ContainerState is not correct (timedout)",
+ finalState, containerStatus.getState());
+ }
static void waitForApplicationState(ContainerManagerImpl containerManager,
ApplicationId appID, ApplicationState finalState)
@@ -328,19 +329,24 @@ public static void waitForNMContainerState(ContainerManagerImpl
org.apache.hadoop.yarn.server.nodemanager.containermanager
.container.ContainerState finalState, int timeOutMax)
throws InterruptedException, YarnException, IOException {
- Container container =
- containerManager.getContext().getContainers().get(containerID);
+ Container container = null;
org.apache.hadoop.yarn.server.nodemanager
- .containermanager.container.ContainerState currentState =
- container.getContainerState();
+ .containermanager.container.ContainerState currentState = null;
int timeoutSecs = 0;
- while (!currentState.equals(finalState)
- && timeoutSecs++ < timeOutMax) {
- Thread.sleep(1000);
- LOG.info("Waiting for NM container to get into state " + finalState
- + ". Current state is " + currentState);
- currentState = container.getContainerState();
- }
+ do {
+ Thread.sleep(2000);
+ container =
+ containerManager.getContext().getContainers().get(containerID);
+ if (container != null) {
+ currentState = container.getContainerState();
+ }
+ if (currentState != null) {
+ LOG.info("Waiting for NM container to get into state " + finalState
+ + ". Current state is " + currentState);
+ }
+ timeoutSecs += 2;
+ } while (!currentState.equals(finalState)
+ && timeoutSecs++ < timeOutMax);
LOG.info("Container state is " + currentState);
Assert.assertEquals("ContainerState is not correct (timedout)",
finalState, currentState);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 3f5fc825c5..702198e9a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -74,6 +75,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
@@ -176,7 +178,7 @@ public void testContainerManagerInitialization() throws IOException {
// Just do a query for a non-existing container.
boolean throwsException = false;
try {
- List containerIds = new ArrayList();
+ List containerIds = new ArrayList<>();
ContainerId id =createContainerId(0);
containerIds.add(id);
GetContainerStatusesRequest request =
@@ -231,14 +233,14 @@ public void testContainerSetup() throws Exception {
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
- ContainerState.COMPLETE);
+ ContainerState.COMPLETE, 40);
// Now ascertain that the resources are localised correctly.
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
@@ -323,7 +325,7 @@ public void testContainerLaunchAndStop() throws IOException,
createContainerToken(cId,
DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
@@ -355,7 +357,7 @@ public void testContainerLaunchAndStop() throws IOException,
Assert.assertTrue("Process is not alive!",
DefaultContainerExecutor.containerIsAlive(pid));
- List containerIds = new ArrayList();
+ List containerIds = new ArrayList<>();
containerIds.add(cId);
StopContainersRequest stopRequest =
StopContainersRequest.newInstance(containerIds);
@@ -375,7 +377,7 @@ public void testContainerLaunchAndStop() throws IOException,
DefaultContainerExecutor.containerIsAlive(pid));
}
- private void testContainerLaunchAndExit(int exitCode) throws IOException,
+ protected void testContainerLaunchAndExit(int exitCode) throws IOException,
InterruptedException, YarnException {
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
@@ -430,7 +432,7 @@ private void testContainerLaunchAndExit(int exitCode) throws IOException,
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
@@ -439,12 +441,12 @@ private void testContainerLaunchAndExit(int exitCode) throws IOException,
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE);
- List containerIds = new ArrayList();
+ List containerIds = new ArrayList<>();
containerIds.add(cId);
GetContainerStatusesRequest gcsRequest =
GetContainerStatusesRequest.newInstance(containerIds);
- ContainerStatus containerStatus =
- containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+ ContainerStatus containerStatus = containerManager.
+ getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
// Verify exit status matches exit state of script
Assert.assertEquals(exitCode,
@@ -520,7 +522,7 @@ public void testLocalFilesCleanup() throws InterruptedException,
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
@@ -605,7 +607,7 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
createContainerToken(cId1,
ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(startRequest1);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
@@ -635,7 +637,7 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
createContainerToken(cId2,
DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
context.getContainerTokenSecretManager()));
- List list2 = new ArrayList();
+ List list2 = new ArrayList<>();
list.add(startRequest2);
StartContainersRequest allRequests2 =
StartContainersRequest.newInstance(list2);
@@ -655,7 +657,7 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
public void testMultipleContainersLaunch() throws Exception {
containerManager.start();
- List list = new ArrayList();
+ List list = new ArrayList<>();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
for (int i = 0; i < 10; i++) {
@@ -679,6 +681,7 @@ public void testMultipleContainersLaunch() throws Exception {
StartContainersResponse response =
containerManager.startContainers(requestList);
+ Thread.sleep(5000);
Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
for (ContainerId id : response.getSuccessfullyStartedContainers()) {
@@ -699,12 +702,11 @@ public void testMultipleContainersLaunch() throws Exception {
@Test
public void testMultipleContainersStopAndGetStatus() throws Exception {
containerManager.start();
- List startRequest =
- new ArrayList();
+ List startRequest = new ArrayList<>();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
- List containerIds = new ArrayList();
+ List containerIds = new ArrayList<>();
for (int i = 0; i < 10; i++) {
ContainerId cId = createContainerId(i);
String user = null;
@@ -727,6 +729,7 @@ public void testMultipleContainersStopAndGetStatus() throws Exception {
StartContainersRequest requestList =
StartContainersRequest.newInstance(startRequest);
containerManager.startContainers(requestList);
+ Thread.sleep(5000);
// Get container statuses
GetContainerStatusesRequest statusRequest =
@@ -777,8 +780,7 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception {
ServiceA.class, Service.class);
containerManager.start();
- List startRequest =
- new ArrayList();
+ List startRequest = new ArrayList<>();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -803,8 +805,8 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception {
StartContainersResponse response =
containerManager.startContainers(requestList);
- Assert.assertTrue(response.getFailedRequests().size() == 1);
- Assert.assertTrue(response.getSuccessfullyStartedContainers().size() == 0);
+ Assert.assertEquals(1, response.getFailedRequests().size());
+ Assert.assertEquals(0, response.getSuccessfullyStartedContainers().size());
Assert.assertTrue(response.getFailedRequests().containsKey(cId));
Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
.contains("The auxService:" + serviceName + " does not exist"));
@@ -880,8 +882,7 @@ public void testNullTokens() throws Exception {
ContainerManagerImpl.INVALID_NMTOKEN_MSG);
Mockito.doNothing().when(spyContainerMgr).authorizeUser(ugInfo, null);
- List reqList
- = new ArrayList();
+ List reqList = new ArrayList<>();
reqList.add(StartContainerRequest.newInstance(null, null));
StartContainersRequest reqs = new StartContainersRequestPBImpl();
reqs.setStartContainerRequests(reqList);
@@ -925,7 +926,7 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
Thread.sleep(2000);
// Construct container resource increase request,
- List increaseTokens = new ArrayList();
+ List increaseTokens = new ArrayList<>();
// Add increase request for container-0, the request will fail as the
// container will have exited, and won't be in RUNNING state
ContainerId cId0 = createContainerId(0);
@@ -1012,7 +1013,7 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
@@ -1022,7 +1023,7 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request,
- List increaseTokens = new ArrayList();
+ List increaseTokens = new ArrayList<>();
// Add increase request. The increase request should fail
// as the current resource does not fit in the target resource
Token containerToken =
@@ -1096,7 +1097,7 @@ public void testChangeContainerResource() throws Exception {
createContainerToken(cId, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user,
context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
@@ -1106,7 +1107,7 @@ public void testChangeContainerResource() throws Exception {
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request,
- List increaseTokens = new ArrayList();
+ List increaseTokens = new ArrayList<>();
// Add increase request.
Resource targetResource = Resource.newInstance(4096, 2);
Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
@@ -1184,6 +1185,21 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenIdentifier);
}
+ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+ NodeId nodeId, String user, Resource resource,
+ NMContainerTokenSecretManager containerTokenSecretManager,
+ LogAggregationContext logAggregationContext, ExecutionType executionType)
+ throws IOException {
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
+ System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+ Priority.newInstance(0), 0, logAggregationContext, null,
+ ContainerType.TASK, executionType);
+ return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
+ .retrievePassword(containerTokenIdentifier),
+ containerTokenIdentifier);
+ }
+
@Test
public void testOutputThreadDumpSignal() throws IOException,
InterruptedException, YarnException {
@@ -1241,7 +1257,7 @@ private void testContainerLaunchAndSignal(SignalContainerCommand command)
new HashMap();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
- List commands = new ArrayList();
+ List commands = new ArrayList<>();
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
@@ -1250,7 +1266,7 @@ private void testContainerLaunchAndSignal(SignalContainerCommand command)
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager()));
- List list = new ArrayList();
+ List list = new ArrayList<>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
@@ -1267,7 +1283,7 @@ private void testContainerLaunchAndSignal(SignalContainerCommand command)
// Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
SignalContainerRequest signalReq =
SignalContainerRequest.newInstance(cId, command);
- List reqs = new ArrayList();
+ List reqs = new ArrayList<>();
reqs.add(signalReq);
containerManager.handle(new CMgrSignalContainersEvent(reqs));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
index bbde9ed7c5..0dc5c5b3ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java
@@ -22,6 +22,10 @@
public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
+ public MockResourceCalculatorPlugin() {
+ super(null);
+ }
+
@Override
public long getVirtualMemorySize() {
return 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index d7f89fc0b0..1a0c690a7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -173,8 +174,8 @@ public void testContainersResourceChange() throws Exception {
assertTrue(containerEventHandler
.isContainerKilled(getContainerId(1)));
// create container 2
- containersMonitor.handle(new ContainerStartMonitoringEvent(
- getContainerId(2), 2202009L, 1048576L, 1, 0, 0));
+ containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
+ 2), 2202009L, 1048576L, 1, 0, 0));
// verify that this container is properly tracked
assertNotNull(getProcessTreeInfo(getContainerId(2)));
assertEquals(1048576L, getProcessTreeInfo(getContainerId(2))
@@ -215,8 +216,8 @@ public void testContainersResourceChangeIsTriggeredImmediately()
// now waiting for the next monitor cycle
Thread.sleep(1000);
// create a container with id 3
- containersMonitor.handle(new ContainerStartMonitoringEvent(
- getContainerId(3), 2202009L, 1048576L, 1, 0, 0));
+ containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
+ 3), 2202009L, 1048576L, 1, 0, 0));
// Verify that this container has been tracked
assertNotNull(getProcessTreeInfo(getContainerId(3)));
// trigger a change resource event, check limit after change
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
new file mode 100644
index 0000000000..0d951f4eb6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
+ .ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestQueuingContainerManager extends TestContainerManager {
+
+ interface HasResources {
+ boolean decide(Context context, ContainerId cId);
+ }
+
+ public TestQueuingContainerManager() throws UnsupportedFileSystemException {
+ super();
+ }
+
+ static {
+ LOG = LogFactory.getLog(TestQueuingContainerManager.class);
+ }
+
+ HasResources hasResources = null;
+ boolean shouldDeleteWait = false;
+
+ @Override
+ protected ContainerManagerImpl
+ createContainerManager(DeletionService delSrvc) {
+ return new QueuingContainerManagerImpl(context, exec, delSrvc,
+ nodeStatusUpdater, metrics, dirsHandler) {
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ conf.set(
+ YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+ MockResourceCalculatorPlugin.class.getCanonicalName());
+ conf.set(
+ YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+ MockResourceCalculatorProcessTree.class.getCanonicalName());
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void
+ setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+ // do nothing
+ }
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+
+ @Override
+ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
+ Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
+ if(container == null || container.getUser().equals("Fail")){
+ throw new YarnException("Reject this container");
+ }
+ }
+
+ @Override
+ protected ContainersMonitor createContainersMonitor(ContainerExecutor
+ exec) {
+ return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+ @Override
+ public boolean hasResourcesAvailable(
+ ContainersMonitorImpl.ProcessTreeInfo pti) {
+ return hasResources.decide(this.context, pti.getContainerId());
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ protected DeletionService createDeletionService() {
+ return new DeletionService(exec) {
+ @Override
+ public void delete(String user, Path subDir, Path... baseDirs) {
+ // Don't do any deletions.
+ if (shouldDeleteWait) {
+ try {
+ Thread.sleep(10000);
+ LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " +
+ "subDir - " + subDir + ", " +
+ "baseDirs - " + Arrays.asList(baseDirs));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ LOG.info("\n\nPseudo delete : user - " + user + ", " +
+ "subDir - " + subDir + ", " +
+ "baseDirs - " + Arrays.asList(baseDirs));
+ }
+ }
+ };
+ }
+
+ @Override
+ public void setup() throws IOException {
+ super.setup();
+ shouldDeleteWait = false;
+ hasResources = new HasResources() {
+ @Override
+ public boolean decide(Context context, ContainerId cId) {
+ return true;
+ }
+ };
+ }
+
+ /**
+ * Test to verify that an OPPORTUNISTIC container is killed when
+ * a GUARANTEED container arrives and all the Node Resources are used up
+ *
+ * For this specific test case, 4 containers are requested (last one being
+ * guaranteed). Assumptions :
+ * 1) The first OPPORTUNISTIC Container will start running
+ * 2) The second and third OPP containers will be queued
+ * 3) When the GUARANTEED container comes in, the running OPP container
+ * will be killed to make room
+ * 4) After the GUARANTEED container finishes, the remaining 2 OPP
+ * containers will be dequeued and run.
+ * 5) Only the first OPP container will be killed.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSimpleOpportunisticContainer() throws Exception {
+ shouldDeleteWait = true;
+ containerManager.start();
+
+ // ////// Create the resources for the container
+ File dir = new File(tmpDir, "dir");
+ dir.mkdirs();
+ File file = new File(dir, "file");
+ PrintWriter fileWriter = new PrintWriter(file);
+ fileWriter.write("Hello World!");
+ fileWriter.close();
+
+ // ////// Construct the container-spec.
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ URL resource_alpha =
+ ConverterUtils.getYarnUrlFromPath(localFS
+ .makeQualified(new Path(file.getAbsolutePath())));
+ LocalResource rsrc_alpha =
+ recordFactory.newRecordInstance(LocalResource.class);
+ rsrc_alpha.setResource(resource_alpha);
+ rsrc_alpha.setSize(-1);
+ rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+ rsrc_alpha.setType(LocalResourceType.FILE);
+ rsrc_alpha.setTimestamp(file.lastModified());
+ String destinationFile = "dest_file";
+ Map localResources =
+ new HashMap();
+ localResources.put(destinationFile, rsrc_alpha);
+ containerLaunchContext.setLocalResources(localResources);
+
+ // Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container
+ List list = new ArrayList<>();
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(1024, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(1024, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(1024, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+ // GUARANTEED
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, context.getContainerTokenSecretManager())));
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+
+ // Plugin to simulate that the Node is full
+ // It only allows 1 container to run at a time.
+ hasResources = new HasResources() {
+ @Override
+ public boolean decide(Context context, ContainerId cId) {
+ int nOpp = ((QueuingContainerManagerImpl) containerManager)
+ .getNumAllocatedOpportunisticContainers();
+ int nGuar = ((QueuingContainerManagerImpl) containerManager)
+ .getNumAllocatedGuaranteedContainers();
+ boolean val = (nOpp + nGuar < 1);
+ System.out.println("\nHasResources : [" + cId + "]," +
+ "Opp[" + nOpp + "], Guar[" + nGuar + "], [" + val + "]\n");
+ return val;
+ }
+ };
+
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForContainerState(containerManager,
+ createContainerId(3),
+ ContainerState.COMPLETE, 40);
+ List statList = new ArrayList();
+ for (int i = 0; i < 4; i++) {
+ statList.add(createContainerId(i));
+ }
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+ for (ContainerStatus status : containerStatuses) {
+ // Ensure that the first opportunistic container is killed
+ if (status.getContainerId().equals(createContainerId(0))) {
+ Assert.assertTrue(status.getDiagnostics()
+ .contains("Container killed by the ApplicationMaster"));
+ }
+ System.out.println("\nStatus : [" + status + "]\n");
+ }
+ }
+}