YARN-11674. Add CPUResourceHandler for cgroup v2. (#6751)

This commit is contained in:
Benjamin Teke 2024-04-26 15:00:00 +02:00 committed by GitHub
parent 579b3bcea9
commit 399299104c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 677 additions and 198 deletions

View File

@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@InterfaceStability.Unstable
@InterfaceAudience.Private
public abstract class AbstractCGroupsCpuResourceHandler implements CpuResourceHandler {
static final Logger LOG =
LoggerFactory.getLogger(AbstractCGroupsCpuResourceHandler.class);
protected CGroupsHandler cGroupsHandler;
private boolean strictResourceUsageMode = false;
private float yarnProcessors;
private int nodeVCores;
private static final CGroupsHandler.CGroupController CPU =
CGroupsHandler.CGroupController.CPU;
@VisibleForTesting
static final int MAX_QUOTA_US = 1000 * 1000;
@VisibleForTesting
static final int MIN_PERIOD_US = 1000;
AbstractCGroupsCpuResourceHandler(CGroupsHandler cGroupsHandler) {
this.cGroupsHandler = cGroupsHandler;
}
@Override
public List<PrivilegedOperation> bootstrap(Configuration conf)
throws ResourceHandlerException {
return bootstrap(
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
}
@VisibleForTesting
List<PrivilegedOperation> bootstrap(
ResourceCalculatorPlugin plugin, Configuration conf)
throws ResourceHandlerException {
this.strictResourceUsageMode = conf.getBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
this.cGroupsHandler.initializeCGroupController(CPU);
nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
// cap overall usage to the number of cores allocated to YARN
yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
boolean existingCpuLimits;
existingCpuLimits = cpuLimitExists(
cGroupsHandler.getPathForCGroup(CPU, ""));
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors);
updateCgroupMaxCpuLimit("", String.valueOf(limits[1]), String.valueOf(limits[0]));
} else if (existingCpuLimits) {
LOG.info("Removing CPU constraints for YARN containers.");
updateCgroupMaxCpuLimit("", String.valueOf(-1), null);
}
return null;
}
protected abstract void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period)
throws ResourceHandlerException;
protected abstract boolean cpuLimitExists(String path) throws ResourceHandlerException;
@VisibleForTesting
@InterfaceAudience.Private
public static int[] getOverallLimits(float yarnProcessors) {
int[] ret = new int[2];
if (yarnProcessors < 0.01f) {
throw new IllegalArgumentException("Number of processors can't be <= 0.");
}
int quotaUS = MAX_QUOTA_US;
int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
if (yarnProcessors < 1.0f) {
periodUS = MAX_QUOTA_US;
quotaUS = (int) (periodUS * yarnProcessors);
if (quotaUS < MIN_PERIOD_US) {
LOG.warn("The quota calculated for the cgroup was too low."
+ " The minimum value is " + MIN_PERIOD_US
+ ", calculated value is " + quotaUS
+ ". Setting quota to minimum value.");
quotaUS = MIN_PERIOD_US;
}
}
// cfs_period_us can't be less than 1000 microseconds
// if the value of periodUS is less than 1000, we can't really use cgroups
// to limit cpu
if (periodUS < MIN_PERIOD_US) {
LOG.warn("The period calculated for the cgroup was too low."
+ " The minimum value is " + MIN_PERIOD_US
+ ", calculated value is " + periodUS
+ ". Using all available CPU.");
periodUS = MAX_QUOTA_US;
quotaUS = -1;
}
ret[0] = periodUS;
ret[1] = quotaUS;
return ret;
}
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String cgroupId = container.getContainerId().toString();
cGroupsHandler.createCGroup(CPU, cgroupId);
updateContainer(container);
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
.getPathForCGroupTasks(CPU, cgroupId)));
return ret;
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
Resource containerResource = container.getResource();
String cgroupId = container.getContainerId().toString();
File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId));
if (cgroup.exists()) {
try {
int containerVCores = containerResource.getVirtualCores();
ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
if (id != null && id.getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
updateCgroupCpuWeight(cgroupId, getOpportunisticCpuWeight());
} else {
updateCgroupCpuWeight(cgroupId, getCpuWeightByContainerVcores(containerVCores));
}
if (strictResourceUsageMode) {
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
updateCgroupMaxCpuLimit(cgroupId, String.valueOf(limits[1]), String.valueOf(limits[0]));
}
}
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CPU, cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
}
return null;
}
protected abstract int getOpportunisticCpuWeight();
protected abstract int getCpuWeightByContainerVcores(int containerVcores);
protected abstract void updateCgroupCpuWeight(String cgroupId, int weight)
throws ResourceHandlerException;
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
cGroupsHandler.deleteCGroup(CPU, containerId.toString());
return null;
}
@Override public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
return null;
}
@Override
public String toString() {
return AbstractCGroupsCpuResourceHandler.class.getName();
}
}

