YARN-6706. Refactor ContainerScheduler to make oversubscription change easier. (Haibo Chen via asuresh)

Arun Suresh 2017-07-17 14:07:23 -07:00
parent ed27f2b2cc
commit 5b007921cd
3 changed files with 177 additions and 45 deletions


@@ -192,7 +192,9 @@ private void onContainerCompleted(Container container) {
// decrement only if it was a running container
Container completedContainer = runningContainers.remove(container
.getContainerId());
if (completedContainer != null) {
// only a running container releases resources upon completion
boolean resourceReleased = completedContainer != null;
if (resourceReleased) {
this.utilizationTracker.subtractContainerResource(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
@@ -218,8 +220,7 @@ private boolean startContainersFromQueue(
boolean resourcesAvailable = true;
while (cIter.hasNext() && resourcesAvailable) {
Container container = cIter.next();
if (this.utilizationTracker.hasResourcesAvailable(container)) {
startAllocatedContainer(container);
if (tryStartContainer(container)) {
cIter.remove();
} else {
resourcesAvailable = false;
@@ -228,50 +229,95 @@ private boolean startContainersFromQueue(
return resourcesAvailable;
}
private boolean tryStartContainer(Container container) {
boolean containerStarted = false;
if (resourceAvailableToStartContainer(container)) {
startContainer(container);
containerStarted = true;
}
return containerStarted;
}
/**
* Check if there is resource available to start a given container
* immediately. (This can be extended to include overallocated resources)
* @param container the container to start
* @return true if container can be launched directly
*/
private boolean resourceAvailableToStartContainer(Container container) {
return this.utilizationTracker.hasResourcesAvailable(container);
}
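The javadoc above leaves the door open for over-allocation, which is the point of this refactoring (YARN-6706). A minimal sketch of how the hook might later be extended, assuming a hypothetical hasOverallocationRoomFor() helper on the utilization tracker that is not part of this patch:

private boolean resourceAvailableToStartContainer(Container container) {
  // Strict path: free, unallocated capacity fits the container.
  if (this.utilizationTracker.hasResourcesAvailable(container)) {
    return true;
  }
  // Hypothetical over-allocation path: let OPPORTUNISTIC containers use
  // allocated-but-idle capacity reported by the tracker.
  return container.getContainerTokenIdentifier().getExecutionType()
      == ExecutionType.OPPORTUNISTIC
      && this.utilizationTracker.hasOverallocationRoomFor(container);
}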
private boolean enqueueContainer(Container container) {
boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
getExecutionType() == ExecutionType.GUARANTEED;
boolean isQueued;
if (isGuaranteedContainer) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
isQueued = true;
} else {
if (queuedOpportunisticContainers.size() < maxOppQueueLength) {
LOG.info("Opportunistic container {} will be queued at the NM.",
container.getContainerId());
queuedOpportunisticContainers.put(
container.getContainerId(), container);
isQueued = true;
} else {
LOG.info("Opportunistic container [{}] will not be queued at the NM" +
"since max queue length [{}] has been reached",
container.getContainerId(), maxOppQueueLength);
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Opportunistic container queue is full.");
isQueued = false;
}
}
if (isQueued) {
try {
this.context.getNMStateStore().storeContainerQueued(
container.getContainerId());
} catch (IOException e) {
LOG.warn("Could not store container [" + container.getContainerId()
+ "] state. The Container has been queued.", e);
}
}
return isQueued;
}
@VisibleForTesting
protected void scheduleContainer(Container container) {
if (maxOppQueueLength <= 0) {
startAllocatedContainer(container);
return;
}
if (queuedGuaranteedContainers.isEmpty() &&
queuedOpportunisticContainers.isEmpty() &&
this.utilizationTracker.hasResourcesAvailable(container)) {
startAllocatedContainer(container);
} else {
LOG.info("No available resources for container {} to start its execution "
+ "immediately.", container.getContainerId());
boolean isQueued = true;
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
// Kill running opportunistic containers to make space for
// guaranteed container.
boolean isGuaranteedContainer = container.getContainerTokenIdentifier().
getExecutionType() == ExecutionType.GUARANTEED;
// Given a guaranteed container, we enqueue it first and then try to start
// as many queuing guaranteed containers as possible followed by queuing
// opportunistic containers based on remaining resources available. If the
// container still stays in the queue afterwards, we need to preempt just
// enough number of opportunistic containers.
if (isGuaranteedContainer) {
enqueueContainer(container);
startPendingContainers();
// if the guaranteed container is queued, we need to preempt opportunistic
// containers to make room for it
if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
killOpportunisticContainers(container);
} else {
if (queuedOpportunisticContainers.size() <= maxOppQueueLength) {
LOG.info("Opportunistic container {} will be queued at the NM.",
container.getContainerId());
queuedOpportunisticContainers.put(
container.getContainerId(), container);
} else {
isQueued = false;
LOG.info("Opportunistic container [{}] will not be queued at the NM" +
"since max queue length [{}] has been reached",
container.getContainerId(), maxOppQueueLength);
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Opportunistic container queue is full.");
}
}
if (isQueued) {
try {
this.context.getNMStateStore().storeContainerQueued(
container.getContainerId());
} catch (IOException e) {
LOG.warn("Could not store container [" + container.getContainerId()
+ "] state. The Container has been queued.", e);
}
} else {
// Given an opportunistic container, we first try to start as many queuing
// guaranteed containers as possible followed by queuing opportunistic
// containers based on remaining resources available, then enqueue the
// opportunistic container. If the container is enqueued, we do another
// pass to try to start the newly enqueued opportunistic container.
startPendingContainers();
boolean containerQueued = enqueueContainer(container);
// container may not get queued because the max opportunistic container
// queue length is reached. If so, there is no point doing another pass
if (containerQueued) {
startPendingContainers();
}
}
}
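Both branches above rely on startPendingContainers(), which this diff does not show. Judging from startContainersFromQueue() earlier in the hunk, it presumably drains the guaranteed queue first and only then the opportunistic queue; a sketch of that shape (not the verbatim method from this file):

private void startPendingContainers() {
  // Queued GUARANTEED containers go first; startContainersFromQueue()
  // returns false once a container no longer fits the available resources.
  boolean resourcesAvailable =
      startContainersFromQueue(queuedGuaranteedContainers.values());
  // Queued OPPORTUNISTIC containers only run on whatever is left.
  if (resourcesAvailable) {
    startContainersFromQueue(queuedOpportunisticContainers.values());
  }
}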
@@ -292,7 +338,7 @@ private void killOpportunisticContainers(Container container) {
}
}
private void startAllocatedContainer(Container container) {
private void startContainer(Container container) {
LOG.info("Starting container [" + container.getContainerId()+ "]");
runningContainers.put(container.getContainerId(), container);
this.utilizationTracker.addContainerResources(container);
@@ -416,4 +462,5 @@ private void shedQueuedOpportunisticContainers() {
public ContainersMonitor getContainersMonitor() {
return this.context.getContainerManager().getContainersMonitor();
}
}


@@ -583,7 +583,7 @@ public long getVmemAllocatedForContainers() {
@Override
public long getPmemAllocatedForContainers() {
return 10240;
return (long) 2048 << 20;
}
@Override
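The stubbed getPmemAllocatedForContainers() now reports 2048 MB expressed in bytes rather than the old bare 10240, presumably to match the 2048 MB containers the queuing tests request. The long cast is what keeps the shift out of int arithmetic:

long pmemBytes = (long) 2048 << 20; // 2048 * 2^20 = 2,147,483,648 bytes (2 GiB)
int  overflow  = 2048 << 20;        // the same shift in int overflows to -2147483648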


@@ -331,6 +331,91 @@ public void testStartAndQueueMultipleContainers() throws Exception {
containerScheduler.getNumQueuedOpportunisticContainers());
}
/**
* Starts one GUARANTEED container that takes up the whole node's resources,
* and submits more OPPORTUNISTIC containers than the opportunistic container
* queue can hold. OPPORTUNISTIC containers that cannot be queued should be
* killed.
* @throws Exception
*/
@Test
public void testStartOpportunistcsWhenOppQueueIsFull() throws Exception {
containerManager.start();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
List<StartContainerRequest> list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(2048, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED)));
final int maxOppQueueLength = conf.getInt(
YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
YarnConfiguration.DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH);
for (int i = 1; i < maxOppQueueLength + 2; i++) {
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(i), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(2048, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
}
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(0), ContainerState.RUNNING, 40);
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(maxOppQueueLength + 1), ContainerState.DONE,
40);
Thread.sleep(5000);
// Get container statuses. Container 0 should be running, containers
// 1 to maxOppQueueLength should be queued, and the last container should
// be killed.
List<ContainerId> statList = new ArrayList<>();
for (int i = 0; i < maxOppQueueLength + 2; i++) {
statList.add(createContainerId(i));
}
GetContainerStatusesRequest statRequest =
GetContainerStatusesRequest.newInstance(statList);
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
} else if (status.getContainerId().equals(createContainerId(
maxOppQueueLength + 1))) {
Assert.assertTrue(status.getDiagnostics().contains(
"Opportunistic container queue is full"));
} else {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
}
System.out.println("\nStatus : [" + status + "]\n");
}
ContainerScheduler containerScheduler =
containerManager.getContainerScheduler();
Assert.assertEquals(maxOppQueueLength,
containerScheduler.getNumQueuedContainers());
Assert.assertEquals(0,
containerScheduler.getNumQueuedGuaranteedContainers());
Assert.assertEquals(maxOppQueueLength,
containerScheduler.getNumQueuedOpportunisticContainers());
}
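The queue limit the new test reads back is whatever the NM configuration carries. A test that needs a specific limit would set it before starting the container manager, for example (the value 10 is purely illustrative):

YarnConfiguration conf = new YarnConfiguration();
// Cap the NM opportunistic container queue at 10 entries; if left unset,
// DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH applies.
conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);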
/**
* Submit two OPPORTUNISTIC and one GUARANTEED containers. The resource
* requests by each container are such that only one can run in parallel.