YARN-2531. Added a configuration for admins to be able to override app-configs and enforce/not-enforce strict control of per-container cpu usage. Contributed by Varun Vasudev.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-09-16 10:14:46 -07:00
parent 0c26412be4
commit 9f6891d9ef
5 changed files with 189 additions and 29 deletions

View File

@ -84,6 +84,10 @@ Release 2.6.0 - UNRELEASED
failures should be ignored towards counting max-attempts. (Xuan Gong via
vinodkv)
YARN-2531. Added a configuration for admins to be able to override app-configs
and enforce/not-enforce strict control of per-container cpu usage. (Varun
Vasudev via vinodkv)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -902,6 +902,16 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH =
NM_PREFIX + "linux-container-executor.cgroups.mount-path";
/**
* Whether the apps should run in strict resource usage mode(not allowed to
* use spare CPU)
*/
public static final String NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
NM_PREFIX + "linux-container-executor.cgroups.strict-resource-usage";
public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
false;
/**
* Interval of time the linux container executor should try cleaning up

View File

@ -1039,6 +1039,16 @@
<value>^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$</value>
</property>
<property>
<description>This flag determines whether apps should run with strict resource limits
or be allowed to consume spare resources if they need them. For example, turning the
flag on will restrict apps to use only their share of CPU, even if the node has spare
CPU cycles. The default value is false i.e. use available resources. Please note that
turning this flag on may reduce job throughput on the cluster.</description>
<name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
<value>false</value>
</property>
<property>
<description>T-file compression types used to compress aggregated logs.</description>
<name>yarn.nodemanager.log-aggregation.compression-type</name>

View File

@ -57,6 +57,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private String cgroupMountPath;
private boolean cpuWeightEnabled = true;
private boolean strictResourceUsageMode = false;
private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup";
@ -71,6 +72,8 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private long deleteCgroupTimeout;
// package private for testing purposes
Clock clock;
private float yarnProcessors;
public CgroupsLCEResourcesHandler() {
this.controllerPaths = new HashMap<String, String>();
@ -105,6 +108,12 @@ void initConfig() throws IOException {
cgroupPrefix = cgroupPrefix.substring(1);
}
this.strictResourceUsageMode =
conf
.getBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
int len = cgroupPrefix.length();
if (cgroupPrefix.charAt(len - 1) == '/') {
cgroupPrefix = cgroupPrefix.substring(0, len - 1);
@ -132,8 +141,7 @@ void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
initializeControllerPaths();
// cap overall usage to the number of cores allocated to YARN
float yarnProcessors =
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
int systemProcessors = plugin.getNumProcessors();
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
@ -290,10 +298,25 @@ private void setupLimits(ContainerId containerId,
String containerName = containerId.toString();
if (isCpuWeightEnabled()) {
int containerVCores = containerResource.getVirtualCores();
createCgroup(CONTROLLER_CPU, containerName);
int cpuShares = CPU_DEFAULT_WEIGHT * containerResource.getVirtualCores();
int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
updateCgroup(CONTROLLER_CPU, containerName, "shares",
String.valueOf(cpuShares));
if (strictResourceUsageMode) {
int nodeVCores =
conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,
String.valueOf(limits[1]));
}
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
@ -86,9 +88,13 @@ static class CustomCgroupsLCEResourceHandler extends
String mtabFile;
int[] limits = new int[2];
boolean generateLimitsMode = false;
@Override
int[] getOverallLimits(float x) {
if (generateLimitsMode == true) {
return super.getOverallLimits(x);
}
return limits;
}
@ -116,32 +122,11 @@ public void testInit() throws IOException {
handler.initConfig();
// create mock cgroup
File cgroupDir = new File("target", UUID.randomUUID().toString());
if (!cgroupDir.mkdir()) {
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
File cgroupDir = createMockCgroup();
File cgroupMountDir = createMockCgroupMount(cgroupDir);
// create mock mtab
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
File mockMtab = createMockMTab(cgroupDir);
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
@ -156,7 +141,8 @@ public void testInit() throws IOException {
Assert.assertFalse(quotaFile.exists());
// subset of cpu being used, files should be created
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
@ -166,7 +152,8 @@ public void testInit() throws IOException {
Assert.assertEquals(1000 * 1000, quota);
// set cpu back to 100, quota should be -1
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 100);
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
@ -213,4 +200,130 @@ public void testGetOverallLimits() {
Assert.assertEquals(1000 * 1000, ret[0]);
Assert.assertEquals(-1, ret[1]);
}
private File createMockCgroup() throws IOException {
File cgroupDir = new File("target", UUID.randomUUID().toString());
if (!cgroupDir.mkdir()) {
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupDir;
}
private File createMockCgroupMount(File cgroupDir) throws IOException {
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupMountDir;
}
private File createMockMTab(File cgroupDir) throws IOException {
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
return mockMtab;
}
@Test
public void testContainerLimits() throws IOException {
LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
CustomCgroupsLCEResourceHandler handler =
new CustomCgroupsLCEResourceHandler();
handler.generateLimitsMode = true;
YarnConfiguration conf = new YarnConfiguration();
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
handler.setConf(conf);
handler.initConfig();
// create mock cgroup
File cgroupDir = createMockCgroup();
File cgroupMountDir = createMockCgroupMount(cgroupDir);
// create mock mtab
File mockMtab = createMockMTab(cgroupDir);
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
handler.init(mockLCE, plugin);
// check values
// default case - files shouldn't exist, strict mode off by default
ContainerId id = ContainerId.fromString("container_1_1_1_1");
handler.preExecute(id, Resource.newInstance(1024, 1));
File containerDir = new File(cgroupMountDir, id.toString());
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
File periodFile = new File(containerDir, "cpu.cfs_period_us");
File quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// no files created because we're using all cpu
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// 50% of CPU
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(500 * 1000, readIntFromFile(periodFile));
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
// CGroups set to 50% of CPU, container set to 50% of YARN CPU
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
handler.initConfig();
handler.init(mockLCE, plugin);
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(1000 * 1000, readIntFromFile(periodFile));
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
FileUtils.deleteQuietly(cgroupDir);
}
}