View File

@ -414,9 +414,10 @@ private String getErrorWithDetails(
public String createCGroup(CGroupController controller, String cGroupId) public String createCGroup(CGroupController controller, String cGroupId)
throws ResourceHandlerException { throws ResourceHandlerException {
String path = getPathForCGroup(controller, cGroupId); String path = getPathForCGroup(controller, cGroupId);
File cgroup = new File(path);
LOG.debug("createCgroup: {}", path); LOG.debug("createCgroup: {}", path);
if (!new File(path).mkdir()) { if (!cgroup.exists() && !cgroup.mkdir()) {
throw new ResourceHandlerException("Failed to create cgroup at " + path); throw new ResourceHandlerException("Failed to create cgroup at " + path);
} }

View File

@ -134,7 +134,7 @@ public List<PrivilegedOperation> preStart(Container container)
.createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId); .createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId);
try { try {
cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO, cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO,
cgroupId, CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, DEFAULT_WEIGHT); cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT, DEFAULT_WEIGHT);
} catch (ResourceHandlerException re) { } catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO, cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
cgroupId); cgroupId);

View File

@ -18,28 +18,14 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/** /**
* An implementation for using CGroups to restrict CPU usage on Linux. The * An implementation for using CGroups to restrict CPU usage on Linux. The
@ -58,208 +44,62 @@
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.Private @InterfaceAudience.Private
public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { public class CGroupsCpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler {
static final Logger LOG =
LoggerFactory.getLogger(CGroupsCpuResourceHandlerImpl.class);
private CGroupsHandler cGroupsHandler;
private boolean strictResourceUsageMode = false;
private float yarnProcessors;
private int nodeVCores;
private static final CGroupsHandler.CGroupController CPU = private static final CGroupsHandler.CGroupController CPU =
CGroupsHandler.CGroupController.CPU; CGroupsHandler.CGroupController.CPU;
@VisibleForTesting
static final int MAX_QUOTA_US = 1000 * 1000;
@VisibleForTesting
static final int MIN_PERIOD_US = 1000;
@VisibleForTesting @VisibleForTesting
static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 2; static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 2;
CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) { CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
this.cGroupsHandler = cGroupsHandler; super(cGroupsHandler);
} }
@Override @Override
public List<PrivilegedOperation> bootstrap(Configuration conf) protected void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period) throws ResourceHandlerException {
throws ResourceHandlerException { if (quota != null) {
return bootstrap( cGroupsHandler
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf); .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_QUOTA_US, quota);
}
if (period != null) {
cGroupsHandler
.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_PERIOD_US, period);
}
} }
@VisibleForTesting @Override
List<PrivilegedOperation> bootstrap( protected int getOpportunisticCpuWeight() {
ResourceCalculatorPlugin plugin, Configuration conf) return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC;
throws ResourceHandlerException { }
this.strictResourceUsageMode = conf.getBoolean( protected int getCpuWeightByContainerVcores(int containerVCores) {
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, return containerVCores * CPU_DEFAULT_WEIGHT;
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE); }
this.cGroupsHandler.initializeCGroupController(CPU);
nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
// cap overall usage to the number of cores allocated to YARN @Override
yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf); protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException {
int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf); cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES,
boolean existingCpuLimits; String.valueOf(weight));
}
@Override
public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException {
try { try {
existingCpuLimits = return checkCgroupV1CPULimitExists(cgroupPath);
cpuLimitsExist(cGroupsHandler.getPathForCGroup(CPU, "")); } catch (IOException e) {
} catch (IOException ie) { throw new ResourceHandlerException("Failed to check CPU limit", e);
throw new ResourceHandlerException(ie);
} }
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
int[] limits = getOverallLimits(yarnProcessors);
cGroupsHandler
.updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_PERIOD_US,
String.valueOf(limits[0]));
cGroupsHandler
.updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
String.valueOf(limits[1]));
} else if (existingCpuLimits) {
LOG.info("Removing CPU constraints for YARN containers.");
cGroupsHandler
.updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
String.valueOf(-1));
}
return null;
} }
@InterfaceAudience.Private @InterfaceAudience.Private
public static boolean cpuLimitsExist(String path) public static boolean checkCgroupV1CPULimitExists(String path) throws IOException {
throws IOException {
File quotaFile = new File(path, File quotaFile = new File(path,
CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US); CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US);
if (quotaFile.exists()) { if (quotaFile.exists()) {
String contents = FileUtils.readFileToString(quotaFile, StandardCharsets.UTF_8); String contents = FileUtils.readFileToString(quotaFile, StandardCharsets.UTF_8);
int quotaUS = Integer.parseInt(contents.trim()); return Integer.parseInt(contents.trim()) != -1;
if (quotaUS != -1) {
return true;
}
} }
return false; return false;
} }
@VisibleForTesting
@InterfaceAudience.Private
public static int[] getOverallLimits(float yarnProcessors) {
int[] ret = new int[2];
if (yarnProcessors < 0.01f) {
throw new IllegalArgumentException("Number of processors can't be <= 0.");
}
int quotaUS = MAX_QUOTA_US;
int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
if (yarnProcessors < 1.0f) {
periodUS = MAX_QUOTA_US;
quotaUS = (int) (periodUS * yarnProcessors);
if (quotaUS < MIN_PERIOD_US) {
LOG.warn("The quota calculated for the cgroup was too low."
+ " The minimum value is " + MIN_PERIOD_US
+ ", calculated value is " + quotaUS
+ ". Setting quota to minimum value.");
quotaUS = MIN_PERIOD_US;
}
}
// cfs_period_us can't be less than 1000 microseconds
// if the value of periodUS is less than 1000, we can't really use cgroups
// to limit cpu
if (periodUS < MIN_PERIOD_US) {
LOG.warn("The period calculated for the cgroup was too low."
+ " The minimum value is " + MIN_PERIOD_US
+ ", calculated value is " + periodUS
+ ". Using all available CPU.");
periodUS = MAX_QUOTA_US;
quotaUS = -1;
}
ret[0] = periodUS;
ret[1] = quotaUS;
return ret;
}
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String cgroupId = container.getContainerId().toString();
cGroupsHandler.createCGroup(CPU, cgroupId);
updateContainer(container);
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
.getPathForCGroupTasks(CPU, cgroupId)));
return ret;
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> updateContainer(Container container)
throws ResourceHandlerException {
Resource containerResource = container.getResource();
String cgroupId = container.getContainerId().toString();
File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId));
if (cgroup.exists()) {
try {
int containerVCores = containerResource.getVirtualCores();
ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
if (id != null && id.getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
cGroupsHandler
.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC));
} else {
int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
cGroupsHandler
.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_SHARES,
String.valueOf(cpuShares));
}
if (strictResourceUsageMode) {
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0]));
cGroupsHandler.updateCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1]));
}
}
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CPU, cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
}
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
cGroupsHandler.deleteCGroup(CPU, containerId.toString());
return null;
}
@Override public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
return null;
}
@Override
public String toString() {
return CGroupsCpuResourceHandlerImpl.class.getName();
}
} }

View File

@ -122,11 +122,12 @@ public static Set<String> getValidV2CGroups() {
// v2 specific params // v2 specific params
String CGROUP_CONTROLLERS_FILE = "cgroup.controllers"; String CGROUP_CONTROLLERS_FILE = "cgroup.controllers";
String CGROUP_SUBTREE_CONTROL_FILE = "cgroup.subtree_control"; String CGROUP_SUBTREE_CONTROL_FILE = "cgroup.subtree_control";
String CGROUP_CPU_MAX = "max";
// present in v1 and v2 // present in v1 and v2
String CGROUP_PROCS_FILE = "cgroup.procs"; String CGROUP_PROCS_FILE = "cgroup.procs";
String CGROUP_PARAM_CLASSID = "classid"; String CGROUP_PARAM_CLASSID = "classid";
String CGROUP_PARAM_BLKIO_WEIGHT = "weight"; String CGROUP_PARAM_WEIGHT = "weight";
/** /**
* Mounts or initializes a cgroup controller. * Mounts or initializes a cgroup controller.

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
/**
* An implementation for using CGroups V2 to restrict CPU usage on Linux. The
* implementation supports 3 different controls - restrict usage of all YARN
* containers, restrict relative usage of individual YARN containers and
* restrict usage of individual YARN containers. Admins can set the overall CPU
* to be used by all YARN containers - this is implemented by setting
* cpu.max to the value desired. If strict resource usage mode is not enabled,
* cpu.weight is set for individual containers - this prevents containers from
* exceeding the overall limit for YARN containers but individual containers
* can use as much of the CPU as available(under the YARN limit). If strict
* resource usage is enabled, then container can only use the percentage of
* CPU allocated to them and this is again implemented using cpu.max.
*/
@InterfaceStability.Unstable
@InterfaceAudience.Private
public class CGroupsV2CpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler {
private static final CGroupsHandler.CGroupController CPU =
CGroupsHandler.CGroupController.CPU;
@VisibleForTesting
static final int CPU_DEFAULT_WEIGHT = 100; // cgroup v2 default
static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 1;
static final int CPU_MAX_WEIGHT = 10000;
static final String NO_LIMIT = "max";
CGroupsV2CpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
super(cGroupsHandler);
}
@Override
protected void updateCgroupMaxCpuLimit(String cgroupId, String max, String period)
throws ResourceHandlerException {
// The cpu.max file in cgroup v2 is a read-write two value file which exists on
// non-root cgroups. The default is max 100000.
// It is the maximum bandwidth limit. Its in the following format:
// $MAX $PERIOD
// which indicates that the group may consume up to $MAX in each $PERIOD duration.
// max for $MAX indicates no limit. If only one number is written, $MAX is updated.
String currentCpuMax = cGroupsHandler.getCGroupParam(CPU, cgroupId,
CGroupsHandler.CGROUP_CPU_MAX);
if (currentCpuMax == null) {
currentCpuMax = "";
}
String[] currentCpuMaxArray = currentCpuMax.split(" ");
String maxToSet = max != null ? max : currentCpuMaxArray[0];
maxToSet = maxToSet.equals("-1") ? NO_LIMIT : maxToSet;
String periodToSet = period != null ? period : currentCpuMaxArray[1];
cGroupsHandler
.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_MAX,
maxToSet + " " + periodToSet);
}
@Override
protected int getOpportunisticCpuWeight() {
return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC;
}
protected int getCpuWeightByContainerVcores(int containerVCores) {
return Math.min(containerVCores * CPU_DEFAULT_WEIGHT, CPU_MAX_WEIGHT);
}
@Override
protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException {
cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT,
String.valueOf(weight));
}
@Override
public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException {
String globalCpuMaxLimit = cGroupsHandler.getCGroupParam(CPU, "",
CGroupsHandler.CGROUP_CPU_MAX);
if (globalCpuMaxLimit == null) {
return false;
}
String[] cpuMaxLimitArray = globalCpuMaxLimit.split(" ");
return !cpuMaxLimitArray[0].equals(NO_LIMIT);
}
}

