From 4f8194430fc6a69d9cc99b78828fd7045d5683e8 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Sat, 24 Dec 2016 17:16:52 -0800 Subject: [PATCH] YARN-5216. Expose configurable preemption policy for OPPORTUNISTIC containers running on the NM. (Hitesh Sharma via asuresh) --- .../hadoop/yarn/conf/YarnConfiguration.java | 9 ++ .../src/main/resources/yarn-default.xml | 9 ++ .../containermanager/container/Container.java | 2 + .../container/ContainerImpl.java | 32 ++++-- .../scheduler/ContainerScheduler.java | 84 +++++++++++--- .../TestContainerSchedulerQueuing.java | 103 ++++++++++++++++++ .../nodemanager/webapp/MockContainer.java | 5 + 7 files changed, 218 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c6ec6fddfb..48910b372d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1088,6 +1088,15 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "container-retry-minimum-interval-ms"; public static final int DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS = 1000; + /** + * Use container pause as the preemption policy over kill in the container + * queue at a NodeManager. 
+ **/ + public static final String NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION = + NM_PREFIX + "opportunistic-containers-use-pause-for-preemption"; + public static final boolean + DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION = false; + /** Interval at which the delayed token removal thread runs */ public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index d16d956d77..6444da9a47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3017,6 +3017,15 @@ 100 + + + Use container pause as the preemption policy over kill in the container + queue at a NodeManager. + + yarn.nodemanager.opportunistic-containers-use-pause-for-preemption + false + + Error filename pattern, to identify the file in the container's diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index ef5d72c6b7..86f2554af7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -103,4 +103,6 @@ public interface Container extends EventHandler { * @return 
Resource Mappings of the container */ ResourceMappings getResourceMappings(); + + void sendPauseEvent(String description); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 7a1237171f..95ebfd585e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -816,15 +816,22 @@ private void sendFinishedEvents() { @SuppressWarnings("unchecked") // dispatcher not typed @Override public void sendLaunchEvent() { - ContainersLauncherEventType launcherEvent = - ContainersLauncherEventType.LAUNCH_CONTAINER; - if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { - // try to recover a container that was previously launched - launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + if (ContainerState.PAUSED == getContainerState()) { + dispatcher.getEventHandler().handle( + new ContainerResumeEvent(containerId, + "Container Resumed as some resources freed up")); + } else { + ContainersLauncherEventType launcherEvent = + ContainersLauncherEventType.LAUNCH_CONTAINER; + if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { + // try to recover a container that was previously launched + launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; + } + containerLaunchStartTime = clock.getTime(); + dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(this, launcherEvent)); } - 
containerLaunchStartTime = clock.getTime(); - dispatcher.getEventHandler().handle( - new ContainersLauncherEvent(this, launcherEvent)); + } @SuppressWarnings("unchecked") // dispatcher not typed @@ -843,6 +850,13 @@ public void sendKillEvent(int exitStatus, String description) { new ContainerKillEvent(containerId, exitStatus, description)); } + @SuppressWarnings("unchecked") // dispatcher not typed + @Override + public void sendPauseEvent(String description) { + dispatcher.getEventHandler().handle( + new ContainerPauseEvent(containerId, description)); + } + @SuppressWarnings("unchecked") // dispatcher not typed private void sendRelaunchEvent() { ContainersLauncherEventType launcherEvent = @@ -1799,7 +1813,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { /** * Transitions upon receiving PAUSE_CONTAINER. - * - RUNNING -> PAUSED + * - RUNNING -> PAUSING */ @SuppressWarnings("unchecked") // dispatcher not typed static class PauseContainerTransition implements diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 7780f9fb64..830a06d529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler; import com.google.common.annotations.VisibleForTesting; +import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor .ChangeMonitoringContainerResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; @@ -74,7 +76,7 @@ public class ContainerScheduler extends AbstractService implements queuedOpportunisticContainers = new LinkedHashMap<>(); // Used to keep track of containers that have been marked to be killed - // to make room for a guaranteed container. + // or paused to make room for a guaranteed container. private final Map oppContainersToKill = new HashMap<>(); @@ -98,6 +100,8 @@ public class ContainerScheduler extends AbstractService implements private final AsyncDispatcher dispatcher; private final NodeManagerMetrics metrics; + private Boolean usePauseEventForPreemption = false; + /** * Instantiate a Container Scheduler. * @param context NodeManager Context. @@ -112,6 +116,17 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher, DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH)); } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + this.usePauseEventForPreemption = + conf.getBoolean( + YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION, + YarnConfiguration. 
+ DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION); + } + @VisibleForTesting public ContainerScheduler(Context context, AsyncDispatcher dispatcher, NodeManagerMetrics metrics, int qLength) { @@ -136,8 +151,9 @@ public void handle(ContainerSchedulerEvent event) { case SCHEDULE_CONTAINER: scheduleContainer(event.getContainer()); break; + case CONTAINER_PAUSED: case CONTAINER_COMPLETED: - onContainerCompleted(event.getContainer()); + onResourcesReclaimed(event.getContainer()); break; case UPDATE_CONTAINER: if (event instanceof UpdateContainerSchedulerEvent) { @@ -203,9 +219,9 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { queuedGuaranteedContainers.put(containerId, updateEvent.getContainer()); } - //Kill opportunistic containers if any to make room for + //Kill/pause opportunistic containers if any to make room for // promotion request - killOpportunisticContainers(updateEvent.getContainer()); + reclaimOpportunisticContainerResources(updateEvent.getContainer()); } else { // Demotion of queued container.. Should not happen too often // since you should not find too many queued guaranteed @@ -243,6 +259,12 @@ public int getNumRunningContainers() { return this.runningContainers.size(); } + @VisibleForTesting + public void setUsePauseEventForPreemption( + boolean usePauseEventForPreemption) { + this.usePauseEventForPreemption = usePauseEventForPreemption; + } + public OpportunisticContainersStatus getOpportunisticContainersStatus() { this.opportunisticContainersStatus.setQueuedOpportContainers( getNumQueuedOpportunisticContainers()); @@ -257,7 +279,7 @@ public OpportunisticContainersStatus getOpportunisticContainersStatus() { return this.opportunisticContainersStatus; } - private void onContainerCompleted(Container container) { + private void onResourcesReclaimed(Container container) { oppContainersToKill.remove(container.getContainerId()); // This could be killed externally for eg. 
by the ContainerManager, @@ -292,6 +314,23 @@ private void startPendingContainers(boolean forceStartGuaranteedContaieners) { // Start pending guaranteed containers, if resources available. boolean resourcesAvailable = startContainers( queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners); + // Resume opportunistic containers, if resource available. + if (resourcesAvailable) { + List pausedContainers = new ArrayList(); + Map containers = + context.getContainers(); + for (Map.Entryentry : containers.entrySet()) { + ContainerId contId = entry.getKey(); + // Find containers that were not already started and are in paused state + if(false == runningContainers.containsKey(contId)) { + if(containers.get(contId).getContainerState() + == ContainerState.PAUSED) { + pausedContainers.add(containers.get(contId)); + } + } + } + resourcesAvailable = startContainers(pausedContainers, false); + } // Start opportunistic containers, if resources available. if (resourcesAvailable) { startContainers(queuedOpportunisticContainers.values(), false); @@ -395,7 +434,7 @@ protected void scheduleContainer(Container container) { // if the guaranteed container is queued, we need to preempt opportunistic // containers for make room for it if (queuedGuaranteedContainers.containsKey(container.getContainerId())) { - killOpportunisticContainers(container); + reclaimOpportunisticContainerResources(container); } } else { // Given an opportunistic container, we first try to start as many queuing @@ -413,19 +452,30 @@ protected void scheduleContainer(Container container) { } } - private void killOpportunisticContainers(Container container) { - List extraOpportContainersToKill = - pickOpportunisticContainersToKill(container.getContainerId()); + @SuppressWarnings("unchecked") + private void reclaimOpportunisticContainerResources(Container container) { + List extraOppContainersToReclaim = + pickOpportunisticContainersToReclaimResources( + container.getContainerId()); // Kill the 
opportunistic containers that were chosen. - for (Container contToKill : extraOpportContainersToKill) { - contToKill.sendKillEvent( - ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, - "Container Killed to make room for Guaranteed Container."); - oppContainersToKill.put(contToKill.getContainerId(), contToKill); + for (Container contToReclaim : extraOppContainersToReclaim) { + String preemptionAction = usePauseEventForPreemption == true ? "paused" : + "killed"; LOG.info( - "Opportunistic container {} will be killed in order to start the " + "Container {} will be {} to start the " + "execution of guaranteed container {}.", - contToKill.getContainerId(), container.getContainerId()); + contToReclaim.getContainerId(), preemptionAction, + container.getContainerId()); + + if (usePauseEventForPreemption) { + contToReclaim.sendPauseEvent( + "Container Paused to make room for Guaranteed Container"); + } else { + contToReclaim.sendKillEvent( + ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER, + "Container Killed to make room for Guaranteed Container."); + } + oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim); } } @@ -440,7 +490,7 @@ private void startContainer(Container container) { container.sendLaunchEvent(); } - private List pickOpportunisticContainersToKill( + private List pickOpportunisticContainersToReclaimResources( ContainerId containerToStartId) { // The opportunistic containers that need to be killed for the // given container to start. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 9676568387..f3fc724bad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; @@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; @@ -124,18 +127,38 @@ public long getVCoresAllocatedForContainers() { @Override protected ContainerExecutor createContainerExecutor() { DefaultContainerExecutor exec = new DefaultContainerExecutor() { + 
ConcurrentMap oversleepMap = + new ConcurrentHashMap(); @Override public int launchContainer(ContainerStartContext ctx) throws IOException, ConfigurationException { + oversleepMap.put(ctx.getContainer().getContainerId().toString(), false); if (delayContainers) { try { Thread.sleep(10000); + if(oversleepMap.get(ctx.getContainer().getContainerId().toString()) + == true) { + Thread.sleep(10000); + } } catch (InterruptedException e) { // Nothing.. } } return super.launchContainer(ctx); } + + @Override + public void pauseContainer(Container container) { + // To mimic pausing we force the container to be in the PAUSED state + // a little longer by oversleeping. + oversleepMap.put(container.getContainerId().toString(), true); + LOG.info("Container was paused"); + } + + @Override + public void resumeContainer(Container container) { + LOG.info("Container was resumed"); + } }; exec.setConf(conf); return spy(exec); @@ -505,6 +528,86 @@ public void testKillOpportunisticForGuaranteedContainer() throws Exception { contStatus1.getState()); } + /** + * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources + * requests by each container as such that only one can run in parallel. + * Thus, the OPPORTUNISTIC container that started running, will be + * paused for the GUARANTEED container to start. + * Once the GUARANTEED container finishes its execution, the remaining + * OPPORTUNISTIC container will be executed. + * @throws Exception + */ + @Test + public void testPauseOpportunisticForGuaranteedContainer() throws Exception { + containerManager.start(); + containerManager.getContainerScheduler(). 
+ setUsePauseEventForPreemption(true); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.RUNNING, 40); + + list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + allRequests = + StartContainersRequest.newInstance(list); + + containerManager.startContainers(allRequests); + + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(1), ContainerState.RUNNING, 40); + + // Get container statuses. Container 0 should be paused, container 1 + // should be running. 
+ List statList = new ArrayList(); + for (int i = 0; i < 2; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = + GetContainerStatusesRequest.newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertTrue(status.getDiagnostics().contains( + "Container Paused to make room for Guaranteed Container")); + } else if (status.getContainerId().equals(createContainerId(1))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + // Make sure that the GUARANTEED container completes + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(1), ContainerState.DONE, 40); + // Make sure that the PAUSED opportunistic container resumes and + // starts running + BaseContainerManagerTest.waitForNMContainerState(containerManager, + createContainerId(0), ContainerState.DONE, 40); + } + /** * 1. Submit a long running GUARANTEED container to hog all NM resources. * 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index d435ba085f..77ebd347aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -245,4 +245,9 @@ public long getContainerStartTime() { public ResourceMappings getResourceMappings() { return null; } + + @Override + public void sendPauseEvent(String description) { + + } }