YARN-1643. Make ContainersMonitor support changing monitoring size of an allocated container. Contributed by Meng Ding and Wangda Tan

This commit is contained in:
Jian He 2015-08-05 15:19:33 -07:00 committed by Wangda Tan
parent 5f5a968d65
commit c59ae4eeb1
7 changed files with 615 additions and 76 deletions

View File

@ -212,6 +212,9 @@ Release 2.8.0 - UNRELEASED
YARN-3867. ContainerImpl changes to support container resizing. (Meng Ding
via jianhe)
YARN-1643. Make ContainersMonitor support changing monitoring size of an
allocated container. (Meng Ding and Wangda Tan)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -18,13 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -32,12 +30,14 @@
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
@ -56,16 +56,16 @@ public class ContainersMonitorImpl extends AbstractService implements
private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs;
final List<ContainerId> containersToBeRemoved;
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
Map<ContainerId, ProcessTreeInfo> trackingContainers =
new HashMap<ContainerId, ProcessTreeInfo>();
@VisibleForTesting
final Map<ContainerId, ProcessTreeInfo> trackingContainers =
new ConcurrentHashMap<>();
final ContainerExecutor containerExecutor;
private final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher;
private final Context context;
private ResourceCalculatorPlugin resourceCalculatorPlugin;
private Configuration conf;
private static float vmemRatio;
private Class<? extends ResourceCalculatorProcessTree> processTreeClass;
private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
@ -82,6 +82,8 @@ public class ContainersMonitorImpl extends AbstractService implements
private ResourceUtilization containersUtilization;
private volatile boolean stopped = false;
public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
super("containers-monitor");
@ -90,8 +92,6 @@ public ContainersMonitorImpl(ContainerExecutor exec,
this.eventDispatcher = dispatcher;
this.context = context;
this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
this.containersToBeRemoved = new ArrayList<ContainerId>();
this.monitoringThread = new MonitoringThread();
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
@ -140,7 +140,7 @@ protected void serviceInit(Configuration conf) throws Exception {
this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
// ///////// Virtual memory configuration //////
float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
Preconditions.checkArgument(vmemRatio > 0.99f,
YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
@ -218,6 +218,7 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
if (containersMonitorEnabled) {
stopped = true;
this.monitoringThread.interrupt();
try {
this.monitoringThread.join();
@ -228,7 +229,8 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
private static class ProcessTreeInfo {
@VisibleForTesting
static class ProcessTreeInfo {
private ContainerId containerId;
private String pid;
private ResourceCalculatorProcessTree pTree;
@ -267,26 +269,43 @@ public void setProcessTree(ResourceCalculatorProcessTree pTree) {
this.pTree = pTree;
}
public long getVmemLimit() {
/**
* @return Virtual memory limit for the process tree in bytes
*/
public synchronized long getVmemLimit() {
return this.vmemLimit;
}
/**
* @return Physical memory limit for the process tree in bytes
*/
public long getPmemLimit() {
public synchronized long getPmemLimit() {
return this.pmemLimit;
}
/**
* Return the number of cpu vcores assigned
* @return
* @return Number of cpu vcores assigned
*/
public int getCpuVcores() {
public synchronized int getCpuVcores() {
return this.cpuVcores;
}
}
/**
* Set resource limit for enforcement
* @param pmemLimit
* Physical memory limit for the process tree in bytes
* @param vmemLimit
* Virtual memory limit for the process tree in bytes
* @param cpuVcores
* Number of cpu vcores assigned
*/
public synchronized void setResourceLimit(
long pmemLimit, long vmemLimit, int cpuVcores) {
this.pmemLimit = pmemLimit;
this.vmemLimit = vmemLimit;
this.cpuVcores = cpuVcores;
}
}
/**
* Check whether a container's process tree's current memory usage is over
@ -359,8 +378,7 @@ public MonitoringThread() {
@Override
public void run() {
while (true) {
while (!stopped && !Thread.currentThread().isInterrupted()) {
// Print the processTrees for debugging.
if (LOG.isDebugEnabled()) {
StringBuilder tmp = new StringBuilder("[ ");
@ -372,31 +390,6 @@ public void run() {
+ tmp.substring(0, tmp.length()) + "]");
}
// Add new containers
synchronized (containersToBeAdded) {
for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
.entrySet()) {
ContainerId containerId = entry.getKey();
ProcessTreeInfo processTreeInfo = entry.getValue();
LOG.info("Starting resource-monitoring for " + containerId);
trackingContainers.put(containerId, processTreeInfo);
}
containersToBeAdded.clear();
}
// Remove finished containers
synchronized (containersToBeRemoved) {
for (ContainerId containerId : containersToBeRemoved) {
if (containerMetricsEnabled) {
ContainerMetrics.forContainer(
containerId, containerMetricsPeriodMs).finished();
}
trackingContainers.remove(containerId);
LOG.info("Stopping resource-monitoring for " + containerId);
}
containersToBeRemoved.clear();
}
// Temporary structure to calculate the total resource utilization of
// the containers
ResourceUtilization trackedContainersUtilization =
@ -408,10 +401,8 @@ public void run() {
long pmemByAllContainers = 0;
long cpuUsagePercentPerCoreByAllContainers = 0;
long cpuUsageTotalCoresByAllContainers = 0;
for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
trackingContainers.entrySet().iterator(); it.hasNext();) {
Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
.entrySet()) {
ContainerId containerId = entry.getKey();
ProcessTreeInfo ptInfo = entry.getValue();
try {
@ -435,11 +426,6 @@ public void run() {
if (containerMetricsEnabled) {
ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs);
int cpuVcores = ptInfo.getCpuVcores();
final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
usageMetrics.recordResourceLimit(
vmemLimit, pmemLimit, cpuVcores);
usageMetrics.recordProcessId(pId);
}
}
@ -548,7 +534,7 @@ && isProcessTreeOverLimit(containerId.toString(),
eventDispatcher.getEventHandler().handle(
new ContainerKillEvent(containerId,
containerExitStatus, msg));
it.remove();
trackingContainers.remove(containerId);
LOG.info("Removed ProcessTree with root " + pId);
}
} catch (Exception e) {
@ -605,6 +591,60 @@ private String formatUsageString(long currentVmemUsage, long vmemLimit,
}
}
private void changeContainerResource(
ContainerId containerId, Resource resource) {
Container container = context.getContainers().get(containerId);
// Check container existence
if (container == null) {
LOG.warn("Container " + containerId.toString() + "does not exist");
return;
}
container.setResource(resource);
}
private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
if (!containerMetricsEnabled || monitoringEvent == null) {
return;
}
ContainerId containerId = monitoringEvent.getContainerId();
ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs);
int vmemLimitMBs;
int pmemLimitMBs;
int cpuVcores;
switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER:
ContainerStartMonitoringEvent startEvent =
(ContainerStartMonitoringEvent) monitoringEvent;
usageMetrics.recordStateChangeDurations(
startEvent.getLaunchDuration(),
startEvent.getLocalizationDuration());
cpuVcores = startEvent.getCpuVcores();
vmemLimitMBs = (int) (startEvent.getVmemLimit() >> 20);
pmemLimitMBs = (int) (startEvent.getPmemLimit() >> 20);
usageMetrics.recordResourceLimit(
vmemLimitMBs, pmemLimitMBs, cpuVcores);
break;
case STOP_MONITORING_CONTAINER:
usageMetrics.finished();
break;
case CHANGE_MONITORING_CONTAINER_RESOURCE:
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
Resource resource = changeEvent.getResource();
pmemLimitMBs = resource.getMemory();
vmemLimitMBs = (int) (pmemLimitMBs * vmemRatio);
cpuVcores = resource.getVirtualCores();
usageMetrics.recordResourceLimit(
vmemLimitMBs, pmemLimitMBs, cpuVcores);
break;
default:
break;
}
}
@Override
public long getVmemAllocatedForContainers() {
return this.maxVmemAllottedForContainers;
@ -650,38 +690,53 @@ public void setContainersUtilization(ResourceUtilization utilization) {
}
@Override
@SuppressWarnings("unchecked")
public void handle(ContainersMonitorEvent monitoringEvent) {
ContainerId containerId = monitoringEvent.getContainerId();
if (!containersMonitorEnabled) {
if (monitoringEvent.getType() == ContainersMonitorEventType
.CHANGE_MONITORING_CONTAINER_RESOURCE) {
// Nothing to enforce. Update container resource immediately.
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
changeContainerResource(containerId, changeEvent.getResource());
}
return;
}
ContainerId containerId = monitoringEvent.getContainerId();
switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER:
ContainerStartMonitoringEvent startEvent =
(ContainerStartMonitoringEvent) monitoringEvent;
if (containerMetricsEnabled) {
ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs);
usageMetrics.recordStateChangeDurations(
startEvent.getLaunchDuration(),
startEvent.getLocalizationDuration());
}
synchronized (this.containersToBeAdded) {
ProcessTreeInfo processTreeInfo =
LOG.info("Starting resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.put(containerId,
new ProcessTreeInfo(containerId, null, null,
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
startEvent.getCpuVcores());
this.containersToBeAdded.put(containerId, processTreeInfo);
}
startEvent.getCpuVcores()));
break;
case STOP_MONITORING_CONTAINER:
synchronized (this.containersToBeRemoved) {
this.containersToBeRemoved.add(containerId);
LOG.info("Stopping resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.remove(containerId);
break;
case CHANGE_MONITORING_CONTAINER_RESOURCE:
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
if (processTreeInfo == null) {
LOG.warn("Failed to track container "
+ containerId.toString()
+ ". It may have already completed.");
break;
}
LOG.info("Changing resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
long vmemLimit = (long) (pmemLimit * vmemRatio);
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
changeContainerResource(containerId, changeEvent.getResource());
break;
default:
// TODO: Wrong event.

View File

@ -211,6 +211,17 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
super.testIncreaseContainerResourceWithInvalidResource();
}
@Override
public void testChangeContainerResource() throws Exception {
// Don't run the test if the binary is not available.
if (!shouldRunTest()) {
LOG.info("LCE binary path is not passed. Not running the test");
return;
}
LOG.info("Running testChangeContainerResource");
super.testChangeContainerResource();
}
private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

View File

@ -1046,6 +1046,102 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
}
}
@Test
public void testChangeContainerResource() throws Exception {
containerManager.start();
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile);
// Construct the Container-id
ContainerId cId = createContainerId(0);
if (Shell.WINDOWS) {
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
fileWriter.write("\numask 0");
fileWriter.write("\nexec sleep 100");
}
fileWriter.close();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resource_alpha =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource rsrc_alpha =
recordFactory.newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
List<String> commands =
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user,
context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests);
// Make sure the container reaches RUNNING state
BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<Token>();
// Add increase request.
Resource targetResource = Resource.newInstance(4096, 2);
Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, targetResource,
context.getContainerTokenSecretManager(), null);
increaseTokens.add(containerToken);
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest.newInstance(increaseTokens);
IncreaseContainersResourceResponse increaseResponse =
containerManager.increaseContainersResource(increaseRequest);
Assert.assertEquals(
1, increaseResponse.getSuccessfullyIncreasedContainers().size());
Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
// Check status
List<ContainerId> containerIds = new ArrayList<>();
containerIds.add(cId);
GetContainerStatusesRequest gcsRequest =
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus = containerManager
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
// Check status immediately as resource increase is blocking
assertEquals(targetResource, containerStatus.getCapability());
// Simulate a decrease request
List<org.apache.hadoop.yarn.api.records.Container> containersToDecrease
= new ArrayList<>();
targetResource = Resource.newInstance(2048, 2);
org.apache.hadoop.yarn.api.records.Container decreasedContainer =
org.apache.hadoop.yarn.api.records.Container
.newInstance(cId, null, null, targetResource, null, null);
containersToDecrease.add(decreasedContainer);
containerManager.handle(
new CMgrDecreaseContainersResourceEvent(containersToDecrease));
// Check status with retry
containerStatus = containerManager
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
int retry = 0;
while (!targetResource.equals(containerStatus.getCapability()) &&
(retry++ < 5)) {
Thread.sleep(200);
containerStatus = containerManager.getContainerStatuses(gcsRequest)
.getContainerStatuses().get(0);
}
assertEquals(targetResource, containerStatus.getCapability());
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager)

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
@Override
public long getVirtualMemorySize() {
return 0;
}
@Override
public long getPhysicalMemorySize() {
return 0;
}
@Override
public long getAvailableVirtualMemorySize() {
return 0;
}
@Override
public long getAvailablePhysicalMemorySize() {
return 0;
}
@Override
public int getNumProcessors() {
return 0;
}
@Override
public int getNumCores() {
return 0;
}
@Override
public long getCpuFrequency() {
return 0;
}
@Override
public long getCumulativeCpuTime() {
return 0;
}
@Override
public float getCpuUsage() {
return 0;
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
public class MockResourceCalculatorProcessTree extends ResourceCalculatorProcessTree {
private long rssMemorySize = 0;
public MockResourceCalculatorProcessTree(String root) {
super(root);
}
@Override
public void updateProcessTree() {
}
@Override
public String getProcessTreeDump() {
return "";
}
@Override
public long getCumulativeCpuTime() {
return 0;
}
@Override
public boolean checkPidPgrpidForMatch() {
return true;
}
public void setRssMemorySize(long rssMemorySize) {
this.rssMemorySize = rssMemorySize;
}
public long getRssMemorySize() {
return this.rssMemorySize;
}
}

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
public class TestContainersMonitorResourceChange {
private ContainersMonitorImpl containersMonitor;
private MockExecutor executor;
private Configuration conf;
private AsyncDispatcher dispatcher;
private Context context;
private MockContainerEventHandler containerEventHandler;
private static class MockExecutor extends ContainerExecutor {
@Override
public void init() throws IOException {
}
@Override
public void startLocalizer(LocalizerStartContext ctx)
throws IOException, InterruptedException {
}
@Override
public int launchContainer(ContainerStartContext ctx) throws
IOException {
return 0;
}
@Override
public boolean signalContainer(ContainerSignalContext ctx)
throws IOException {
return true;
}
@Override
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
}
@Override
public String getProcessId(ContainerId containerId) {
return String.valueOf(containerId.getContainerId());
}
@Override
public boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException {
return true;
}
}
private static class MockContainerEventHandler implements
EventHandler<ContainerEvent> {
final private Set<ContainerId> killedContainer
= new HashSet<>();
@Override
public void handle(ContainerEvent event) {
if (event.getType() == ContainerEventType.KILL_CONTAINER) {
synchronized (killedContainer) {
killedContainer.add(event.getContainerID());
}
}
}
public boolean isContainerKilled(ContainerId containerId) {
synchronized (killedContainer) {
return killedContainer.contains(containerId);
}
}
}
@Before
public void setup() {
executor = new MockExecutor();
dispatcher = new AsyncDispatcher();
context = Mockito.mock(Context.class);
Mockito.doReturn(new ConcurrentSkipListMap<ContainerId, Container>())
.when(context).getContainers();
conf = new Configuration();
conf.set(
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
MockResourceCalculatorPlugin.class.getCanonicalName());
conf.set(
YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
MockResourceCalculatorProcessTree.class.getCanonicalName());
dispatcher.init(conf);
dispatcher.start();
containerEventHandler = new MockContainerEventHandler();
dispatcher.register(ContainerEventType.class, containerEventHandler);
}
@After
public void tearDown() throws Exception {
if (containersMonitor != null) {
containersMonitor.stop();
}
if (dispatcher != null) {
dispatcher.stop();
}
}
@Test
public void testContainersResourceChange() throws Exception {
// set container monitor interval to be 20ms
conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20L);
containersMonitor = createContainersMonitor(executor, dispatcher, context);
containersMonitor.init(conf);
containersMonitor.start();
// create container 1
containersMonitor.handle(new ContainerStartMonitoringEvent(
getContainerId(1), 2100L, 1000L, 1, 0, 0));
// verify that this container is properly tracked
assertNotNull(getProcessTreeInfo(getContainerId(1)));
assertEquals(1000L, getProcessTreeInfo(getContainerId(1))
.getPmemLimit());
assertEquals(2100L, getProcessTreeInfo(getContainerId(1))
.getVmemLimit());
// sleep longer than the monitor interval to make sure resource
// enforcement has started
Thread.sleep(200);
// increase pmem usage, the container should be killed
MockResourceCalculatorProcessTree mockTree =
(MockResourceCalculatorProcessTree) getProcessTreeInfo(
getContainerId(1)).getProcessTree();
mockTree.setRssMemorySize(2500L);
// verify that this container is killed
Thread.sleep(200);
assertTrue(containerEventHandler
.isContainerKilled(getContainerId(1)));
// create container 2
containersMonitor.handle(new ContainerStartMonitoringEvent(
getContainerId(2), 2202009L, 1048576L, 1, 0, 0));
// verify that this container is properly tracked
assertNotNull(getProcessTreeInfo(getContainerId(2)));
assertEquals(1048576L, getProcessTreeInfo(getContainerId(2))
.getPmemLimit());
assertEquals(2202009L, getProcessTreeInfo(getContainerId(2))
.getVmemLimit());
// trigger a change resource event, check limit after change
containersMonitor.handle(new ChangeMonitoringContainerResourceEvent(
getContainerId(2), Resource.newInstance(2, 1)));
assertEquals(2097152L, getProcessTreeInfo(getContainerId(2))
.getPmemLimit());
assertEquals(4404019L, getProcessTreeInfo(getContainerId(2))
.getVmemLimit());
// sleep longer than the monitor interval to make sure resource
// enforcement has started
Thread.sleep(200);
// increase pmem usage, the container should NOT be killed
mockTree =
(MockResourceCalculatorProcessTree) getProcessTreeInfo(
getContainerId(2)).getProcessTree();
mockTree.setRssMemorySize(2000000L);
// verify that this container is not killed
Thread.sleep(200);
assertFalse(containerEventHandler
.isContainerKilled(getContainerId(2)));
containersMonitor.stop();
}
@Test
public void testContainersResourceChangeIsTriggeredImmediately()
throws Exception {
// set container monitor interval to be 20s
conf.setLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS, 20000L);
containersMonitor = createContainersMonitor(executor, dispatcher, context);
containersMonitor.init(conf);
containersMonitor.start();
// sleep 1 second to make sure the container monitor thread is
// now waiting for the next monitor cycle
Thread.sleep(1000);
// create a container with id 3
containersMonitor.handle(new ContainerStartMonitoringEvent(
getContainerId(3), 2202009L, 1048576L, 1, 0, 0));
// Verify that this container has been tracked
assertNotNull(getProcessTreeInfo(getContainerId(3)));
// trigger a change resource event, check limit after change
containersMonitor.handle(new ChangeMonitoringContainerResourceEvent(
getContainerId(3), Resource.newInstance(2, 1)));
// verify that this container has been properly tracked with the
// correct size
assertEquals(2097152L, getProcessTreeInfo(getContainerId(3))
.getPmemLimit());
assertEquals(4404019L, getProcessTreeInfo(getContainerId(3))
.getVmemLimit());
containersMonitor.stop();
}
private ContainersMonitorImpl createContainersMonitor(
ContainerExecutor containerExecutor, AsyncDispatcher dispatcher,
Context context) {
return new ContainersMonitorImpl(containerExecutor, dispatcher, context);
}
private ContainerId getContainerId(int id) {
return ContainerId.newContainerId(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456L, 1), 1), id);
}
private ProcessTreeInfo getProcessTreeInfo(ContainerId id) {
return containersMonitor.trackingContainers.get(id);
}
}