From 5b007921cdf01ecc8ed97c164b7d327b8304c529 Mon Sep 17 00:00:00 2001
From: Arun Suresh
Date: Mon, 17 Jul 2017 14:07:23 -0700
Subject: [PATCH] YARN-6706. Refactor ContainerScheduler to make
 oversubscription change easier. (Haibo Chen via asuresh)

---
 .../scheduler/ContainerScheduler.java        | 135 ++++++++++++------
 .../TestContainerManagerRecovery.java        |   2 +-
 .../TestContainerSchedulerQueuing.java       |  85 +++++++++++
 3 files changed, 177 insertions(+), 45 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 24530b365b..19243acd66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -192,7 +192,9 @@ private void onContainerCompleted(Container container) {
     // decrement only if it was a running container
     Container completedContainer = runningContainers.remove(container
         .getContainerId());
-    if (completedContainer != null) {
+    // only a running container releases resources upon completion
+    boolean resourceReleased = completedContainer != null;
+    if (resourceReleased) {
       this.utilizationTracker.subtractContainerResource(container);
       if (container.getContainerTokenIdentifier().getExecutionType() ==
           ExecutionType.OPPORTUNISTIC) {
@@ -218,8 +220,7 @@ private boolean startContainersFromQueue(
     boolean resourcesAvailable = true;
     while (cIter.hasNext() && resourcesAvailable) {
       Container container = cIter.next();
-      if (this.utilizationTracker.hasResourcesAvailable(container)) {
-        startAllocatedContainer(container);
+      if (tryStartContainer(container)) {
         cIter.remove();
       } else {
         resourcesAvailable = false;
@@ -228,50 +229,95 @@ private boolean startContainersFromQueue(
     return resourcesAvailable;
   }
 
+  private boolean tryStartContainer(Container container) {
+    boolean containerStarted = false;
+    if (resourceAvailableToStartContainer(container)) {
+      startContainer(container);
+      containerStarted = true;
+    }
+    return containerStarted;
+  }
+
+  /**
+   * Check if there is resource available to start a given container
+   * immediately. (This can be extended to include overallocated resources)
+   * @param container the container to start
+   * @return true if container can be launched directly
+   */
+  private boolean resourceAvailableToStartContainer(Container container) {
+    return this.utilizationTracker.hasResourcesAvailable(container);
+  }
+
+  private boolean enqueueContainer(Container container) {
+    boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
+        getExecutionType() == ExecutionType.GUARANTEED;
+
+    boolean isQueued;
+    if (isGuaranteedContainer) {
+      queuedGuaranteedContainers.put(container.getContainerId(), container);
+      isQueued = true;
+    } else {
+      if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
+        LOG.info("Opportunistic container {} will be queued at the NM.",
+            container.getContainerId());
+        queuedOpportunisticContainers.put(
+            container.getContainerId(), container);
+        isQueued = true;
+      } else {
+        LOG.info("Opportunistic container [{}] will not be queued at the NM" +
+            " since max queue length [{}] has been reached",
+            container.getContainerId(), maxOppQueueLength);
+        container.sendKillEvent(
+            ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+            "Opportunistic container queue is full.");
+        isQueued = false;
+      }
+    }
+
+    if (isQueued) {
+      try {
+        this.context.getNMStateStore().storeContainerQueued(
+            container.getContainerId());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + container.getContainerId()
+            + "] state. The Container has been queued.", e);
+      }
+    }
+
+    return isQueued;
+  }
+
   @VisibleForTesting
   protected void scheduleContainer(Container container) {
-    if (maxOppQueueLength <= 0) {
-      startAllocatedContainer(container);
-      return;
-    }
-    if (queuedGuaranteedContainers.isEmpty() &&
-        queuedOpportunisticContainers.isEmpty() &&
-        this.utilizationTracker.hasResourcesAvailable(container)) {
-      startAllocatedContainer(container);
-    } else {
-      LOG.info("No available resources for container {} to start its execution "
-          + "immediately.", container.getContainerId());
-      boolean isQueued = true;
-      if (container.getContainerTokenIdentifier().getExecutionType() ==
-          ExecutionType.GUARANTEED) {
-        queuedGuaranteedContainers.put(container.getContainerId(), container);
-        // Kill running opportunistic containers to make space for
-        // guaranteed container.
+    boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
+        getExecutionType() == ExecutionType.GUARANTEED;
+
+    // Given a guaranteed container, we enqueue it first and then try to start
+    // as many queuing guaranteed containers as possible followed by queuing
+    // opportunistic containers based on remaining resources available. If the
+    // container still stays in the queue afterwards, we need to preempt just
+    // enough opportunistic containers.
+    if (isGuaranteedContainer) {
+      enqueueContainer(container);
+      startPendingContainers();
+
+      // if the guaranteed container is queued, we need to preempt
+      // opportunistic containers to make room for it
+      if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
         killOpportunisticContainers(container);
-      } else {
-        if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
-          LOG.info("Opportunistic container {} will be queued at the NM.",
-              container.getContainerId());
-          queuedOpportunisticContainers.put(
-              container.getContainerId(), container);
-        } else {
-          isQueued = false;
-          LOG.info("Opportunistic container [{}] will not be queued at the NM" +
-              "since max queue length [{}] has been reached",
-              container.getContainerId(), maxOppQueueLength);
-          container.sendKillEvent(
-              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
-              "Opportunistic container queue is full.");
-        }
       }
-      if (isQueued) {
-        try {
-          this.context.getNMStateStore().storeContainerQueued(
-              container.getContainerId());
-        } catch (IOException e) {
-          LOG.warn("Could not store container [" + container.getContainerId()
-              + "] state. The Container has been queued.", e);
-        }
+    } else {
+      // Given an opportunistic container, we first try to start as many
+      // queuing guaranteed containers as possible followed by queuing
+      // opportunistic containers based on remaining resources available, then
+      // enqueue the opportunistic container. If the container is enqueued, we
+      // do another pass to try to start the newly enqueued opportunistic
+      // container.
+      startPendingContainers();
+      boolean containerQueued = enqueueContainer(container);
+      // container may not get queued because the max opportunistic container
+      // queue length is reached. If so, there is no point doing another pass
+      if (containerQueued) {
+        startPendingContainers();
       }
     }
   }
@@ -292,7 +338,7 @@ private void killOpportunisticContainers(Container container) {
     }
   }
 
-  private void startAllocatedContainer(Container container) {
+  private void startContainer(Container container) {
     LOG.info("Starting container [" + container.getContainerId()+ "]");
     runningContainers.put(container.getContainerId(), container);
     this.utilizationTracker.addContainerResources(container);
@@ -416,4 +462,5 @@ private void shedQueuedOpportunisticContainers() {
   public ContainersMonitor getContainersMonitor() {
     return this.context.getContainerManager().getContainersMonitor();
   }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 075d8574e9..b1a7b4ba05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -583,7 +583,7 @@ public long getVmemAllocatedForContainers() {
 
       @Override
       public long getPmemAllocatedForContainers() {
-        return 10240;
+        return (long) 2048 << 20;
       }
 
       @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 8264f2e015..aeba399ca0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -331,6 +331,91 @@ public void testStartAndQueueMultipleContainers() throws Exception {
         containerScheduler.getNumQueuedOpportunisticContainers());
   }
 
+  /**
+   * Starts one GUARANTEED container that takes up the whole node's resources
+   * and submits more OPPORTUNISTIC containers than the opportunistic container
+   * queue can hold. OPPORTUNISTIC containers that cannot be queued should be
+   * killed.
+   * @throws Exception
+   */
+  @Test
+  public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+
+    final int maxOppQueueLength = conf.getInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+        YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH);
+    for (int i = 1; i < maxOppQueueLength + 2; i++) {
+      list.add(StartContainerRequest.newInstance(
+          containerLaunchContext,
+          createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
+              context.getNodeId(),
+              user, BuilderUtils.newResource(2048, 1),
+              context.getContainerTokenSecretManager(), null,
+              ExecutionType.OPPORTUNISTIC)));
+    }
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(0), ContainerState.RUNNING, 40);
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(maxOppQueueLength + 1), ContainerState.DONE,
+        40);
+    Thread.sleep(5000);
+
+    // Get container statuses. Container 0 should be running, containers 1 to
+    // maxOppQueueLength should be queued, and the last container should be
+    // killed.
+    List<ContainerId> statList = new ArrayList<>();
+    for (int i = 0; i < maxOppQueueLength + 2; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+            status.getState());
+      } else if (status.getContainerId().equals(createContainerId(
+          maxOppQueueLength + 1))) {
+        Assert.assertTrue(status.getDiagnostics().contains(
+            "Opportunistic container queue is full"));
+      } else {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
+            status.getState());
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+
+    ContainerScheduler containerScheduler =
+        containerManager.getContainerScheduler();
+    Assert.assertEquals(maxOppQueueLength,
+        containerScheduler.getNumQueuedContainers());
+    Assert.assertEquals(0,
+        containerScheduler.getNumQueuedGuaranteedContainers());
+    Assert.assertEquals(maxOppQueueLength,
+        containerScheduler.getNumQueuedOpportunisticContainers());
+  }
+
   /**
    * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources
    * requests by each container as such that only one can run in parallel.
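
For readers who want the net effect of the refactoring at a glance, the control flow of the new scheduleContainer() path can be summarized with the standalone sketch below. This is an illustrative approximation only, not part of the patch: the SchedulingFlowSketch class, the simplified container type, and the fixed-capacity resource counter are invented for the example, while the real ContainerScheduler works with NodeManager Container objects, the utilization tracker, kill events, and NM state-store updates that are omitted here.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, self-contained sketch of the scheduling flow introduced above.
// None of these simplified types exist in Hadoop; they only mirror the shape of
// scheduleContainer(), enqueueContainer() and the queue-draining helpers.
class SchedulingFlowSketch {

  enum ExecType { GUARANTEED, OPPORTUNISTIC }

  static final class FakeContainer {
    final int id;
    final ExecType type;
    FakeContainer(int id, ExecType type) {
      this.id = id;
      this.type = type;
    }
  }

  private final Map<Integer, FakeContainer> queuedGuaranteed = new LinkedHashMap<>();
  private final Map<Integer, FakeContainer> queuedOpportunistic = new LinkedHashMap<>();
  private final int maxOppQueueLength = 10; // NM config in the real scheduler
  private int freeSlots = 4;                // stand-in for the utilization tracker

  // Mirrors the refactored scheduleContainer(): enqueue first, start whatever
  // fits, and preempt only if a GUARANTEED container is still waiting.
  void scheduleContainer(FakeContainer container) {
    if (container.type == ExecType.GUARANTEED) {
      enqueueContainer(container);
      startPendingContainers();
      if (queuedGuaranteed.containsKey(container.id)) {
        // the real code calls killOpportunisticContainers(container) here
        System.out.println("would preempt opportunistic containers for " + container.id);
      }
    } else {
      startPendingContainers();
      if (enqueueContainer(container)) {
        startPendingContainers();
      }
    }
  }

  // Mirrors enqueueContainer(): guaranteed containers always queue; opportunistic
  // containers queue only while the opportunistic queue has room.
  private boolean enqueueContainer(FakeContainer container) {
    if (container.type == ExecType.GUARANTEED) {
      queuedGuaranteed.put(container.id, container);
      return true;
    }
    if (queuedOpportunistic.size() < maxOppQueueLength) {
      queuedOpportunistic.put(container.id, container);
      return true;
    }
    return false; // the real code sends a kill event: "queue is full"
  }

  // Guaranteed containers are drained first; opportunistic containers only get
  // a chance if the guaranteed queue was fully drained.
  private void startPendingContainers() {
    if (startFromQueue(queuedGuaranteed)) {
      startFromQueue(queuedOpportunistic);
    }
  }

  // Returns true if the queue was drained without running out of resources,
  // like startContainersFromQueue() in the patch.
  private boolean startFromQueue(Map<Integer, FakeContainer> queue) {
    Iterator<FakeContainer> it = queue.values().iterator();
    while (it.hasNext()) {
      it.next();
      if (freeSlots > 0) { // tryStartContainer() in the patch
        freeSlots--;
        it.remove();
      } else {
        return false;
      }
    }
    return true;
  }
}

Under these assumptions, a GUARANTEED container always lands in its queue before any preemption decision is made, and an OPPORTUNISTIC container that finds the queue full is rejected outright, which is the behavior the new tests above exercise.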