From 3ed3c74a6c290b037c103d3f340f5a640d30e509 Mon Sep 17 00:00:00 2001 From: Andrew Chung Date: Mon, 24 Jan 2022 11:03:36 -0500 Subject: [PATCH] YARN-11015. Decouple queue capacity with ability to run OPPORTUNISTIC container (#3779) --- .../hadoop/yarn/conf/YarnConfiguration.java | 16 +- .../src/main/resources/yarn-default.xml | 23 +- ...cationBasedResourceUtilizationTracker.java | 58 +++- .../scheduler/ContainerScheduler.java | 106 ++++++- .../OpportunisticContainersQueuePolicy.java | 46 +++ .../scheduler/ResourceUtilizationTracker.java | 7 + .../BaseContainerSchedulerTest.java | 208 ++++++++++++++ ...inerSchedulerOppContainersByResources.java | 263 ++++++++++++++++++ .../TestContainerSchedulerQueuing.java | 177 +----------- 9 files changed, 711 insertions(+), 193 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainersQueuePolicy.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerSchedulerTest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerOppContainersByResources.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7df41498ef..7ed46109b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1255,7 +1255,21 @@ public 
class YarnConfiguration extends Configuration { /** Prefix for all node manager configs.*/ public static final String NM_PREFIX = "yarn.nodemanager."; - /** Max Queue length of OPPORTUNISTIC containers on the NM. */ + /** + * At the NM, the policy to determine whether to queue an + * OPPORTUNISTIC container or not. + * If set to BY_QUEUE_LEN, uses the queue capacity, as set by + * {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH}, + * to limit how many containers to accept/queue. + * If set to BY_RESOURCES, limits the number of containers + * accepted based on the resource capacity of the node. + */ + public static final String NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY = + NM_PREFIX + "opportunistic-containers-queue-policy"; + + /** Max Queue length of OPPORTUNISTIC containers on the NM. + * If set to 0, NM does not accept any OPPORTUNISTIC containers. + * If set to {@literal > 0}, enforces the queue capacity. */ public static final String NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH = NM_PREFIX + "opportunistic-containers-max-queue-length"; public static final int DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index d6bef5b3a9..53b6b14cfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1292,8 +1292,27 @@ - Max number of OPPORTUNISTIC containers to queue at the - nodemanager. + + At the NM, the policy to determine whether to queue an + OPPORTUNISTIC container or not. + If set to BY_QUEUE_LEN, uses the queue capacity, as set by + yarn.nodemanager.opportunistic-containers-max-queue-length + to limit how many containers to accept/queue. 
+ If set to BY_RESOURCES, limits the number of containers + accepted based on the resource capacity of the node. + + yarn.nodemanager.opportunistic-containers-queue-policy + BY_QUEUE_LEN + + + + + Max number of OPPORTUNISTIC containers to queue at the + nodemanager (NM). If the value is 0 or negative, + NMs do not allow any OPPORTUNISTIC containers. + If the value is positive, the NM caps the number of OPPORTUNISTIC + containers that can be queued at the NM. + yarn.nodemanager.opportunistic-containers-max-queue-length 0 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java index 100676d27d..879a5d9d0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/AllocationBasedResourceUtilizationTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -34,6 +35,9 @@ public class AllocationBasedResourceUtilizationTracker implements private static final Logger LOG = 
LoggerFactory.getLogger(AllocationBasedResourceUtilizationTracker.class); + private static final long LEFT_SHIFT_MB_IN_BYTES = 20; + private static final int RIGHT_SHIFT_BYTES_IN_MB = 20; + private ResourceUtilization containersAllocation; private ContainerScheduler scheduler; @@ -80,10 +84,34 @@ public class AllocationBasedResourceUtilizationTracker implements */ @Override public boolean hasResourcesAvailable(Container container) { - long pMemBytes = container.getResource().getMemorySize() * 1024 * 1024L; - return hasResourcesAvailable(pMemBytes, - (long) (getContainersMonitor().getVmemRatio()* pMemBytes), - container.getResource().getVirtualCores()); + return hasResourcesAvailable(container.getResource()); + } + + /** + * Converts memory in megabytes to bytes by bitwise left-shifting 20 times. + * @param memMB the memory in megabytes + * @return the memory in bytes + */ + private static long convertMBToBytes(final long memMB) { + return memMB << LEFT_SHIFT_MB_IN_BYTES; + } + + /** + * Converts memory in bytes to megabytes by bitwise right-shifting 20 times. 
+ * @param bytes the memory in bytes + * @return the memory in megabytes + */ + private static long convertBytesToMB(final long bytes) { + return bytes >> RIGHT_SHIFT_BYTES_IN_MB; + } + + @Override + public boolean hasResourcesAvailable(Resource resource) { + long pMemBytes = convertMBToBytes(resource.getMemorySize()); + final long vmemBytes = (long) + (getContainersMonitor().getVmemRatio() * pMemBytes); + return hasResourcesAvailable( + pMemBytes, vmemBytes, resource.getVirtualCores()); } private boolean hasResourcesAvailable(long pMemBytes, long vMemBytes, @@ -92,13 +120,13 @@ public class AllocationBasedResourceUtilizationTracker implements if (LOG.isDebugEnabled()) { LOG.debug("pMemCheck [current={} + asked={} > allowed={}]", this.containersAllocation.getPhysicalMemory(), - (pMemBytes >> 20), - (getContainersMonitor().getPmemAllocatedForContainers() >> 20)); + convertBytesToMB(pMemBytes), + convertBytesToMB( + getContainersMonitor().getPmemAllocatedForContainers())); } if (this.containersAllocation.getPhysicalMemory() + - (int) (pMemBytes >> 20) > - (int) (getContainersMonitor() - .getPmemAllocatedForContainers() >> 20)) { + convertBytesToMB(pMemBytes) > convertBytesToMB( + getContainersMonitor().getPmemAllocatedForContainers())) { return false; } @@ -106,15 +134,17 @@ public class AllocationBasedResourceUtilizationTracker implements LOG.debug("before vMemCheck" + "[isEnabled={}, current={} + asked={} > allowed={}]", getContainersMonitor().isVmemCheckEnabled(), - this.containersAllocation.getVirtualMemory(), (vMemBytes >> 20), - (getContainersMonitor().getVmemAllocatedForContainers() >> 20)); + this.containersAllocation.getVirtualMemory(), + convertBytesToMB(vMemBytes), + convertBytesToMB( + getContainersMonitor().getVmemAllocatedForContainers())); } // Check virtual memory. 
if (getContainersMonitor().isVmemCheckEnabled() && this.containersAllocation.getVirtualMemory() + - (int) (vMemBytes >> 20) > - (int) (getContainersMonitor() - .getVmemAllocatedForContainers() >> 20)) { + convertBytesToMB(vMemBytes) > + convertBytesToMB(getContainersMonitor() + .getVmemAllocatedForContainers())) { return false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index b52e125367..4ae7936c83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService .RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; +import org.apache.hadoop.yarn.util.resource.Resources; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +76,7 @@ public class ContainerScheduler extends AbstractService implements private final Context context; // Capacity of the queue for opportunistic Containers. private final int maxOppQueueLength; + private final boolean forceStartGuaranteedContainers; // Queue of Guaranteed Containers waiting for resources to run private final LinkedHashMap @@ -106,9 +109,37 @@ public class ContainerScheduler extends AbstractService implements private final AsyncDispatcher dispatcher; private final NodeManagerMetrics metrics; + private final OpportunisticContainersQueuePolicy oppContainersQueuePolicy; private Boolean usePauseEventForPreemption = false; + private static int getMaxOppQueueLengthFromConf(final Context context) { + if (context == null || context.getConf() == null) { + return YarnConfiguration + .DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH; + } + + return context.getConf().getInt( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, + YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH + ); + } + + private static OpportunisticContainersQueuePolicy + getOppContainersQueuePolicyFromConf(final Context context) { + final OpportunisticContainersQueuePolicy queuePolicy; + if (context == null || context.getConf() == null) { + queuePolicy = OpportunisticContainersQueuePolicy.DEFAULT; + } else { + queuePolicy = context.getConf().getEnum( + YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY, + OpportunisticContainersQueuePolicy.DEFAULT + ); + } + + return queuePolicy; + } + @VisibleForTesting ResourceHandlerChain resourceHandlerChain = null; @@ -120,10 +151,9 @@ public class ContainerScheduler extends AbstractService implements */ public ContainerScheduler(Context context, AsyncDispatcher dispatcher, NodeManagerMetrics metrics) { - this(context, dispatcher, metrics, context.getConf().getInt( - YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, - 
YarnConfiguration. - DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH)); + this(context, dispatcher, metrics, + getOppContainersQueuePolicyFromConf(context), + getMaxOppQueueLengthFromConf(context)); } @@ -149,13 +179,35 @@ public class ContainerScheduler extends AbstractService implements @VisibleForTesting public ContainerScheduler(Context context, AsyncDispatcher dispatcher, NodeManagerMetrics metrics, int qLength) { + this(context, dispatcher, metrics, + getOppContainersQueuePolicyFromConf(context), qLength); + } + + @VisibleForTesting + public ContainerScheduler(Context context, AsyncDispatcher dispatcher, + NodeManagerMetrics metrics, + OpportunisticContainersQueuePolicy oppContainersQueuePolicy, + int qLength) { super(ContainerScheduler.class.getName()); this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; - this.maxOppQueueLength = (qLength <= 0) ? 0 : qLength; this.utilizationTracker = new AllocationBasedResourceUtilizationTracker(this); + this.oppContainersQueuePolicy = oppContainersQueuePolicy; + switch (oppContainersQueuePolicy) { + case BY_RESOURCES: + this.maxOppQueueLength = 0; + this.forceStartGuaranteedContainers = false; + LOG.info("Setting max opportunistic queue length to 0," + + " as {} is incompatible with queue length", + oppContainersQueuePolicy); + break; + case BY_QUEUE_LEN: + default: + this.maxOppQueueLength = qLength; + this.forceStartGuaranteedContainers = (maxOppQueueLength <= 0); + } this.opportunisticContainersStatus = OpportunisticContainersStatus.newInstance(); } @@ -187,7 +239,7 @@ public class ContainerScheduler extends AbstractService implements shedQueuedOpportunisticContainers(); break; case RECOVERY_COMPLETED: - startPendingContainers(maxOppQueueLength <= 0); + startPendingContainers(forceStartGuaranteedContainers); metrics.setQueuedContainers(queuedOpportunisticContainers.size(), queuedGuaranteedContainers.size()); break; @@ -243,7 +295,7 @@ public class ContainerScheduler extends 
AbstractService implements LOG.warn(String.format("Could not update resources on " + "continer update of %s", containerId), ex); } - startPendingContainers(maxOppQueueLength <= 0); + startPendingContainers(forceStartGuaranteedContainers); metrics.setQueuedContainers(queuedOpportunisticContainers.size(), queuedGuaranteedContainers.size()); } @@ -371,7 +423,6 @@ public class ContainerScheduler extends AbstractService implements ExecutionType.OPPORTUNISTIC) { this.metrics.completeOpportunisticContainer(container.getResource()); } - boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); startPendingContainers(forceStartGuaranteedContainers); } this.metrics.setQueuedContainers(queuedOpportunisticContainers.size(), @@ -380,13 +431,13 @@ public class ContainerScheduler extends AbstractService implements /** * Start pending containers in the queue. - * @param forceStartGuaranteedContaieners When this is true, start guaranteed + * @param forceStartGContainers When this is true, start guaranteed * container without looking at available resource */ - private void startPendingContainers(boolean forceStartGuaranteedContaieners) { + private void startPendingContainers(boolean forceStartGContainers) { // Start guaranteed containers that are paused, if resources available. boolean resourcesAvailable = startContainers( - queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); + queuedGuaranteedContainers.values(), forceStartGContainers); // Start opportunistic containers, if resources available. 
if (resourcesAvailable) { startContainers(queuedOpportunisticContainers.values(), false); @@ -429,6 +480,21 @@ public class ContainerScheduler extends AbstractService implements return this.utilizationTracker.hasResourcesAvailable(container); } + private boolean resourceAvailableToQueueOppContainer( + Container newOppContainer) { + final Resource cumulativeResource = Resource.newInstance(Resources.none()); + for (final Container container : queuedGuaranteedContainers.values()) { + Resources.addTo(cumulativeResource, container.getResource()); + } + + for (final Container container : queuedOpportunisticContainers.values()) { + Resources.addTo(cumulativeResource, container.getResource()); + } + + Resources.addTo(cumulativeResource, newOppContainer.getResource()); + return this.utilizationTracker.hasResourcesAvailable(cumulativeResource); + } + private boolean enqueueContainer(Container container) { boolean isGuaranteedContainer = container.getContainerTokenIdentifier(). getExecutionType() == ExecutionType.GUARANTEED; @@ -438,7 +504,21 @@ public class ContainerScheduler extends AbstractService implements queuedGuaranteedContainers.put(container.getContainerId(), container); isQueued = true; } else { - if (queuedOpportunisticContainers.size() < maxOppQueueLength) { + switch (oppContainersQueuePolicy) { + case BY_RESOURCES: + isQueued = resourceAvailableToQueueOppContainer(container); + break; + case BY_QUEUE_LEN: + default: + if (maxOppQueueLength <= 0) { + isQueued = false; + } else { + isQueued = + queuedOpportunisticContainers.size() < maxOppQueueLength; + } + } + + if (isQueued) { LOG.info("Opportunistic container {} will be queued at the NM.", container.getContainerId()); queuedOpportunisticContainers.put( @@ -451,7 +531,6 @@ public class ContainerScheduler extends AbstractService implements container.sendKillEvent( ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, "Opportunistic container queue is full."); - isQueued = false; } } @@ -484,7 +563,6 @@ public class 
ContainerScheduler extends AbstractService implements // When opportunistic container not allowed (which is determined by // max-queue length of pending opportunistic containers <= 0), start // guaranteed containers without looking at available resources. - boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0); startPendingContainers(forceStartGuaranteedContainers); // if the guaranteed container is queued, we need to preempt opportunistic diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainersQueuePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainersQueuePolicy.java new file mode 100644 index 0000000000..ce7aafbc4e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/OpportunisticContainersQueuePolicy.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Determines how to schedule opportunistic containers at the NodeManager, + * i.e., whether or not to accept, queue, or reject a container run request. + */ +public enum OpportunisticContainersQueuePolicy { + /** + * Determines whether or not to run a container by the queue capacity: + * {@link YarnConfiguration#NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH}. + * If there's enough capacity in the queue, + * queues the container, otherwise rejects it. + */ + BY_QUEUE_LEN, + /** + * Determines whether or not to run a container based on the amount of + * resource capacity the node has. + * Sums up the resources running + already queued at the node, compares + * it with the total capacity of the node, and accepts the new container only + * if the computed resources above + resources used by the container + * is less than or equal to the node capacity. 
+ */ + BY_RESOURCES; + + public static final OpportunisticContainersQueuePolicy DEFAULT = BY_QUEUE_LEN; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java index 3c17ecaffe..3d28de0204 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ResourceUtilizationTracker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -56,4 +57,10 @@ public interface ResourceUtilizationTracker { */ boolean hasResourcesAvailable(Container container); + /** + * Check if NM has resources available currently to run requested resources. + * @param resource the resources. + * @return True, if NM has enough available resources. 
+ */ + boolean hasResourcesAvailable(Resource resource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerSchedulerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerSchedulerTest.java new file mode 100644 index 0000000000..5a495d7413 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerSchedulerTest.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager; + +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ConfigurationException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.TestContainerSchedulerQueuing; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import 
java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.mockito.Mockito.spy; + +/** + * Base test class that overrides the behavior of + * {@link ContainerStateTransitionListener} for testing + * the {@link ContainerScheduler}. + */ +public class BaseContainerSchedulerTest extends BaseContainerManagerTest { + private static final long TWO_GB = 2048 * 1024 * 1024L; + + public BaseContainerSchedulerTest() throws UnsupportedFileSystemException { + super(); + } + + static { + LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class); + } + + public static class Listener implements ContainerStateTransitionListener { + + private final Map> states = + new HashMap<>(); + private final Map> events = + new HashMap<>(); + + public Map> getEvents() { + return events; + } + + public Map> getStates() { + return states; + } + + @Override + public void init(Context context) {} + + @Override + public void preTransition(ContainerImpl op, + ContainerState beforeState, + ContainerEvent eventToBeProcessed) { + if (!states.containsKey(op.getContainerId())) { + states.put(op.getContainerId(), new ArrayList<>()); + states.get(op.getContainerId()).add(beforeState); + events.put(op.getContainerId(), new ArrayList<>()); + } + } + + @Override + public void postTransition(ContainerImpl op, ContainerState beforeState, + ContainerState afterState, ContainerEvent processedEvent) { + states.get(op.getContainerId()).add(afterState); + events.get(op.getContainerId()).add(processedEvent.getType()); + } + } + + private boolean delayContainers = true; + + protected void setDelayContainers(final boolean delayContainersParam) { + this.delayContainers = delayContainersParam; + } + + @Override + protected ContainerManagerImpl createContainerManager( + DeletionService delSrvc) { + return new ContainerManagerImpl(context, exec, delSrvc, + 
getNodeStatusUpdater(), metrics, dirsHandler) { + + @Override + protected UserGroupInformation getRemoteUgi() throws YarnException { + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context + .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() + .getKeyId())); + return ugi; + } + + @Override + protected ContainersMonitor createContainersMonitor( + ContainerExecutor exec) { + return new ContainersMonitorImpl(exec, dispatcher, this.context) { + // Define resources available for containers to be executed. + @Override + public long getPmemAllocatedForContainers() { + return TWO_GB; + } + + @Override + public long getVmemAllocatedForContainers() { + float pmemRatio = getConfig().getFloat( + YarnConfiguration.NM_VMEM_PMEM_RATIO, + YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); + return (long) (pmemRatio * getPmemAllocatedForContainers()); + } + + @Override + public long getVCoresAllocatedForContainers() { + return 4; + } + }; + } + }; + } + + @Override + protected ContainerExecutor createContainerExecutor() { + DefaultContainerExecutor exec = new DefaultContainerExecutor() { + ConcurrentMap oversleepMap = + new ConcurrentHashMap(); + + /** + * Launches the container. + * If delayContainers is turned on, then we sleep a while before + * starting the container. + */ + @Override + public int launchContainer(ContainerStartContext ctx) + throws IOException, ConfigurationException { + final String containerId = + ctx.getContainer().getContainerId().toString(); + oversleepMap.put(containerId, false); + if (delayContainers) { + try { + Thread.sleep(10000); + if (oversleepMap.get(containerId)) { + Thread.sleep(10000); + } + } catch (InterruptedException e) { + // Nothing.. 
+ } + } + return super.launchContainer(ctx); + } + + @Override + public void pauseContainer(Container container) { + // To mimic pausing we force the container to be in the PAUSED state + // a little longer by oversleeping. + oversleepMap.put(container.getContainerId().toString(), true); + LOG.info("Container was paused"); + } + + @Override + public void resumeContainer(Container container) { + LOG.info("Container was resumed"); + } + }; + exec.setConf(conf); + return spy(exec); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerOppContainersByResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerOppContainersByResources.java new file mode 100644 index 0000000000..30fbbde129 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerOppContainersByResources.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; + +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerSubState; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerSchedulerTest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +/** + * Tests the behavior of {@link ContainerScheduler} when its queueing policy + * is set to {@link OpportunisticContainersQueuePolicy#BY_RESOURCES} + * such that the NM only queues containers if there's enough resources + * on the node to start all queued containers. 
+ */ +public class TestContainerSchedulerOppContainersByResources + extends BaseContainerSchedulerTest { + public TestContainerSchedulerOppContainersByResources() + throws UnsupportedFileSystemException { + } + + @Override + public void setup() throws IOException { + conf.set(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_QUEUE_POLICY, + OpportunisticContainersQueuePolicy.BY_RESOURCES.name()); + super.setup(); + containerManager.start(); + } + + /** + * Checks if a container is in a running or successfully run state. + * @param containerStatus the container status + * @return true if the container is running or completed + * with a successful state, false if the container has not started or failed + */ + private static boolean isContainerInSuccessfulState( + final ContainerStatus containerStatus) { + final org.apache.hadoop.yarn.api.records.ContainerState state = + containerStatus.getState(); + final ContainerSubState subState = containerStatus.getContainerSubState(); + switch (subState) { + case RUNNING: + case COMPLETING: + return true; + case DONE: + // If the state is not COMPLETE, then the + // container is a failed container + return state == + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE; + default: + return false; + } + } + + private void verifyRunAndKilledContainers( + final List<ContainerId> statList, + final int numExpectedContainers, final Set<ContainerId> runContainers, + final Set<ContainerId> killedContainers) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor( + () -> { + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + final List<ContainerStatus> containerStatuses; + try { + containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + } catch (final Exception e) { + return false; + } + + if (numExpectedContainers != containerStatuses.size()) { + return false; + } + + for (final ContainerStatus status : containerStatuses) { + if (runContainers.contains(status.getContainerId())) {
+ if (!isContainerInSuccessfulState(status)) { + return false; + } + } else if (killedContainers.contains(status.getContainerId())) { + if (!status.getDiagnostics() + .contains("Opportunistic container queue is full")) { + return false; + } + } else { + return false; + } + } + + return true; + }, 1000, 10000); + } + + /** + * Verifies that nothing is queued at the container scheduler. + */ + private void verifyNothingQueued() { + // Both the scheduler's counters and the NM metrics must report no queued containers + ContainerScheduler containerScheduler = + containerManager.getContainerScheduler(); + Assert.assertEquals(0, + containerScheduler.getNumQueuedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedGuaranteedContainers()); + Assert.assertEquals(0, + containerScheduler.getNumQueuedOpportunisticContainers()); + Assert.assertEquals(0, + metrics.getQueuedOpportunisticContainers()); + Assert.assertEquals(0, metrics.getQueuedGuaranteedContainers()); + } + + /** + * Tests that newly arrived containers after the resources are filled up + * get killed and never get run.
+ */ + @Test + public void testKillOpportunisticWhenNoResourcesAvailable() throws Exception { + List<StartContainerRequest> startContainerRequests = new ArrayList<>(); + + // GContainer that takes up the whole node + startContainerRequests.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + + // OContainer that should be killed + startContainerRequests.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(startContainerRequests); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.RUNNING, 40); + + // Wait for the OContainer to get killed + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(1), ContainerState.DONE, 40); + + // Get container statuses. + // Container 0 should be running and container 1 should be killed + List<ContainerId> statList = ImmutableList.of(createContainerId(0), + createContainerId(1)); + + verifyRunAndKilledContainers( + statList, 2, + Collections.singleton(createContainerId(0)), + Collections.singleton(createContainerId(1)) + ); + + verifyNothingQueued(); + } + + /** + * Tests that newly arrived containers after the resources are filled up + * get killed and never get run. + * This scenario is more granular and runs more small containers compared to + * {@link #testKillOpportunisticWhenNoResourcesAvailable()}.
+ */ + @Test + public void testOpportunisticRunsWhenResourcesAvailable() throws Exception { + List<StartContainerRequest> startContainerRequests = new ArrayList<>(); + final int numContainers = 8; + final int numContainersQueued = 4; + final Set<ContainerId> runContainers = new HashSet<>(); + final Set<ContainerId> killedContainers = new HashSet<>(); + + for (int i = 0; i < numContainers; i++) { + // OContainers; only the first numContainersQueued are expected to run + startContainerRequests.add(StartContainerRequest.newInstance( + recordFactory.newRecordInstance(ContainerLaunchContext.class), + createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + } + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(startContainerRequests); + containerManager.startContainers(allRequests); + + // Wait for containers to start + for (int i = 0; i < numContainersQueued; i++) { + final ContainerId containerId = createContainerId(i); + BaseContainerManagerTest + .waitForNMContainerState(containerManager, containerId, + ContainerState.RUNNING, 40); + runContainers.add(containerId); + } + + // Wait for containers to be killed + for (int i = numContainersQueued; i < numContainers; i++) { + final ContainerId containerId = createContainerId(i); + BaseContainerManagerTest + .waitForNMContainerState(containerManager, containerId, + ContainerState.DONE, 40); + killedContainers.add(containerId); + } + + Thread.sleep(5000); + + // Get container statuses.
+ List<ContainerId> statList = new ArrayList<>(); + for (int i = 0; i < numContainers; i++) { + statList.add(createContainerId(i)); + } + + + verifyRunAndKilledContainers( + statList, numContainers, runContainers, killedContainers); + + verifyNothingQueued(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 508b8bd091..218d03afe7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -18,18 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse; @@ -37,8 +26,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import
org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -46,35 +33,27 @@ import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ConfigurationException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener; -import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerSchedulerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; 
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; -import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; -import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -82,137 +61,11 @@ import static org.mockito.Mockito.verify; * Tests to verify that the {@link ContainerScheduler} is able to queue and * make room for containers. */ -public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { +public class TestContainerSchedulerQueuing extends BaseContainerSchedulerTest { public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException { super(); } - static { - LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class); - } - - private static class Listener implements ContainerStateTransitionListener { - - private final Map> states = new HashMap<>(); - private final Map> events = - new HashMap<>(); - - @Override - public void init(Context context) {} - - @Override - public void preTransition(ContainerImpl op, - org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
- ContainerState beforeState, - ContainerEvent eventToBeProcessed) { - if (!states.containsKey(op.getContainerId())) { - states.put(op.getContainerId(), new ArrayList<>()); - states.get(op.getContainerId()).add(beforeState); - events.put(op.getContainerId(), new ArrayList<>()); - } - } - - @Override - public void postTransition(ContainerImpl op, - org.apache.hadoop.yarn.server.nodemanager.containermanager.container. - ContainerState beforeState, - org.apache.hadoop.yarn.server.nodemanager.containermanager.container. - ContainerState afterState, - ContainerEvent processedEvent) { - states.get(op.getContainerId()).add(afterState); - events.get(op.getContainerId()).add(processedEvent.getType()); - } - } - - private boolean delayContainers = true; - - @Override - protected ContainerManagerImpl createContainerManager( - DeletionService delSrvc) { - return new ContainerManagerImpl(context, exec, delSrvc, - getNodeStatusUpdater(), metrics, dirsHandler) { - - @Override - protected UserGroupInformation getRemoteUgi() throws YarnException { - ApplicationId appId = ApplicationId.newInstance(0, 0); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(appAttemptId.toString()); - ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context - .getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey() - .getKeyId())); - return ugi; - } - - @Override - protected ContainersMonitor createContainersMonitor( - ContainerExecutor exec) { - return new ContainersMonitorImpl(exec, dispatcher, this.context) { - // Define resources available for containers to be executed. 
- @Override - public long getPmemAllocatedForContainers() { - return 2048 * 1024 * 1024L; - } - - @Override - public long getVmemAllocatedForContainers() { - float pmemRatio = getConfig().getFloat( - YarnConfiguration.NM_VMEM_PMEM_RATIO, - YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); - return (long) (pmemRatio * getPmemAllocatedForContainers()); - } - - @Override - public long getVCoresAllocatedForContainers() { - return 4; - } - }; - } - }; - } - - @Override - protected ContainerExecutor createContainerExecutor() { - DefaultContainerExecutor exec = new DefaultContainerExecutor() { - ConcurrentMap oversleepMap = - new ConcurrentHashMap(); - @Override - public int launchContainer(ContainerStartContext ctx) - throws IOException, ConfigurationException { - oversleepMap.put(ctx.getContainer().getContainerId().toString(), false); - if (delayContainers) { - try { - Thread.sleep(10000); - if(oversleepMap.get(ctx.getContainer().getContainerId().toString()) - == true) { - Thread.sleep(10000); - } - } catch (InterruptedException e) { - // Nothing.. - } - } - return super.launchContainer(ctx); - } - - @Override - public void pauseContainer(Container container) { - // To mimic pausing we force the container to be in the PAUSED state - // a little longer by oversleeping. 
- oversleepMap.put(container.getContainerId().toString(), true); - LOG.info("Container was paused"); - } - - @Override - public void resumeContainer(Container container) { - LOG.info("Container was resumed"); - } - }; - exec.setConf(conf); - return spy(exec); - } - @Override public void setup() throws IOException { conf.setInt( @@ -408,7 +261,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { * @throws Exception */ @Test - public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception { + public void testStartOpportunisticsWhenOppQueueIsFull() throws Exception { containerManager.start(); List list = new ArrayList<>(); @@ -655,7 +508,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { List containerStates = - listener.states.get(createContainerId(0)); + listener.getStates().get(createContainerId(0)); Assert.assertEquals(Arrays.asList( org.apache.hadoop.yarn.server.nodemanager.containermanager.container. ContainerState.NEW, @@ -676,7 +529,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { org.apache.hadoop.yarn.server.nodemanager.containermanager.container. ContainerState.DONE), containerStates); List containerEventTypes = - listener.events.get(createContainerId(0)); + listener.getEvents().get(createContainerId(0)); Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER, ContainerEventType.CONTAINER_LAUNCHED, ContainerEventType.PAUSE_CONTAINER, @@ -1230,7 +1083,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { List containerStates = - listener.states.get(createContainerId(1)); + listener.getStates().get(createContainerId(1)); Assert.assertEquals(Arrays.asList( org.apache.hadoop.yarn.server.nodemanager.containermanager.container. ContainerState.NEW, @@ -1241,7 +1094,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { org.apache.hadoop.yarn.server.nodemanager.containermanager.container. 
ContainerState.RUNNING), containerStates); List containerEventTypes = - listener.events.get(createContainerId(1)); + listener.getEvents().get(createContainerId(1)); Assert.assertEquals(Arrays.asList( ContainerEventType.INIT_CONTAINER, ContainerEventType.UPDATE_CONTAINER_TOKEN, @@ -1254,7 +1107,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest { @Test public void testContainerUpdateExecTypeGuaranteedToOpportunistic() throws Exception { - delayContainers = true; + setDelayContainers(true); containerManager.start(); // Construct the Container-id ContainerId cId = createContainerId(0);