YARN-5216. Expose configurable preemption policy for OPPORTUNISTIC containers running on the NM. (Hitesh Sharma via asuresh)

Arun Suresh 2016-12-24 17:16:52 -08:00
parent 864fbacd45
commit 4f8194430f
7 changed files with 218 additions and 26 deletions


@ -1088,6 +1088,15 @@ public static boolean isAclEnabled(Configuration conf) {
NM_PREFIX + "container-retry-minimum-interval-ms";
public static final int DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS = 1000;
/**
* Use container pause as the preemption policy over kill in the container
* queue at a NodeManager.
**/
public static final String NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION =
NM_PREFIX + "opportunistic-containers-use-pause-for-preemption";
public static final boolean
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION = false;
/** Interval at which the delayed token removal thread runs */
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";


@ -3017,6 +3017,15 @@
<value>100</value>
</property>
<property>
<description>
Use container pause as the preemption policy over kill in the container
queue at a NodeManager.
</description>
<name>yarn.nodemanager.opportunistic-containers-use-pause-for-preemption</name>
<value>false</value>
</property>
<property>
<description>
Error filename pattern, to identify the file in the container's
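
For reference, the new yarn.nodemanager.opportunistic-containers-use-pause-for-preemption switch can also be flipped programmatically. A minimal sketch, assuming only the two YarnConfiguration constants introduced above (the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class EnablePausePreemption {
  public static void main(String[] args) {
    // Equivalent to setting the yarn-site.xml property shown above to true.
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(
        YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
        true);
    // The ContainerScheduler picks this flag up in serviceInit().
    System.out.println(conf.getBoolean(
        YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
        YarnConfiguration
            .DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION));
  }
}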


@ -103,4 +103,6 @@ public interface Container extends EventHandler<ContainerEvent> {
* @return Resource Mappings of the container
*/
ResourceMappings getResourceMappings();
/**
 * Send a pause event to the container.
 * @param description diagnostic message describing why the container
 *                    is being paused
 */
void sendPauseEvent(String description);
}


@ -816,15 +816,22 @@ private void sendFinishedEvents() {
@SuppressWarnings("unchecked") // dispatcher not typed
@Override
public void sendLaunchEvent() {
  if (ContainerState.PAUSED == getContainerState()) {
    dispatcher.getEventHandler().handle(
        new ContainerResumeEvent(containerId,
            "Container Resumed as some resources freed up"));
  } else {
    ContainersLauncherEventType launcherEvent =
        ContainersLauncherEventType.LAUNCH_CONTAINER;
    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
      // try to recover a container that was previously launched
      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
    }
    containerLaunchStartTime = clock.getTime();
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, launcherEvent));
  }
}
@SuppressWarnings("unchecked") // dispatcher not typed
@ -843,6 +850,13 @@ public void sendKillEvent(int exitStatus, String description) {
new ContainerKillEvent(containerId, exitStatus, description));
}
@SuppressWarnings("unchecked") // dispatcher not typed
@Override
public void sendPauseEvent(String description) {
dispatcher.getEventHandler().handle(
new ContainerPauseEvent(containerId, description));
}
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendRelaunchEvent() {
ContainersLauncherEventType launcherEvent =
@ -1799,7 +1813,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
/**
* Transitions upon receiving PAUSE_CONTAINER.
* - RUNNING -> PAUSING
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class PauseContainerTransition implements

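For orientation, the pause path added here is two-step: a PAUSE_CONTAINER event moves a RUNNING container to PAUSING, the executor then reports it PAUSED, and a later launch event resumes it. A minimal illustrative sketch of that lifecycle (a simplified stand-in, not the actual NM state machine):

// Illustrative only: simplified pause/resume lifecycle.
enum ContainerLifecycle {
  RUNNING, PAUSING, PAUSED;

  ContainerLifecycle onPauseEvent() {
    // RUNNING -> PAUSING, as in the transition comment above.
    return this == RUNNING ? PAUSING : this;
  }

  ContainerLifecycle onPausedByExecutor() {
    // PAUSING -> PAUSED once the executor has actually paused the process.
    return this == PAUSING ? PAUSED : this;
  }

  ContainerLifecycle onLaunchEvent() {
    // Mirrors sendLaunchEvent(): a PAUSED container is resumed,
    // not relaunched.
    return this == PAUSED ? RUNNING : this;
  }
}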

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -34,6 +35,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@ -74,7 +76,7 @@ public class ContainerScheduler extends AbstractService implements
queuedOpportunisticContainers = new LinkedHashMap<>();
// Used to keep track of containers that have been marked to be killed
// or paused to make room for a guaranteed container.
private final Map<ContainerId, Container> oppContainersToKill =
new HashMap<>();
@ -98,6 +100,8 @@ public class ContainerScheduler extends AbstractService implements
private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
private Boolean usePauseEventForPreemption = false;
/**
* Instantiate a Container Scheduler.
* @param context NodeManager Context.
@ -112,6 +116,17 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
}
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
this.usePauseEventForPreemption =
conf.getBoolean(
YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
}
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
@ -136,8 +151,9 @@ public void handle(ContainerSchedulerEvent event) {
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
case CONTAINER_PAUSED:
case CONTAINER_COMPLETED:
onResourcesReclaimed(event.getContainer());
break;
case UPDATE_CONTAINER:
if (event instanceof UpdateContainerSchedulerEvent) {
@ -203,9 +219,9 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
}
// Kill/pause opportunistic containers if any to make room for
// promotion request
reclaimOpportunisticContainerResources(updateEvent.getContainer());
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
@ -243,6 +259,12 @@ public int getNumRunningContainers() {
return this.runningContainers.size();
}
@VisibleForTesting
public void setUsePauseEventForPreemption(
boolean usePauseEventForPreemption) {
this.usePauseEventForPreemption = usePauseEventForPreemption;
}
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
this.opportunisticContainersStatus.setQueuedOpportContainers(
getNumQueuedOpportunisticContainers());
@ -257,7 +279,7 @@ public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return this.opportunisticContainersStatus;
}
private void onResourcesReclaimed(Container container) {
oppContainersToKill.remove(container.getContainerId());
// This could be killed externally for eg. by the ContainerManager,
@ -292,6 +314,23 @@ private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
// Start pending guaranteed containers, if resources available.
boolean resourcesAvailable = startContainers(
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
// Resume opportunistic containers, if resources are available.
if (resourcesAvailable) {
  List<Container> pausedContainers = new ArrayList<Container>();
  Map<ContainerId, Container> containers = context.getContainers();
  for (Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
    ContainerId contId = entry.getKey();
    // Find containers that are not already running and are in the
    // PAUSED state.
    if (!runningContainers.containsKey(contId)) {
      if (containers.get(contId).getContainerState()
          == ContainerState.PAUSED) {
        pausedContainers.add(containers.get(contId));
      }
    }
  }
  resourcesAvailable = startContainers(pausedContainers, false);
}
}
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
@ -395,7 +434,7 @@ protected void scheduleContainer(Container container) {
// if the guaranteed container is queued, we need to preempt opportunistic
// containers to make room for it
if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
  reclaimOpportunisticContainerResources(container);
}
}
} else {
// Given an opportunistic container, we first try to start as many queuing
@ -413,19 +452,30 @@ protected void scheduleContainer(Container container) {
}
}
@SuppressWarnings("unchecked")
private void reclaimOpportunisticContainerResources(Container container) {
  List<Container> extraOppContainersToReclaim =
      pickOpportunisticContainersToReclaimResources(
          container.getContainerId());
  // Kill or pause the opportunistic containers that were chosen.
  for (Container contToReclaim : extraOppContainersToReclaim) {
    String preemptionAction =
        usePauseEventForPreemption ? "paused" : "killed";
    LOG.info(
        "Container {} will be {} to start the "
        + "execution of guaranteed container {}.",
        contToReclaim.getContainerId(), preemptionAction,
        container.getContainerId());
    if (usePauseEventForPreemption) {
      contToReclaim.sendPauseEvent(
          "Container Paused to make room for Guaranteed Container");
    } else {
      contToReclaim.sendKillEvent(
          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
          "Container Killed to make room for Guaranteed Container.");
    }
    oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
  }
}
@ -440,7 +490,7 @@ private void startContainer(Container container) {
container.sendLaunchEvent();
}
private List<Container> pickOpportunisticContainersToReclaimResources(
ContainerId containerToStartId) {
// The opportunistic containers that need to be killed for the
// given container to start.

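Taken together, the scheduler changes implement the following policy: when a GUARANTEED container needs resources, running OPPORTUNISTIC containers are paused (or killed, if the new flag is off), and once resources free up, paused containers are resumed before queued OPPORTUNISTIC ones are started. A self-contained sketch of that flow under simplifying assumptions (illustrative class, not the actual ContainerScheduler):

import java.util.ArrayDeque;
import java.util.Deque;

final class PausePreemptionSketch {
  private final Deque<String> runningOpportunistic = new ArrayDeque<>();
  private final Deque<String> pausedOpportunistic = new ArrayDeque<>();
  private final boolean usePauseForPreemption;

  PausePreemptionSketch(boolean usePauseForPreemption) {
    this.usePauseForPreemption = usePauseForPreemption;
  }

  void startOpportunistic(String containerId) {
    runningOpportunistic.add(containerId);
  }

  // A guaranteed container was scheduled: reclaim opportunistic resources.
  void onGuaranteedContainerScheduled() {
    while (!runningOpportunistic.isEmpty()) {
      String id = runningOpportunistic.poll();
      if (usePauseForPreemption) {
        pausedOpportunistic.push(id); // paused: in-progress work is kept
      }
      // else: killed, and the container's work so far is lost
    }
  }

  // Resources freed up: resume paused containers before queued ones.
  void onResourcesReclaimed() {
    while (!pausedOpportunistic.isEmpty()) {
      runningOpportunistic.add(pausedOpportunistic.pop());
    }
  }
}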

@ -23,6 +23,8 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
@ -49,6 +51,7 @@
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@ -124,18 +127,38 @@ public long getVCoresAllocatedForContainers() {
@Override
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor() {
ConcurrentMap<String, Boolean> oversleepMap =
new ConcurrentHashMap<String, Boolean>();
@Override
public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException {
oversleepMap.put(ctx.getContainer().getContainerId().toString(), false);
if (delayContainers) {
try {
Thread.sleep(10000);
if (oversleepMap.get(ctx.getContainer().getContainerId().toString())) {
  Thread.sleep(10000);
}
} catch (InterruptedException e) {
// Nothing..
}
}
return super.launchContainer(ctx);
}
@Override
public void pauseContainer(Container container) {
// To mimic pausing we force the container to be in the PAUSED state
// a little longer by oversleeping.
oversleepMap.put(container.getContainerId().toString(), true);
LOG.info("Container was paused");
}
@Override
public void resumeContainer(Container container) {
LOG.info("Container was resumed");
}
};
exec.setConf(conf);
return spy(exec);
@ -505,6 +528,86 @@ public void testKillOpportunisticForGuaranteedContainer() throws Exception {
contStatus1.getState());
}
/**
* Submit one OPPORTUNISTIC and one GUARANTEED container. The resource
* requests of each container are such that only one can run at a time.
* Thus, the OPPORTUNISTIC container that started running will be
* paused so that the GUARANTEED container can start.
* Once the GUARANTEED container finishes its execution, the paused
* OPPORTUNISTIC container will be resumed and run to completion.
* @throws Exception
*/
@Test
public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
containerManager.start();
containerManager.getContainerScheduler().
setUsePauseEventForPreemption(true);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
List<StartContainerRequest> list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(2048, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(0), ContainerState.RUNNING, 40);
list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(2048, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.GUARANTEED)));
allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(1), ContainerState.RUNNING, 40);
// Get container statuses. Container 0 should be paused, container 1
// should be running.
List<ContainerId> statList = new ArrayList<ContainerId>();
for (int i = 0; i < 2; i++) {
statList.add(createContainerId(i));
}
GetContainerStatusesRequest statRequest =
GetContainerStatusesRequest.newInstance(statList);
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertTrue(status.getDiagnostics().contains(
"Container Paused to make room for Guaranteed Container"));
} else if (status.getContainerId().equals(createContainerId(1))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
}
System.out.println("\nStatus : [" + status + "]\n");
}
// Make sure that the GUARANTEED container completes
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(1), ContainerState.DONE, 40);
// Make sure that the PAUSED opportunistic container resumes and
// runs to completion.
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(0), ContainerState.DONE, 40);
}
/**
* 1. Submit a long running GUARANTEED container to hog all NM resources.
* 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued.


@ -245,4 +245,9 @@ public long getContainerStartTime() {
public ResourceMappings getResourceMappings() {
return null;
}
@Override
public void sendPauseEvent(String description) {
}
}