View File

@ -175,7 +175,7 @@ void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
String.valueOf(limits[0])); String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US,
String.valueOf(limits[1])); String.valueOf(limits[1]));
} else if (CGroupsCpuResourceHandlerImpl.cpuLimitsExist( } else if (CGroupsCpuResourceHandlerImpl.checkCgroupV1CPULimitExists(
pathForCgroup(CONTROLLER_CPU, ""))) { pathForCgroup(CONTROLLER_CPU, ""))) {
LOG.info("Removing CPU constraints for YARN containers."); LOG.info("Removing CPU constraints for YARN containers.");
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1)); updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));

View File

@ -77,7 +77,7 @@ public void testPreStart() throws Exception {
CGroupsHandler.CGroupController.BLKIO, id); CGroupsHandler.CGroupController.BLKIO, id);
verify(mockCGroupsHandler, times(1)).updateCGroupParam( verify(mockCGroupsHandler, times(1)).updateCGroupParam(
CGroupsHandler.CGroupController.BLKIO, id, CGroupsHandler.CGroupController.BLKIO, id,
CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, CGroupsHandler.CGROUP_PARAM_WEIGHT,
CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT); CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT);
Assert.assertNotNull(ret); Assert.assertNotNull(ret);
Assert.assertEquals(1, ret.size()); Assert.assertEquals(1, ret.size());

