diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7df41498ef..7ed46109b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1255,7 +1255,21 @@ public class YarnConfiguration extends Configuration {
/** Prefix for all node manager configs.*/
public static final String NM_PREFIX = "yarn.nodemanager.";
- /** Max Queue length of OPPORTUNISTIC
containers on the NM. */
+ /**
+ * At the NM, the policy to determine whether to queue an
+ * OPPORTUNISTIC
container or not.
+ * If set to BY_QUEUE_LEN
, uses the queue capacity, as set by
+ * {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH},
+ * to limit how many containers to accept/queue.
+ * If set to BY_RESOURCES
, limits the number of containers
+ * accepted based on the resource capacity of the node.
+ */
+ public static final String NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY =
+ NM_PREFIX + "opportunistic-containers-queue-policy";
+
+ /** Max Queue length of OPPORTUNISTIC
containers on the NM.
+ * If set to 0, NM does not accept any OPPORTUNISTIC
containers.
+ * If set to {@literal > 0}, enforces the queue capacity. */
public static final String NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
NM_PREFIX + "opportunistic-containers-max-queue-length";
public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d6bef5b3a9..53b6b14cfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1292,8 +1292,27 @@
- Max number of OPPORTUNISTIC containers to queue at the
- nodemanager.
+
+ At the NM, the policy to determine whether to queue an
+ OPPORTUNISTIC container or not.
+ If set to BY_QUEUE_LEN, uses the queue capacity, as set by
+ yarn.nodemanager.opportunistic-containers-max-queue-length
+ to limit how many containers to accept/queue.
+ If set to BY_RESOURCES, limits the number of containers
+ accepted based on the resource capacity of the node.
+
+ yarn.nodemanager.opportunistic-containers-queue-policy
+ BY_QUEUE_LEN
+
+
+
+
+ Max number of OPPORTUNISTIC containers to queue at the
+ nodemanager (NM). If the value is 0 or negative,
+ NMs do not allow any OPPORTUNISTIC containers.
+ If the value is positive, the NM caps the number of OPPORTUNISTIC
+ containers that can be queued at the NM.
+
yarn.nodemanager.opportunistic-containers-max-queue-length
0
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
index 100676d27d..879a5d9d0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@@ -34,6 +35,9 @@ public class AllocationBasedResourceUtilizationTracker implements
private static final Logger LOG =
LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class);
+ private static final long LEFT_SHIFT_MB_IN_BYTES = 20;
+ private static final int RIGHT_SHIFT_BYTES_IN_MB = 20;
+
private ResourceUtilization containersAllocation;
private ContainerScheduler scheduler;
@@ -80,10 +84,34 @@ public class AllocationBasedResourceUtilizationTracker implements
*/
@Override
public boolean hasResourcesAvailable(Container container) {
- long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L;
- return hasResourcesAvailable(pMemBytes,
- (long) (getContainersMonitor().getVmemRatio()* pMemBytes),
- container.getResource().getVirtualCores());
+ return hasResourcesAvailable(container.getResource());
+ }
+
+ /**
+ * Converts memory in megabytes to bytes by bitwise left-shifting 20 times.
+ * @param memMB the memory in megabytes
+ * @return the memory in bytes
+ */
+ private static long convertMBToBytes(final long memMB) {
+ return memMB << LEFT_SHIFT_MB_IN_BYTES;
+ }
+
+ /**
+ * Converts memory in bytes to megabytes by bitwise right-shifting 20 times.
+ * @param bytes the memory in bytes
+ * @return the memory in megabytes
+ */
+ private static long convertBytesToMB(final long bytes) {
+ return bytes >> RIGHT_SHIFT_BYTES_IN_MB;
+ }
+
+ @Override
+ public boolean hasResourcesAvailable(Resource resource) {
+ long pMemBytes = convertMBToBytes(resource.getMemorySize());
+ final long vmemBytes = (long)
+ (getContainersMonitor().getVmemRatio() * pMemBytes);
+ return hasResourcesAvailable(
+ pMemBytes, vmemBytes, resource.getVirtualCores());
}
private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes,
@@ -92,13 +120,13 @@ public class AllocationBasedResourceUtilizationTracker implements
if (LOG.isDebugEnabled()) {
LOG.debug("pMemCheck [current={} + asked={} > allowed={}]",
this.containersAllocation.getPhysicalMemory(),
- (pMemBytes >> 20),
- (getContainersMonitor().getPmemAllocatedForContainers() >> 20));
+ convertBytesToMB(pMemBytes),
+ convertBytesToMB(
+ getContainersMonitor().getPmemAllocatedForContainers()));
}
if (this.containersAllocation.getPhysicalMemory() +
- (int) (pMemBytes >> 20) >
- (int) (getContainersMonitor()
- .getPmemAllocatedForContainers() >> 20)) {
+ convertBytesToMB(pMemBytes) > convertBytesToMB(
+ getContainersMonitor().getPmemAllocatedForContainers())) {
return false;
}
@@ -106,15 +134,17 @@ public class AllocationBasedResourceUtilizationTracker implements
LOG.debug("before vMemCheck" +
"[isEnabled={}, current={} + asked={} > allowed={}]",
getContainersMonitor().isVmemCheckEnabled(),
- this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20),
- (getContainersMonitor().getVmemAllocatedForContainers() >> 20));
+ this.containersAllocation.getVirtualMemory(),
+ convertBytesToMB(vMemBytes),
+ convertBytesToMB(
+ getContainersMonitor().getVmemAllocatedForContainers()));
}
// Check virtual memory.
if (getContainersMonitor().isVmemCheckEnabled() &&
this.containersAllocation.getVirtualMemory() +
- (int) (vMemBytes >> 20) >
- (int) (getContainersMonitor()
- .getVmemAllocatedForContainers() >> 20)) {
+ convertBytesToMB(vMemBytes) >
+ convertBytesToMB(getContainersMonitor()
+ .getVmemAllocatedForContainers())) {
return false;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index b52e125367..4ae7936c83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService
.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +76,7 @@ public class ContainerScheduler extends AbstractService implements
private final Context context;
// Capacity of the queue for opportunistic Containers.
private final int maxOppQueueLength;
+ private final boolean forceStartGuaranteedContainers;
// Queue of Guaranteed Containers waiting for resources to run
private final LinkedHashMap
@@ -106,9 +109,37 @@ public class ContainerScheduler extends AbstractService implements
private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
+ private final OpportunisticContainersQueuePolicy oppContainersQueuePolicy;
private Boolean usePauseEventForPreemption = false;
+ private static int getMaxOppQueueLengthFromConf(final Context context) {
+ if (context == null || context.getConf() == null) {
+ return YarnConfiguration
+ .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH;
+ }
+
+ return context.getConf().getInt(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+ YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH
+ );
+ }
+
+ private static OpportunisticContainersQueuePolicy
+ getOppContainersQueuePolicyFromConf(final Context context) {
+ final OpportunisticContainersQueuePolicy queuePolicy;
+ if (context == null || context.getConf() == null) {
+ queuePolicy = OpportunisticContainersQueuePolicy.DEFAULT;
+ } else {
+ queuePolicy = context.getConf().getEnum(
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY,
+ OpportunisticContainersQueuePolicy.DEFAULT
+ );
+ }
+
+ return queuePolicy;
+ }
+
@VisibleForTesting
ResourceHandlerChain resourceHandlerChain = null;
@@ -120,10 +151,9 @@ public class ContainerScheduler extends AbstractService implements
*/
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics) {
- this(context, dispatcher, metrics, context.getConf().getInt(
- YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
- YarnConfiguration.
- DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
+ this(context, dispatcher, metrics,
+ getOppContainersQueuePolicyFromConf(context),
+ getMaxOppQueueLengthFromConf(context));
}
@@ -149,13 +179,35 @@ public class ContainerScheduler extends AbstractService implements
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
+ this(context, dispatcher, metrics,
+ getOppContainersQueuePolicyFromConf(context), qLength);
+ }
+
+ @VisibleForTesting
+ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
+ NodeManagerMetrics metrics,
+ OpportunisticContainersQueuePolicy oppContainersQueuePolicy,
+ int qLength) {
super(ContainerScheduler.class.getName());
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
- this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength;
this.utilizationTracker =
new AllocationBasedResourceUtilizationTracker(this);
+ this.oppContainersQueuePolicy = oppContainersQueuePolicy;
+ switch (oppContainersQueuePolicy) {
+ case BY_RESOURCES:
+ this.maxOppQueueLength = 0;
+ this.forceStartGuaranteedContainers = false;
+ LOG.info("Setting max opportunistic queue length to 0,"
+ + " as {} is incompatible with queue length",
+ oppContainersQueuePolicy);
+ break;
+ case BY_QUEUE_LEN:
+ default:
+ this.maxOppQueueLength = qLength;
+ this.forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+ }
this.opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
@@ -187,7 +239,7 @@ public class ContainerScheduler extends AbstractService implements
shedQueuedOpportunisticContainers();
break;
case RECOVERY_COMPLETED:
- startPendingContainers(maxOppQueueLength <= 0);
+ startPendingContainers(forceStartGuaranteedContainers);
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
break;
@@ -243,7 +295,7 @@ public class ContainerScheduler extends AbstractService implements
LOG.warn(String.format("Could not update resources on " +
"continer update of %s", containerId), ex);
}
- startPendingContainers(maxOppQueueLength <= 0);
+ startPendingContainers(forceStartGuaranteedContainers);
metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
queuedGuaranteedContainers.size());
}
@@ -371,7 +423,6 @@ public class ContainerScheduler extends AbstractService implements
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
- boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
}
this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(),
@@ -380,13 +431,13 @@ public class ContainerScheduler extends AbstractService implements
/**
* Start pending containers in the queue.
- * @param forceStartGuaranteedContaieners When this is true, start guaranteed
+ * @param forceStartGContainers When this is true, start guaranteed
* container without looking at available resource
*/
- private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
+ private void startPendingContainers(boolean forceStartGContainers) {
// Start guaranteed containers that are paused, if resources available.
boolean resourcesAvailable = startContainers(
- queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
+ queuedGuaranteedContainers.values(), forceStartGContainers);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
@@ -429,6 +480,21 @@ public class ContainerScheduler extends AbstractService implements
return this.utilizationTracker.hasResourcesAvailable(container);
}
+ private boolean resourceAvailableToQueueOppContainer(
+ Container newOppContainer) {
+ final Resource cumulativeResource = Resource.newInstance(Resources.none());
+ for (final Container container : queuedGuaranteedContainers.values()) {
+ Resources.addTo(cumulativeResource, container.getResource());
+ }
+
+ for (final Container container : queuedOpportunisticContainers.values()) {
+ Resources.addTo(cumulativeResource, container.getResource());
+ }
+
+ Resources.addTo(cumulativeResource, newOppContainer.getResource());
+ return this.utilizationTracker.hasResourcesAvailable(cumulativeResource);
+ }
+
private boolean enqueueContainer(Container container) {
boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
getExecutionType() == ExecutionType.GUARANTEED;
@@ -438,7 +504,21 @@ public class ContainerScheduler extends AbstractService implements
queuedGuaranteedContainers.put(container.getContainerId(), container);
isQueued = true;
} else {
- if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
+ switch (oppContainersQueuePolicy) {
+ case BY_RESOURCES:
+ isQueued = resourceAvailableToQueueOppContainer(container);
+ break;
+ case BY_QUEUE_LEN:
+ default:
+ if (maxOppQueueLength <= 0) {
+ isQueued = false;
+ } else {
+ isQueued =
+ queuedOpportunisticContainers.size() < maxOppQueueLength;
+ }
+ }
+
+ if (isQueued) {
LOG.info("Opportunistic container {} will be queued at the NM.",
container.getContainerId());
queuedOpportunisticContainers.put(
@@ -451,7 +531,6 @@ public class ContainerScheduler extends AbstractService implements
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Opportunistic container queue is full.");
- isQueued = false;
}
}
@@ -484,7 +563,6 @@ public class ContainerScheduler extends AbstractService implements
// When opportunistic container not allowed (which is determined by
// max-queue length of pending opportunistic containers <= 0), start
// guaranteed containers without looking at available resources.
- boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
// if the guaranteed container is queued, we need to preempt opportunistic
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainersQueuePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainersQueuePolicy.java
new file mode 100644
index 0000000000..ce7aafbc4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainersQueuePolicy.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Determines how to schedule opportunistic containers at the NodeManager,
+ * i.e., whether or not to accept, queue, or reject a container run request.
+ */
+public enum OpportunisticContainersQueuePolicy {
+ /**
+ * Determines whether or not to run a container by the queue capacity:
+ * {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH}.
+ * If there's enough capacity in the queue,
+ * queues the container, otherwise rejects it.
+ */
+ BY_QUEUE_LEN,
+ /**
+ * Determines whether or not to run a container based on the amount of
+ * resource capacity the node has.
+ * Sums up the resources running + already queued at the node, compares
+ * it with the total capacity of the node, and accepts the new container only
+ * if the computed resources above + resources used by the container
+ * is less than or equal to the node capacity.
+ */
+ BY_RESOURCES;
+
+ public static final OpportunisticContainersQueuePolicy DEFAULT = BY_QUEUE_LEN;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
index 3c17ecaffe..3d28de0204 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -56,4 +57,10 @@ public interface ResourceUtilizationTracker {
*/
boolean hasResourcesAvailable(Container container);
+ /**
+ * Check if NM has resources available currently to run requested resources.
+ * @param resource the resources.
+ * @return True, if NM has enough available resources.
+ */
+ boolean hasResourcesAvailable(Resource resource);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerSchedulerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerSchedulerTest.java
new file mode 100644
index 0000000000..5a495d7413
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerSchedulerTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ConfigurationException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.TestContainerSchedulerQueuing;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.mockito.Mockito.spy;
+
+/**
+ * Base test class that overrides the behavior of
+ * {@link ContainerStateTransitionListener} for testing
+ * the {@link ContainerScheduler}.
+ */
+public class BaseContainerSchedulerTest extends BaseContainerManagerTest {
+ private static final long TWO_GB = 2048 * 1024 * 1024L;
+
+ public BaseContainerSchedulerTest() throws UnsupportedFileSystemException {
+ super();
+ }
+
+ static {
+ LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
+ }
+
+ public static class Listener implements ContainerStateTransitionListener {
+
+ private final Map> states =
+ new HashMap<>();
+ private final Map> events =
+ new HashMap<>();
+
+ public Map> getEvents() {
+ return events;
+ }
+
+ public Map> getStates() {
+ return states;
+ }
+
+ @Override
+ public void init(Context context) {}
+
+ @Override
+ public void preTransition(ContainerImpl op,
+ ContainerState beforeState,
+ ContainerEvent eventToBeProcessed) {
+ if (!states.containsKey(op.getContainerId())) {
+ states.put(op.getContainerId(), new ArrayList<>());
+ states.get(op.getContainerId()).add(beforeState);
+ events.put(op.getContainerId(), new ArrayList<>());
+ }
+ }
+
+ @Override
+ public void postTransition(ContainerImpl op, ContainerState beforeState,
+ ContainerState afterState, ContainerEvent processedEvent) {
+ states.get(op.getContainerId()).add(afterState);
+ events.get(op.getContainerId()).add(processedEvent.getType());
+ }
+ }
+
+ private boolean delayContainers = true;
+
+ protected void setDelayContainers(final boolean delayContainersParam) {
+ this.delayContainers = delayContainersParam;
+ }
+
+ @Override
+ protected ContainerManagerImpl createContainerManager(
+ DeletionService delSrvc) {
+ return new ContainerManagerImpl(context, exec, delSrvc,
+ getNodeStatusUpdater(), metrics, dirsHandler) {
+
+ @Override
+ protected UserGroupInformation getRemoteUgi() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ UserGroupInformation ugi =
+ UserGroupInformation.createRemoteUser(appAttemptId.toString());
+ ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
+ .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
+ .getKeyId()));
+ return ugi;
+ }
+
+ @Override
+ protected ContainersMonitor createContainersMonitor(
+ ContainerExecutor exec) {
+ return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+ // Define resources available for containers to be executed.
+ @Override
+ public long getPmemAllocatedForContainers() {
+ return TWO_GB;
+ }
+
+ @Override
+ public long getVmemAllocatedForContainers() {
+ float pmemRatio = getConfig().getFloat(
+ YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ return (long) (pmemRatio * getPmemAllocatedForContainers());
+ }
+
+ @Override
+ public long getVCoresAllocatedForContainers() {
+ return 4;
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ protected ContainerExecutor createContainerExecutor() {
+ DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+ ConcurrentMap oversleepMap =
+ new ConcurrentHashMap();
+
+ /**
+ * Launches the container.
+ * If delayContainers is turned on, then we sleep a while before
+ * starting the container.
+ */
+ @Override
+ public int launchContainer(ContainerStartContext ctx)
+ throws IOException, ConfigurationException {
+ final String containerId =
+ ctx.getContainer().getContainerId().toString();
+ oversleepMap.put(containerId, false);
+ if (delayContainers) {
+ try {
+ Thread.sleep(10000);
+ if (oversleepMap.get(containerId)) {
+ Thread.sleep(10000);
+ }
+ } catch (InterruptedException e) {
+ // Nothing..
+ }
+ }
+ return super.launchContainer(ctx);
+ }
+
+ @Override
+ public void pauseContainer(Container container) {
+ // To mimic pausing we force the container to be in the PAUSED state
+ // a little longer by oversleeping.
+ oversleepMap.put(container.getContainerId().toString(), true);
+ LOG.info("Container was paused");
+ }
+
+ @Override
+ public void resumeContainer(Container container) {
+ LOG.info("Container was resumed");
+ }
+ };
+ exec.setConf(conf);
+ return spy(exec);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerOppContainersByResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerOppContainersByResources.java
new file mode 100644
index 0000000000..30fbbde129
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerOppContainersByResources.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerSchedulerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests the behavior of {@link ContainerScheduler} when its queueing policy
+ * is set to {@link OpportunisticContainersQueuePolicy#BY_RESOURCES}
+ * such that the NM only queues containers if there's enough resources
+ * on the node to start all queued containers.
+ */
+public class TestContainerSchedulerOppContainersByResources
+ extends BaseContainerSchedulerTest {
+ public TestContainerSchedulerOppContainersByResources()
+ throws UnsupportedFileSystemException {
+ }
+
+ @Override
+ public void setup() throws IOException {
+ conf.set(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY,
+ OpportunisticContainersQueuePolicy.BY_RESOURCES.name());
+ super.setup();
+ containerManager.start();
+ }
+
+ /**
+ * Checks if a container is in a running or successfully run state.
+ * @param containerStatus the container status
+ * @return true if the container is running or completed
+ * with a successful state, false if the container has not started or failed
+ */
+ private static boolean isContainerInSuccessfulState(
+ final ContainerStatus containerStatus) {
+ final org.apache.hadoop.yarn.api.records.ContainerState state =
+ containerStatus.getState();
+ final ContainerSubState subState = containerStatus.getContainerSubState();
+ switch (subState) {
+ case RUNNING:
+ case COMPLETING:
+ return true;
+ case DONE:
+ // If the state is not COMPLETE, then the
+ // container is a failed container
+ return state ==
+ org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
+ default:
+ return false;
+ }
+ }
+
+ private void verifyRunAndKilledContainers(
+ final List statList,
+ final int numExpectedContainers, final Set runContainers,
+ final Set killedContainers)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(
+ () -> {
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ final List containerStatuses;
+ try {
+ containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+ } catch (final Exception e) {
+ return false;
+ }
+
+ if (numExpectedContainers != containerStatuses.size()) {
+ return false;
+ }
+
+ for (final ContainerStatus status : containerStatuses) {
+ if (runContainers.contains(status.getContainerId())) {
+ if (!isContainerInSuccessfulState(status)) {
+ return false;
+ }
+ } else if (killedContainers.contains(status.getContainerId())) {
+ if (!status.getDiagnostics()
+ .contains("Opportunistic container queue is full")) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ return true;
+ }, 1000, 10000);
+ }
+
+ /**
+ * Verifies that nothing is queued at the container scheduler.
+ */
+ private void verifyNothingQueued() {
+ // Check that nothing is queued
+ ContainerScheduler containerScheduler =
+ containerManager.getContainerScheduler();
+ Assert.assertEquals(0,
+ containerScheduler.getNumQueuedContainers());
+ Assert.assertEquals(0,
+ containerScheduler.getNumQueuedGuaranteedContainers());
+ Assert.assertEquals(0,
+ containerScheduler.getNumQueuedOpportunisticContainers());
+ Assert.assertEquals(0,
+ metrics.getQueuedOpportunisticContainers());
+ Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers());
+ }
+
+ /**
+ * Tests that newly arrived containers after the resources are filled up
+ * get killed and never gets run.
+ */
+ @Test
+ public void testKillOpportunisticWhenNoResourcesAvailable() throws Exception {
+ List startContainerRequests = new ArrayList<>();
+
+ // GContainer that takes up the whole node
+ startContainerRequests.add(StartContainerRequest.newInstance(
+ recordFactory.newRecordInstance(ContainerLaunchContext.class),
+ createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.GUARANTEED)));
+
+ // OContainer that should be killed
+ startContainerRequests.add(StartContainerRequest.newInstance(
+ recordFactory.newRecordInstance(ContainerLaunchContext.class),
+ createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(startContainerRequests);
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(0), ContainerState.RUNNING, 40);
+
+ // Wait for the OContainer to get killed
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(1), ContainerState.DONE, 40);
+
+ // Get container statuses.
+ // Container 0 should be running and container 1 should be killed
+ List statList = ImmutableList.of(createContainerId(0),
+ createContainerId(1));
+
+ verifyRunAndKilledContainers(
+ statList, 2,
+ Collections.singleton(createContainerId(0)),
+ Collections.singleton(createContainerId(1))
+ );
+
+ verifyNothingQueued();
+ }
+
+ /**
+ * Tests that newly arrived containers after the resources are filled up
+ * get killed and never gets run.
+ * This scenario is more granular and runs more small container compared to
+ * {@link #testKillOpportunisticWhenNoResourcesAvailable()}.
+ */
+ @Test
+ public void testOpportunisticRunsWhenResourcesAvailable() throws Exception {
+ List startContainerRequests = new ArrayList<>();
+ final int numContainers = 8;
+ final int numContainersQueued = 4;
+ final Set runContainers = new HashSet<>();
+ final Set killedContainers = new HashSet<>();
+
+ for (int i = 0; i < numContainers; i++) {
+ // OContainers that should be run
+ startContainerRequests.add(StartContainerRequest.newInstance(
+ recordFactory.newRecordInstance(ContainerLaunchContext.class),
+ createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(512, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+ }
+
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(startContainerRequests);
+ containerManager.startContainers(allRequests);
+
+ // Wait for containers to start
+ for (int i = 0; i < numContainersQueued; i++) {
+ final ContainerId containerId = createContainerId(i);
+ BaseContainerManagerTest
+ .waitForNMContainerState(containerManager, containerId,
+ ContainerState.RUNNING, 40);
+ runContainers.add(containerId);
+ }
+
+ // Wait for containers to be killed
+ for (int i = numContainersQueued; i < numContainers; i++) {
+ final ContainerId containerId = createContainerId(i);
+ BaseContainerManagerTest
+ .waitForNMContainerState(containerManager, createContainerId(i),
+ ContainerState.DONE, 40);
+ killedContainers.add(containerId);
+ }
+
+ Thread.sleep(5000);
+
+ // Get container statuses.
+ List statList = new ArrayList<>();
+ for (int i = 0; i < numContainers; i++) {
+ statList.add(createContainerId(i));
+ }
+
+
+ verifyRunAndKilledContainers(
+ statList, numContainers, runContainers, killedContainers);
+
+ verifyNothingQueued();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 508b8bd091..218d03afe7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -18,18 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
@@ -37,8 +26,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -46,35 +33,27 @@ import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerSchedulerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
-import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -82,137 +61,11 @@ import static org.mockito.Mockito.verify;
* Tests to verify that the {@link ContainerScheduler} is able to queue and
* make room for containers.
*/
-public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
+public class TestContainerSchedulerQueuing extends BaseContainerSchedulerTest {
public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
super();
}
- static {
- LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
- }
-
- private static class Listener implements ContainerStateTransitionListener {
-
- private final Map> states = new HashMap<>();
- private final Map> events =
- new HashMap<>();
-
- @Override
- public void init(Context context) {}
-
- @Override
- public void preTransition(ContainerImpl op,
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
- ContainerState beforeState,
- ContainerEvent eventToBeProcessed) {
- if (!states.containsKey(op.getContainerId())) {
- states.put(op.getContainerId(), new ArrayList<>());
- states.get(op.getContainerId()).add(beforeState);
- events.put(op.getContainerId(), new ArrayList<>());
- }
- }
-
- @Override
- public void postTransition(ContainerImpl op,
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
- ContainerState beforeState,
- org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
- ContainerState afterState,
- ContainerEvent processedEvent) {
- states.get(op.getContainerId()).add(afterState);
- events.get(op.getContainerId()).add(processedEvent.getType());
- }
- }
-
- private boolean delayContainers = true;
-
- @Override
- protected ContainerManagerImpl createContainerManager(
- DeletionService delSrvc) {
- return new ContainerManagerImpl(context, exec, delSrvc,
- getNodeStatusUpdater(), metrics, dirsHandler) {
-
- @Override
- protected UserGroupInformation getRemoteUgi() throws YarnException {
- ApplicationId appId = ApplicationId.newInstance(0, 0);
- ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(appId, 1);
- UserGroupInformation ugi =
- UserGroupInformation.createRemoteUser(appAttemptId.toString());
- ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
- .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
- .getKeyId()));
- return ugi;
- }
-
- @Override
- protected ContainersMonitor createContainersMonitor(
- ContainerExecutor exec) {
- return new ContainersMonitorImpl(exec, dispatcher, this.context) {
- // Define resources available for containers to be executed.
- @Override
- public long getPmemAllocatedForContainers() {
- return 2048 * 1024 * 1024L;
- }
-
- @Override
- public long getVmemAllocatedForContainers() {
- float pmemRatio = getConfig().getFloat(
- YarnConfiguration.NM_VMEM_PMEM_RATIO,
- YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
- return (long) (pmemRatio * getPmemAllocatedForContainers());
- }
-
- @Override
- public long getVCoresAllocatedForContainers() {
- return 4;
- }
- };
- }
- };
- }
-
- @Override
- protected ContainerExecutor createContainerExecutor() {
- DefaultContainerExecutor exec = new DefaultContainerExecutor() {
- ConcurrentMap oversleepMap =
- new ConcurrentHashMap();
- @Override
- public int launchContainer(ContainerStartContext ctx)
- throws IOException, ConfigurationException {
- oversleepMap.put(ctx.getContainer().getContainerId().toString(), false);
- if (delayContainers) {
- try {
- Thread.sleep(10000);
- if(oversleepMap.get(ctx.getContainer().getContainerId().toString())
- == true) {
- Thread.sleep(10000);
- }
- } catch (InterruptedException e) {
- // Nothing..
- }
- }
- return super.launchContainer(ctx);
- }
-
- @Override
- public void pauseContainer(Container container) {
- // To mimic pausing we force the container to be in the PAUSED state
- // a little longer by oversleeping.
- oversleepMap.put(container.getContainerId().toString(), true);
- LOG.info("Container was paused");
- }
-
- @Override
- public void resumeContainer(Container container) {
- LOG.info("Container was resumed");
- }
- };
- exec.setConf(conf);
- return spy(exec);
- }
-
@Override
public void setup() throws IOException {
conf.setInt(
@@ -408,7 +261,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
* @throws Exception
*/
@Test
- public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
+ public void testStartOpportunisticsWhenOppQueueIsFull() throws Exception {
containerManager.start();
List list = new ArrayList<>();
@@ -655,7 +508,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
List containerStates =
- listener.states.get(createContainerId(0));
+ listener.getStates().get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
@@ -676,7 +529,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.DONE), containerStates);
List containerEventTypes =
- listener.events.get(createContainerId(0));
+ listener.getEvents().get(createContainerId(0));
Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.PAUSE_CONTAINER,
@@ -1230,7 +1083,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
List containerStates =
- listener.states.get(createContainerId(1));
+ listener.getStates().get(createContainerId(1));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
@@ -1241,7 +1094,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING), containerStates);
List containerEventTypes =
- listener.events.get(createContainerId(1));
+ listener.getEvents().get(createContainerId(1));
Assert.assertEquals(Arrays.asList(
ContainerEventType.INIT_CONTAINER,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
@@ -1254,7 +1107,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
@Test
public void testContainerUpdateExecTypeGuaranteedToOpportunistic()
throws Exception {
- delayContainers = true;
+ setDelayContainers(true);
containerManager.start();
// Construct the Container-id
ContainerId cId = createContainerId(0);