YARN-7715. Support NM promotion/demotion of running containers. (Miklos Szegedi via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-05-10 11:01:01 -07:00
parent 274eee3284
commit 6341c3a437
15 changed files with 202 additions and 64 deletions

View File

@ -155,6 +155,12 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {

View File

@ -186,38 +186,8 @@ public static int[] getOverallLimits(float yarnProcessors) {
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String cgroupId = container.getContainerId().toString();
Resource containerResource = container.getResource();
cGroupsHandler.createCGroup(CPU, cgroupId);
try {
int containerVCores = containerResource.getVirtualCores();
ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
if (id != null && id.getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
cGroupsHandler
.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC));
} else {
int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
cGroupsHandler
.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(cpuShares));
}
if (strictResourceUsageMode) {
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0]));
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1]));
}
}
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CPU, cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
updateContainer(container);
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
@ -232,6 +202,49 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
Resource containerResource = container.getResource();
String cgroupId = container.getContainerId().toString();
File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId));
if (cgroup.exists()) {
try {
int containerVCores = containerResource.getVirtualCores();
ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
if (id != null && id.getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
cGroupsHandler
.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC));
} else {
int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
cGroupsHandler
.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(cpuShares));
}
if (strictResourceUsageMode) {
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0]));
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1]));
}
}
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CPU, cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
}
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@ -119,43 +120,53 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
}
@Override
public List<PrivilegedOperation> preStart(Container container)
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
String cgroupId = container.getContainerId().toString();
//memory is in MB
long containerSoftLimit =
(long) (container.getResource().getMemorySize() * this.softLimit);
long containerHardLimit = container.getResource().getMemorySize();
cGroupsHandler.createCGroup(MEMORY, cgroupId);
if (enforce) {
try {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES,
String.valueOf(containerHardLimit) + "M");
ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
if (id != null && id.getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
File cgroup = new File(cGroupsHandler.getPathForCGroup(MEMORY, cgroupId));
if (cgroup.exists()) {
//memory is in MB
long containerSoftLimit =
(long) (container.getResource().getMemorySize() * this.softLimit);
long containerHardLimit = container.getResource().getMemorySize();
if (enforce) {
try {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES,
String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M");
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS,
String.valueOf(OPPORTUNISTIC_SWAPPINESS));
} else {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES,
String.valueOf(containerSoftLimit) + "M");
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS,
String.valueOf(swappiness));
CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES,
String.valueOf(containerHardLimit) + "M");
ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
if (id != null && id.getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES,
String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M");
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS,
String.valueOf(OPPORTUNISTIC_SWAPPINESS));
} else {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES,
String.valueOf(containerSoftLimit) + "M");
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,
CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS,
String.valueOf(swappiness));
}
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(MEMORY, cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(MEMORY, cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
}
return null;
}
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String cgroupId = container.getContainerId().toString();
cGroupsHandler.createCGroup(MEMORY, cgroupId);
updateContainer(container);
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,

View File

@ -128,6 +128,12 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}
/**
* Cleanup operation once container is completed - deletes cgroup.
*

View File

@ -61,7 +61,7 @@ List<PrivilegedOperation> preStart(Container container)
/**
* Require state for container that was already launched
*
* @param containerId if of the container being reacquired.
* @param containerId id of the container being reacquired.
* @return (possibly empty) list of operations that require elevated
* privileges
* @throws ResourceHandlerException
@ -71,7 +71,19 @@ List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException;
/**
* Perform any tasks necessary after container completion
* Update state for container that was already launched
*
* @param container the container being updated.
* @return (possibly empty) list of operations that require elevated
* privileges
* @throws ResourceHandlerException
*/
List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException;
/**
* Perform any tasks necessary after container completion.
* @param containerId of the container that was completed.
* @return (possibly empty) list of operations that require elevated
* privileges

View File

@ -100,6 +100,24 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return allOperations;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
List<PrivilegedOperation> allOperations = new
ArrayList<PrivilegedOperation>();
for (ResourceHandler resourceHandler : resourceHandlers) {
List<PrivilegedOperation> handlerOperations =
resourceHandler.updateContainer(container);
if (handlerOperations != null) {
allOperations.addAll(handlerOperations);
}
}
return allOperations;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {

View File

@ -203,6 +203,12 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}
/**
* Returns total bytes sent per container to be used for metrics tracking
* purposes.

View File

@ -205,6 +205,12 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) thr
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException {
allocator.cleanupAssignFpgas(containerId.toString());

View File

@ -164,6 +164,12 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}
@Override
public synchronized List<PrivilegedOperation> postComplete(
ContainerId containerId) throws ResourceHandlerException {

View File

@ -94,6 +94,12 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {

View File

@ -34,6 +34,9 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@ -105,6 +108,9 @@ public class ContainerScheduler extends AbstractService implements
private Boolean usePauseEventForPreemption = false;
@VisibleForTesting
ResourceHandlerChain resourceHandlerChain = null;
/**
* Instantiate a Container Scheduler.
* @param context NodeManager Context.
@ -123,6 +129,24 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
@Override
public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
try {
if (resourceHandlerChain == null) {
resourceHandlerChain = ResourceHandlerModule
.getConfiguredResourceHandlerChain(conf, context);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Resource handler chain enabled = " + (resourceHandlerChain
!= null));
}
if (resourceHandlerChain != null) {
LOG.debug("Bootstrapping resource handler chain");
resourceHandlerChain.bootstrap(conf);
}
} catch (ResourceHandlerException e) {
LOG.error("Failed to bootstrap configured resource subsystems! ", e);
throw new IOException(
"Failed to bootstrap configured resource subsystems!");
}
this.usePauseEventForPreemption =
conf.getBoolean(
YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
@ -218,6 +242,12 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
updateEvent.getContainer());
}
}
try {
resourceHandlerChain.updateContainer(updateEvent.getContainer());
} catch (Exception ex) {
LOG.warn(String.format("Could not update resources on " +
"continer update of %s", containerId), ex);
}
startPendingContainers(maxOppQueueLength <= 0);
}
}

View File

@ -49,6 +49,7 @@ public class TestCGroupsCpuResourceHandlerImpl {
@Before
public void setup() {
mockCGroupsHandler = mock(CGroupsHandler.class);
when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
cGroupsCpuResourceHandler =
new CGroupsCpuResourceHandlerImpl(mockCGroupsHandler);

View File

@ -45,6 +45,7 @@ public class TestCGroupsMemoryResourceHandlerImpl {
@Before
public void setup() {
mockCGroupsHandler = mock(CGroupsHandler.class);
when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
cGroupsMemoryResourceHandler =
new CGroupsMemoryResourceHandlerImpl(mockCGroupsHandler);
}

View File

@ -116,6 +116,12 @@ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
@ -72,7 +73,11 @@
import org.junit.Test;
import org.slf4j.LoggerFactory;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Tests to verify that the {@link ContainerScheduler} is able to queue and
@ -1183,6 +1188,8 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
ContainerScheduler containerScheduler =
containerManager.getContainerScheduler();
containerScheduler.resourceHandlerChain =
mock(ResourceHandlerChain.class);
// Ensure two containers are properly queued.
Assert.assertEquals(1, containerScheduler.getNumQueuedContainers());
Assert.assertEquals(0,
@ -1246,6 +1253,9 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
ContainerEventType.INIT_CONTAINER,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
verify(containerScheduler.resourceHandlerChain,
times(1))
.updateContainer(any());
}
@Test