View File

@ -0,0 +1,313 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestCGroupsV2CpuResourceHandlerImpl {
private CGroupsHandler mockCGroupsHandler;
private CGroupsV2CpuResourceHandlerImpl cGroupsCpuResourceHandler;
private ResourceCalculatorPlugin plugin;
final int numProcessors = 4;
@Before
public void setup() {
mockCGroupsHandler = mock(CGroupsHandler.class);
when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
cGroupsCpuResourceHandler =
new CGroupsV2CpuResourceHandlerImpl(mockCGroupsHandler);
plugin = mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
Mockito.doReturn(numProcessors).when(plugin).getNumCores();
}
@Test
public void testBootstrap() throws Exception {
Configuration conf = new YarnConfiguration();
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
verify(mockCGroupsHandler, times(1))
.initializeCGroupController(CGroupsHandler.CGroupController.CPU);
verify(mockCGroupsHandler, times(0))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_MAX, "");
Assert.assertNull(ret);
}
@Test
public void testBootstrapLimits() throws Exception {
Configuration conf = new YarnConfiguration();
int cpuPerc = 80;
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
cpuPerc);
int period = (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100) / (cpuPerc
* numProcessors);
String cpuMaxValue = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " + period;
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
verify(mockCGroupsHandler, times(1))
.initializeCGroupController(CGroupsHandler.CGroupController.CPU);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue);
Assert.assertNull(ret);
}
@Test
public void testBootstrapExistingLimits() throws Exception {
Configuration conf = new YarnConfiguration();
when(mockCGroupsHandler
.getCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_MAX))
.thenReturn("100 100000");
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
verify(mockCGroupsHandler, times(1))
.initializeCGroupController(CGroupsHandler.CGroupController.CPU);
verify(mockCGroupsHandler, times(2))
.getCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_MAX);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_MAX, "max 100000");
Assert.assertNull(ret);
}
@Test
public void testPreStart() throws Exception {
String id = "container_01_01";
String path = "test-path/" + id;
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 2));
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.createCGroup(CGroupsHandler.CGroupController.CPU, id);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_PARAM_WEIGHT, String
.valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * 2)); // 2 vcores
// don't set cpu.max
verify(mockCGroupsHandler, never())
.updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id),
eq(CGroupsHandler.CGROUP_CPU_MAX), anyString());
validatePrivilegedOperationList(ret, path);
}
@Test
public void testPreStartStrictUsage() throws Exception {
String id = "container_01_01";
String path = "test-path/" + id;
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 1));
Configuration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
true);
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
int defaultVCores = 8;
float share = (float) numProcessors / (float) defaultVCores;
List<PrivilegedOperation> ret =
cGroupsCpuResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.createCGroup(CGroupsHandler.CGroupController.CPU, id);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_PARAM_WEIGHT,
String.valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT));
// set quota and period
String cpuMaxValue = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share) +
" " + CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US;
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue);
validatePrivilegedOperationList(ret, path);
}
@Test
public void testPreStartRestrictedContainers() throws Exception {
String id = "container_01_01";
String path = "test-path/" + id;
int defaultVCores = 8;
Configuration conf = new YarnConfiguration();
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
true);
int cpuPerc = 75;
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
cpuPerc);
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
String maxCpuLimit = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " +
CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100 / (cpuPerc * numProcessors);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
CGroupsHandler.CGROUP_CPU_MAX, maxCpuLimit);
float yarnCores = (float) (cpuPerc * numProcessors) / 100;
int[] containerVCores = { 2, 4 };
for (int cVcores : containerVCores) {
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
when(mockContainer.getResource())
.thenReturn(Resource.newInstance(1024, cVcores));
when(mockCGroupsHandler
.getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
.thenReturn(path);
float share = (cVcores * yarnCores) / defaultVCores;
int quotaUS;
int periodUS;
if (share > 1.0f) {
quotaUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US;
periodUS =
(int) ((float) CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US / share);
} else {
quotaUS = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share);
periodUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US;
}
cGroupsCpuResourceHandler.preStart(mockContainer);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf(
CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * cVcores));
// set cpu.max
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_CPU_MAX, quotaUS + " " + periodUS);
}
}
@Test
public void testReacquireContainer() throws Exception {
ContainerId containerIdMock = mock(ContainerId.class);
Assert.assertNull(
cGroupsCpuResourceHandler.reacquireContainer(containerIdMock));
}
@Test
public void testPostComplete() throws Exception {
String id = "container_01_01";
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Assert.assertNull(cGroupsCpuResourceHandler.postComplete(mockContainerId));
verify(mockCGroupsHandler, times(1))
.deleteCGroup(CGroupsHandler.CGroupController.CPU, id);
}
@Test
public void testTeardown() throws Exception {
Assert.assertNull(cGroupsCpuResourceHandler.teardown());
}
@Test
public void testOpportunistic() throws Exception {
Configuration conf = new YarnConfiguration();
cGroupsCpuResourceHandler.bootstrap(plugin, conf);
ContainerTokenIdentifier tokenId = mock(ContainerTokenIdentifier.class);
when(tokenId.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
Container container = mock(Container.class);
String id = "container_01_01";
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
when(container.getContainerId()).thenReturn(mockContainerId);
when(container.getContainerTokenIdentifier()).thenReturn(tokenId);
when(container.getResource()).thenReturn(Resource.newInstance(1024, 2));
cGroupsCpuResourceHandler.preStart(container);
verify(mockCGroupsHandler, times(1))
.updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf(
CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT_OPPORTUNISTIC));
}
private void validatePrivilegedOperationList(List<PrivilegedOperation> ops, String path) {
Assert.assertNotNull(ops);
Assert.assertEquals(1, ops.size());
PrivilegedOperation op = ops.get(0);
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
op.getOperationType());
List<String> args = op.getArguments();
Assert.assertEquals(1, args.size());
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
args.get(0));
}
}