YARN-11672. Create a CgroupHandler implementation for cgroup v2 (#6734)

This commit is contained in:
Benjamin Teke 2024-04-24 11:33:50 +02:00 committed by GitHub
parent 23286b0632
commit 5d0a40c143
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1364 additions and 646 deletions

View File

@ -0,0 +1,579 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public abstract class AbstractCGroupsHandler implements CGroupsHandler {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractCGroupsHandler.class);
protected static final String MTAB_FILE = "/proc/mounts";
private final long deleteCGroupTimeout;
private final long deleteCGroupDelay;
private final Clock clock;
protected final String mtabFile;
protected final CGroupsMountConfig cGroupsMountConfig;
protected final ReadWriteLock rwLock;
protected Map<CGroupController, String> controllerPaths;
protected Map<String, Set<String>> parsedMtab;
protected final PrivilegedOperationExecutor privilegedOperationExecutor;
protected final String cGroupPrefix;
/**
* Create cgroup handler object.
*
* @param conf configuration
* @param privilegedOperationExecutor provides mechanisms to execute
* PrivilegedContainerOperations
* @param mtab mount file location
* @throws ResourceHandlerException if initialization failed
*/
AbstractCGroupsHandler(Configuration conf, PrivilegedOperationExecutor
privilegedOperationExecutor, String mtab)
throws ResourceHandlerException {
// Remove leading and trialing slash(es)
this.cGroupPrefix = conf.get(YarnConfiguration.
NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn")
.replaceAll("^/+", "").replaceAll("/+$", "");
this.cGroupsMountConfig = new CGroupsMountConfig(conf);
this.deleteCGroupTimeout = conf.getLong(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT) +
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + 1000;
this.deleteCGroupDelay =
conf.getLong(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY);
this.controllerPaths = new HashMap<>();
this.parsedMtab = new HashMap<>();
this.rwLock = new ReentrantReadWriteLock();
this.privilegedOperationExecutor = privilegedOperationExecutor;
this.clock = SystemClock.getInstance();
mtabFile = mtab;
init();
}
protected void init() throws ResourceHandlerException {
initializeControllerPaths();
}
@Override
public String getControllerPath(CGroupController controller) {
rwLock.readLock().lock();
try {
return controllerPaths.get(controller);
} finally {
rwLock.readLock().unlock();
}
}
private void initializeControllerPaths() throws ResourceHandlerException {
// Cluster admins may have some subsystems mounted in specific locations
// We'll attempt to figure out mount points. We do this even if we plan
// to mount cgroups into our own tree to control the path permissions or
// to mount subsystems that are not mounted previously.
// The subsystems for new and existing mount points have to match, and
// the same hierarchy will be mounted at each mount point with the same
// subsystem set.
Map<String, Set<String>> newMtab = null;
Map<CGroupController, String> cPaths;
try {
if (this.cGroupsMountConfig.mountDisabledButMountPathDefined()) {
newMtab = parsePreConfiguredMountPath();
}
if (newMtab == null) {
// parse mtab
newMtab = parseMtab(mtabFile);
}
// find cgroup controller paths
cPaths = initializeControllerPathsFromMtab(newMtab);
} catch (IOException e) {
LOG.warn("Failed to initialize controller paths! Exception: ", e);
throw new ResourceHandlerException(
"Failed to initialize controller paths!");
}
// we want to do a bulk update without the paths changing concurrently
rwLock.writeLock().lock();
try {
controllerPaths = cPaths;
parsedMtab = newMtab;
} finally {
rwLock.writeLock().unlock();
}
}
protected abstract Map<String, Set<String>> parsePreConfiguredMountPath() throws IOException;
protected Map<CGroupController, String> initializeControllerPathsFromMtab(
Map<String, Set<String>> mtab) {
Map<CGroupController, String> ret = new HashMap<>();
for (CGroupController controller : getCGroupControllers()) {
String subsystemName = controller.getName();
String controllerPath = findControllerInMtab(subsystemName, mtab);
if (controllerPath != null) {
ret.put(controller, controllerPath);
}
}
return ret;
}
protected abstract List<CGroupController> getCGroupControllers();
/* We are looking for entries of the form:
* none /cgroup/path/mem cgroup rw,memory 0 0
*
* Use a simple pattern that splits on the five spaces, and
* grabs the 2, 3, and 4th fields.
*/
private static final Pattern MTAB_FILE_FORMAT = Pattern.compile(
"^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$");
/*
* Returns a map: path -> mount options
* for mounts with type "cgroup". Cgroup controllers will
* appear in the list of options for a path.
*/
protected Map<String, Set<String>> parseMtab(String mtab)
throws IOException {
Map<String, Set<String>> ret = new HashMap<>();
BufferedReader in = null;
try {
FileInputStream fis = new FileInputStream(mtab);
in = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
for (String str = in.readLine(); str != null;
str = in.readLine()) {
Matcher m = MTAB_FILE_FORMAT.matcher(str);
boolean mat = m.find();
if (mat) {
String path = m.group(1);
String type = m.group(2);
String options = m.group(3);
Set<String> controllerSet = handleMtabEntry(path, type, options);
if (controllerSet != null) {
ret.put(path, controllerSet);
}
}
}
} catch (IOException e) {
if (Shell.LINUX) {
throw new IOException("Error while reading " + mtab, e);
} else {
// Ignore the error, if we are running on an os other than Linux
LOG.warn("Error while reading " + mtab, e);
}
} finally {
IOUtils.cleanupWithLogger(LOG, in);
}
return ret;
}
protected abstract Set<String> handleMtabEntry(String path, String type, String options)
throws IOException;
/**
* Find the hierarchy of the subsystem.
* The kernel ensures that a subsystem can only be part of a single hierarchy.
* The subsystem can be part of multiple mount points, if they belong to the
* same hierarchy.
*
* @param controller subsystem like cpu, cpuset, etc...
* @param entries map of paths to mount options
* @return the first mount path that has the requested subsystem
*/
protected String findControllerInMtab(String controller,
Map<String, Set<String>> entries) {
for (Map.Entry<String, Set<String>> e : entries.entrySet()) {
if (e.getValue().contains(controller)) {
if (new File(e.getKey()).canRead()) {
return e.getKey();
} else {
LOG.warn(String.format(
"Skipping inaccessible cgroup mount point %s", e.getKey()));
}
}
}
return null;
}
protected abstract void mountCGroupController(CGroupController controller)
throws ResourceHandlerException;
@Override
public String getRelativePathForCGroup(String cGroupId) {
return cGroupPrefix + Path.SEPARATOR + cGroupId;
}
@Override
public String getPathForCGroup(CGroupController controller, String cGroupId) {
return getControllerPath(controller) + Path.SEPARATOR + cGroupPrefix
+ Path.SEPARATOR + cGroupId;
}
@Override
public String getPathForCGroupTasks(CGroupController controller,
String cGroupId) {
return getPathForCGroup(controller, cGroupId)
+ Path.SEPARATOR + CGROUP_PROCS_FILE;
}
@Override
public String getPathForCGroupParam(CGroupController controller,
String cGroupId, String param) {
return getPathForCGroup(controller, cGroupId)
+ Path.SEPARATOR + controller.getName()
+ "." + param;
}
/**
* Mount cgroup or use existing mount point based on configuration.
*
* @param controller - the controller being initialized
* @throws ResourceHandlerException yarn hierarchy cannot be created or
* accessed for any reason
*/
@Override
public void initializeCGroupController(CGroupController controller) throws
ResourceHandlerException {
if (this.cGroupsMountConfig.isMountEnabled() &&
cGroupsMountConfig.ensureMountPathIsDefined()) {
// We have a controller that needs to be mounted
mountCGroupController(controller);
}
// We are working with a pre-mounted contoller
// Make sure that YARN cgroup hierarchy path exists
initializePreMountedCGroupController(controller);
}
/**
* This function is called when the administrator opted
* to use a pre-mounted cgroup controller.
* There are two options.
* 1. YARN hierarchy already exists. We verify, whether we have write access
* in this case.
* 2. YARN hierarchy does not exist, yet. We create it in this case. If cgroup v2 is used
* an additional step is required to update the cgroup.subtree_control file, see
* {@link CGroupsV2HandlerImpl#updateEnabledControllersInHierarchy}
*
* @param controller the controller being initialized
* @throws ResourceHandlerException yarn hierarchy cannot be created or
* accessed for any reason
*/
private void initializePreMountedCGroupController(CGroupController controller)
throws ResourceHandlerException {
// Check permissions to cgroup hierarchy and
// create YARN cgroup if it does not exist, yet
String controllerPath = getControllerPath(controller);
if (controllerPath == null) {
throw new ResourceHandlerException(
String.format("Controller %s not mounted."
+ " You either need to mount it with %s"
+ " or mount cgroups before launching Yarn",
controller.getName(), YarnConfiguration.
NM_LINUX_CONTAINER_CGROUPS_MOUNT));
}
File rootHierarchy = new File(controllerPath);
File yarnHierarchy = new File(rootHierarchy, cGroupPrefix);
String subsystemName = controller.getName();
LOG.info("Initializing mounted controller " + controller.getName() + " " +
"at " + yarnHierarchy);
if (!rootHierarchy.exists()) {
throw new ResourceHandlerException(getErrorWithDetails(
"Cgroups mount point does not exist or not accessible",
subsystemName,
rootHierarchy.getAbsolutePath()
));
} else if (!yarnHierarchy.exists()) {
LOG.info("Yarn control group does not exist. Creating " +
yarnHierarchy.getAbsolutePath());
try {
if (yarnHierarchy.mkdir()) {
updateEnabledControllersInHierarchy(rootHierarchy, controller);
} else {
// Unexpected: we just checked that it was missing
throw new ResourceHandlerException(getErrorWithDetails(
"Unexpected: Cannot create yarn cgroup",
subsystemName,
yarnHierarchy.getAbsolutePath()
));
}
} catch (SecurityException e) {
throw new ResourceHandlerException(getErrorWithDetails(
"No permissions to create yarn cgroup",
subsystemName,
yarnHierarchy.getAbsolutePath()
), e);
}
} else if (!FileUtil.canWrite(yarnHierarchy)) {
throw new ResourceHandlerException(getErrorWithDetails(
"Yarn control group not writable",
subsystemName,
yarnHierarchy.getAbsolutePath()
));
}
try {
updateEnabledControllersInHierarchy(yarnHierarchy, controller);
} catch (ResourceHandlerException e) {
throw new ResourceHandlerException(getErrorWithDetails(
"Failed to update cgroup.subtree_control in yarn hierarchy",
subsystemName,
yarnHierarchy.getAbsolutePath()
));
}
}
protected abstract void updateEnabledControllersInHierarchy(
File yarnHierarchy, CGroupController controller)
throws ResourceHandlerException;
/**
* Creates an actionable error message for mtab parsing.
*
* @param errorMessage message to use
* @param subsystemName cgroup subsystem
* @param yarnCgroupPath cgroup path that failed
* @return a string builder that can be appended by the caller
*/
private String getErrorWithDetails(
String errorMessage,
String subsystemName,
String yarnCgroupPath) {
return String.format("%s Subsystem:%s Mount points:%s User:%s Path:%s ",
errorMessage, subsystemName, mtabFile, System.getProperty("user.name"),
yarnCgroupPath);
}
@Override
public String createCGroup(CGroupController controller, String cGroupId)
throws ResourceHandlerException {
String path = getPathForCGroup(controller, cGroupId);
LOG.debug("createCgroup: {}", path);
if (!new File(path).mkdir()) {
throw new ResourceHandlerException("Failed to create cgroup at " + path);
}
return path;
}
/*
* Utility routine to print first line from cgroup.procs file
*/
private void logLineFromProcsFile(File cgf) {
String str;
if (LOG.isDebugEnabled()) {
try (BufferedReader inl =
new BufferedReader(new InputStreamReader(
Files.newInputStream(Paths.get(cgf + Path.SEPARATOR + CGROUP_PROCS_FILE)),
StandardCharsets.UTF_8))) {
str = inl.readLine();
if (str != null) {
LOG.debug("First line in cgroup tasks file: {} {}", cgf, str);
}
} catch (IOException e) {
LOG.warn("Failed to read cgroup tasks file. ", e);
}
}
}
/**
* If tasks file is empty, delete the cgroup.
*
* @param cgf object referring to the cgroup to be deleted
* @return Boolean indicating whether cgroup was deleted
*/
private boolean checkAndDeleteCgroup(File cgf) throws InterruptedException {
boolean deleted = false;
// FileInputStream in = null;
if (cgf.exists()) {
try (FileInputStream in = new FileInputStream(cgf + Path.SEPARATOR + CGROUP_PROCS_FILE)) {
if (in.read() == -1) {
/*
* "cgroup.procs" file is empty, sleep a bit more and then try to delete the
* cgroup. Some versions of linux will occasionally panic due to a race
* condition in this area, hence the paranoia.
*/
Thread.sleep(deleteCGroupDelay);
deleted = cgf.delete();
if (!deleted) {
LOG.warn("Failed attempt to delete cgroup: " + cgf);
}
} else {
logLineFromProcsFile(cgf);
}
} catch (IOException e) {
LOG.warn("Failed to read cgroup tasks file. ", e);
}
} else {
LOG.info("Parent Cgroups directory {} does not exist. Skipping "
+ "deletion", cgf.getPath());
deleted = true;
}
return deleted;
}
@Override
public void deleteCGroup(CGroupController controller, String cGroupId)
throws ResourceHandlerException {
boolean deleted = false;
String cGroupPath = getPathForCGroup(controller, cGroupId);
LOG.debug("deleteCGroup: {}", cGroupPath);
long start = clock.getTime();
do {
try {
deleted = checkAndDeleteCgroup(new File(cGroupPath));
if (!deleted) {
Thread.sleep(deleteCGroupDelay);
}
} catch (InterruptedException ex) {
// NOP
}
} while (!deleted && (clock.getTime() - start) < deleteCGroupTimeout);
if (!deleted) {
LOG.warn(String.format("Unable to delete %s, tried to delete for %d ms",
cGroupPath, deleteCGroupTimeout));
}
}
@Override
public void updateCGroupParam(CGroupController controller, String cGroupId,
String param, String value) throws ResourceHandlerException {
String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
PrintWriter pw = null;
LOG.debug("updateCGroupParam for path: {} with value {}",
cGroupParamPath, value);
try {
File file = new File(cGroupParamPath);
Writer w = new OutputStreamWriter(Files.newOutputStream(file.toPath()),
StandardCharsets.UTF_8);
pw = new PrintWriter(w);
pw.write(value);
} catch (IOException e) {
throw new ResourceHandlerException(
String.format("Unable to write to %s with value: %s",
cGroupParamPath, value), e);
} finally {
if (pw != null) {
boolean hasError = pw.checkError();
pw.close();
if (hasError) {
throw new ResourceHandlerException(
String.format("PrintWriter unable to write to %s with value: %s",
cGroupParamPath, value));
}
if (pw.checkError()) {
throw new ResourceHandlerException(
String.format("Error while closing cgroup file %s",
cGroupParamPath));
}
}
}
}
@Override
public String getCGroupParam(CGroupController controller, String cGroupId,
String param) throws ResourceHandlerException {
String cGroupParamPath =
param.equals(CGROUP_PROCS_FILE) ?
getPathForCGroup(controller, cGroupId)
+ Path.SEPARATOR + param :
getPathForCGroupParam(controller, cGroupId, param);
try {
byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath));
return new String(contents, StandardCharsets.UTF_8).trim();
} catch (IOException e) {
throw new ResourceHandlerException(
"Unable to read from " + cGroupParamPath);
}
}
@Override
public String getCGroupMountPath() {
return this.cGroupsMountConfig.getMountPath();
}
@Override
public String toString() {
return CGroupsHandlerImpl.class.getName() + "{" +
"mtabFile='" + mtabFile + '\'' +
", cGroupPrefix='" + cGroupPrefix + '\'' +
", cGroupsMountConfig=" + cGroupsMountConfig +
", deleteCGroupTimeout=" + deleteCGroupTimeout +
", deleteCGroupDelay=" + deleteCGroupDelay +
'}';
}
}

View File

@ -36,45 +36,76 @@
public interface CGroupsHandler {
/**
* List of supported cgroup subsystem types.
* List of supported cgroup controller types. The two boolean variables denote whether
* the controller is valid in v1, v2 or both.
*/
enum CGroupController {
CPU("cpu"),
NET_CLS("net_cls"),
BLKIO("blkio"),
MEMORY("memory"),
CPUACCT("cpuacct"),
CPUSET("cpuset"),
FREEZER("freezer"),
DEVICES("devices");
NET_CLS("net_cls", true, false),
BLKIO("blkio", true, false),
CPUACCT("cpuacct", true, false),
FREEZER("freezer", true, false),
DEVICES("devices", true, false),
// v2 specific
IO("io", false, true),
// present in v1 and v2
CPU("cpu", true, true),
CPUSET("cpuset", true, true),
MEMORY("memory", true, true);
private final String name;
private final boolean inV1;
private final boolean inV2;
CGroupController(String name) {
CGroupController(String name, boolean inV1, boolean inV2) {
this.name = name;
this.inV1 = inV1;
this.inV2 = inV2;
}
public String getName() {
return name;
}
public boolean isInV1() {
return inV1;
}
public boolean isInV2() {
return inV2;
}
/**
* Get the list of valid cgroup names.
* @return The set of cgroup name strings
* Returns a set of valid cgroup controller names for v1.
* @return a set of valid cgroup controller names for v1.
*/
public static Set<String> getValidCGroups() {
public static Set<String> getValidV1CGroups() {
HashSet<String> validCgroups = new HashSet<>();
for (CGroupController controller : CGroupController.values()) {
if (controller.isInV1()) {
validCgroups.add(controller.getName());
}
}
return validCgroups;
}
/**
* Returns a set of valid cgroup controller names for v2.
* @return a set of valid cgroup controller names for v2.
*/
public static Set<String> getValidV2CGroups() {
HashSet<String> validCgroups = new HashSet<>();
for (CGroupController controller : CGroupController.values()) {
if (controller.isInV2()) {
validCgroups.add(controller.getName());
}
}
return validCgroups;
}
}
String CGROUP_PROCS_FILE = "cgroup.procs";
String CGROUP_PARAM_CLASSID = "classid";
String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
// v1 specific params
String CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES = "limit_in_bytes";
String CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES = "memsw.limit_in_bytes";
String CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES = "soft_limit_in_bytes";
@ -84,12 +115,19 @@ public static Set<String> getValidCGroups() {
String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes";
String CGROUP_NO_LIMIT = "-1";
String UNDER_OOM = "under_oom 1";
String CGROUP_CPU_PERIOD_US = "cfs_period_us";
String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
String CGROUP_CPU_SHARES = "shares";
// v2 specific params
String CGROUP_CONTROLLERS_FILE = "cgroup.controllers";
String CGROUP_SUBTREE_CONTROL_FILE = "cgroup.subtree_control";
// present in v1 and v2
String CGROUP_PROCS_FILE = "cgroup.procs";
String CGROUP_PARAM_CLASSID = "classid";
String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
/**
* Mounts or initializes a cgroup controller.
* @param controller - the controller being initialized
@ -125,6 +163,12 @@ void deleteCGroup(CGroupController controller, String cGroupId) throws
*/
String getControllerPath(CGroupController controller);
/**
* Gets the valid cgroup controller names based on the version used.
* @return a set containing the valid controller names for the used cgroup version.
*/
Set<String> getValidCGroups();
/**
* Gets the relative path for the cgroup, independent of a controller, for a
* given cgroup id.

View File

@ -20,64 +20,38 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Support for interacting with various CGroup subsystems. Thread-safe.
* Support for interacting with various CGroup v1 subsystems. Thread-safe.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class CGroupsHandlerImpl implements CGroupsHandler {
class CGroupsHandlerImpl extends AbstractCGroupsHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CGroupsHandlerImpl.class);
private static final String MTAB_FILE = "/proc/mounts";
private static final String CGROUPS_FSTYPE = "cgroup";
private final String mtabFile;
private final String cGroupPrefix;
private final CGroupsMountConfig cGroupsMountConfig;
private final long deleteCGroupTimeout;
private final long deleteCGroupDelay;
private Map<CGroupController, String> controllerPaths;
private Map<String, Set<String>> parsedMtab;
private final ReadWriteLock rwLock;
private final PrivilegedOperationExecutor privilegedOperationExecutor;
private final Clock clock;
private static final String CGROUP_FSTYPE = "cgroup";
/**
* Create cgroup handler object.
* Create cgroup v1 handler object.
* @param conf configuration
* @param privilegedOperationExecutor provides mechanisms to execute
* PrivilegedContainerOperations
@ -87,30 +61,11 @@ class CGroupsHandlerImpl implements CGroupsHandler {
CGroupsHandlerImpl(Configuration conf, PrivilegedOperationExecutor
privilegedOperationExecutor, String mtab)
throws ResourceHandlerException {
// Remove leading and trialing slash(es)
this.cGroupPrefix = conf.get(YarnConfiguration.
NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn")
.replaceAll("^/+", "").replaceAll("/+$", "");
this.cGroupsMountConfig = new CGroupsMountConfig(conf);
this.deleteCGroupTimeout = conf.getLong(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT) +
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + 1000;
this.deleteCGroupDelay =
conf.getLong(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY);
this.controllerPaths = new HashMap<>();
this.parsedMtab = new HashMap<>();
this.rwLock = new ReentrantReadWriteLock();
this.privilegedOperationExecutor = privilegedOperationExecutor;
this.clock = SystemClock.getInstance();
mtabFile = mtab;
init();
super(conf, privilegedOperationExecutor, mtab);
}
/**
* Create cgroup handler object.
* Create cgroup v1 handler object.
* @param conf configuration
* @param privilegedOperationExecutor provides mechanisms to execute
* PrivilegedContainerOperations
@ -121,171 +76,47 @@ class CGroupsHandlerImpl implements CGroupsHandler {
this(conf, privilegedOperationExecutor, MTAB_FILE);
}
private void init() throws ResourceHandlerException {
initializeControllerPaths();
@Override
public Set<String> getValidCGroups() {
return CGroupController.getValidV1CGroups();
}
@Override
public String getControllerPath(CGroupController controller) {
rwLock.readLock().lock();
try {
return controllerPaths.get(controller);
} finally {
rwLock.readLock().unlock();
}
protected List<CGroupController> getCGroupControllers() {
return Arrays.stream(CGroupController.values()).filter(CGroupController::isInV1)
.collect(Collectors.toList());
}
private void initializeControllerPaths() throws ResourceHandlerException {
// Cluster admins may have some subsystems mounted in specific locations
// We'll attempt to figure out mount points. We do this even if we plan
// to mount cgroups into our own tree to control the path permissions or
// to mount subsystems that are not mounted previously.
// The subsystems for new and existing mount points have to match, and
// the same hierarchy will be mounted at each mount point with the same
// subsystem set.
Map<String, Set<String>> newMtab = null;
Map<CGroupController, String> cPaths;
try {
if (this.cGroupsMountConfig.mountDisabledButMountPathDefined()) {
newMtab = ResourceHandlerModule.
@Override
protected Map<String, Set<String>> parsePreConfiguredMountPath() throws IOException {
return ResourceHandlerModule.
parseConfiguredCGroupPath(this.cGroupsMountConfig.getMountPath());
}
if (newMtab == null) {
// parse mtab
newMtab = parseMtab(mtabFile);
}
@Override
protected Set<String> handleMtabEntry(String path, String type, String options) {
Set<String> validCgroups = getValidCGroups();
// find cgroup controller paths
cPaths = initializeControllerPathsFromMtab(newMtab);
} catch (IOException e) {
LOG.warn("Failed to initialize controller paths! Exception: " + e);
throw new ResourceHandlerException(
"Failed to initialize controller paths!");
}
// we want to do a bulk update without the paths changing concurrently
rwLock.writeLock().lock();
try {
controllerPaths = cPaths;
parsedMtab = newMtab;
} finally {
rwLock.writeLock().unlock();
}
}
@VisibleForTesting
static Map<CGroupController, String> initializeControllerPathsFromMtab(
Map<String, Set<String>> parsedMtab)
throws ResourceHandlerException {
Map<CGroupController, String> ret = new HashMap<>();
for (CGroupController controller : CGroupController.values()) {
String subsystemName = controller.getName();
String controllerPath = findControllerInMtab(subsystemName, parsedMtab);
if (controllerPath != null) {
ret.put(controller, controllerPath);
}
}
return ret;
}
/* We are looking for entries of the form:
* none /cgroup/path/mem cgroup rw,memory 0 0
*
* Use a simple pattern that splits on the five spaces, and
* grabs the 2, 3, and 4th fields.
*/
private static final Pattern MTAB_FILE_FORMAT = Pattern.compile(
"^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$");
/*
* Returns a map: path -> mount options
* for mounts with type "cgroup". Cgroup controllers will
* appear in the list of options for a path.
*/
@VisibleForTesting
static Map<String, Set<String>> parseMtab(String mtab)
throws IOException {
Map<String, Set<String>> ret = new HashMap<>();
BufferedReader in = null;
Set<String> validCgroups =
CGroupsHandler.CGroupController.getValidCGroups();
try {
FileInputStream fis = new FileInputStream(new File(mtab));
in = new BufferedReader(new InputStreamReader(fis, StandardCharsets.UTF_8));
for (String str = in.readLine(); str != null;
str = in.readLine()) {
Matcher m = MTAB_FILE_FORMAT.matcher(str);
boolean mat = m.find();
if (mat) {
String path = m.group(1);
String type = m.group(2);
String options = m.group(3);
if (type.equals(CGROUPS_FSTYPE)) {
Set<String> cgroupList =
if (type.equals(CGROUP_FSTYPE)) {
Set<String> controllerSet =
new HashSet<>(Arrays.asList(options.split(",")));
// Collect the valid subsystem names
cgroupList.retainAll(validCgroups);
ret.put(path, cgroupList);
}
}
}
} catch (IOException e) {
if (Shell.LINUX) {
throw new IOException("Error while reading " + mtab, e);
} else {
// Ignore the error, if we are running on an os other than Linux
LOG.warn("Error while reading " + mtab, e);
}
} finally {
IOUtils.cleanupWithLogger(LOG, in);
}
return ret;
}
/**
* Find the hierarchy of the subsystem.
* The kernel ensures that a subsystem can only be part of a single hierarchy.
* The subsystem can be part of multiple mount points, if they belong to the
* same hierarchy.
* @param controller subsystem like cpu, cpuset, etc...
* @param entries map of paths to mount options
* @return the first mount path that has the requested subsystem
*/
@VisibleForTesting
static String findControllerInMtab(String controller,
Map<String, Set<String>> entries) {
for (Map.Entry<String, Set<String>> e : entries.entrySet()) {
if (e.getValue().contains(controller)) {
if (new File(e.getKey()).canRead()) {
return e.getKey();
} else {
LOG.warn(String.format(
"Skipping inaccessible cgroup mount point %s", e.getKey()));
}
}
controllerSet.retainAll(validCgroups);
return controllerSet;
}
return null;
}
private void mountCGroupController(CGroupController controller)
@Override
protected void mountCGroupController(CGroupController controller)
throws ResourceHandlerException {
String existingMountPath = getControllerPath(controller);
String requestedMountPath =
new File(cGroupsMountConfig.getMountPath(),
controller.getName()).getAbsolutePath();
if (existingMountPath == null ||
!requestedMountPath.equals(existingMountPath)) {
if (!requestedMountPath.equals(existingMountPath)) {
//lock out other readers/writers till we are done
rwLock.writeLock().lock();
try {
@ -326,296 +157,8 @@ private void mountCGroupController(CGroupController controller)
}
@Override
public String getRelativePathForCGroup(String cGroupId) {
return cGroupPrefix + Path.SEPARATOR + cGroupId;
}
@Override
public String getPathForCGroup(CGroupController controller, String cGroupId) {
return getControllerPath(controller) + Path.SEPARATOR + cGroupPrefix
+ Path.SEPARATOR + cGroupId;
}
@Override
public String getPathForCGroupTasks(CGroupController controller,
String cGroupId) {
return getPathForCGroup(controller, cGroupId)
+ Path.SEPARATOR + CGROUP_PROCS_FILE;
}
@Override
public String getPathForCGroupParam(CGroupController controller,
String cGroupId, String param) {
return getPathForCGroup(controller, cGroupId)
+ Path.SEPARATOR + controller.getName()
+ "." + param;
}
/**
* Mount cgroup or use existing mount point based on configuration.
* @param controller - the controller being initialized
* @throws ResourceHandlerException yarn hierarchy cannot be created or
* accessed for any reason
*/
@Override
public void initializeCGroupController(CGroupController controller) throws
ResourceHandlerException {
if (this.cGroupsMountConfig.isMountEnabled() &&
cGroupsMountConfig.ensureMountPathIsDefined()) {
// We have a controller that needs to be mounted
mountCGroupController(controller);
}
// We are working with a pre-mounted contoller
// Make sure that YARN cgroup hierarchy path exists
initializePreMountedCGroupController(controller);
}
/**
* This function is called when the administrator opted
* to use a pre-mounted cgroup controller.
* There are two options.
* 1. YARN hierarchy already exists. We verify, whether we have write access
* in this case.
* 2. YARN hierarchy does not exist, yet. We create it in this case.
* @param controller the controller being initialized
* @throws ResourceHandlerException yarn hierarchy cannot be created or
* accessed for any reason
*/
private void initializePreMountedCGroupController(CGroupController controller)
throws ResourceHandlerException {
// Check permissions to cgroup hierarchy and
// create YARN cgroup if it does not exist, yet
String controllerPath = getControllerPath(controller);
if (controllerPath == null) {
throw new ResourceHandlerException(
String.format("Controller %s not mounted."
+ " You either need to mount it with %s"
+ " or mount cgroups before launching Yarn",
controller.getName(), YarnConfiguration.
NM_LINUX_CONTAINER_CGROUPS_MOUNT));
}
File rootHierarchy = new File(controllerPath);
File yarnHierarchy = new File(rootHierarchy, cGroupPrefix);
String subsystemName = controller.getName();
LOG.info("Initializing mounted controller " + controller.getName() + " " +
"at " + yarnHierarchy);
if (!rootHierarchy.exists()) {
throw new ResourceHandlerException(getErrorWithDetails(
"Cgroups mount point does not exist or not accessible",
subsystemName,
rootHierarchy.getAbsolutePath()
));
} else if (!yarnHierarchy.exists()) {
LOG.info("Yarn control group does not exist. Creating " +
yarnHierarchy.getAbsolutePath());
try {
if (!yarnHierarchy.mkdir()) {
// Unexpected: we just checked that it was missing
throw new ResourceHandlerException(getErrorWithDetails(
"Unexpected: Cannot create yarn cgroup",
subsystemName,
yarnHierarchy.getAbsolutePath()
));
}
} catch (SecurityException e) {
throw new ResourceHandlerException(getErrorWithDetails(
"No permissions to create yarn cgroup",
subsystemName,
yarnHierarchy.getAbsolutePath()
), e);
}
} else if (!FileUtil.canWrite(yarnHierarchy)) {
throw new ResourceHandlerException(getErrorWithDetails(
"Yarn control group not writable",
subsystemName,
yarnHierarchy.getAbsolutePath()
));
}
}
/**
* Creates an actionable error message for mtab parsing.
* @param errorMessage message to use
* @param subsystemName cgroup subsystem
* @param yarnCgroupPath cgroup path that failed
* @return a string builder that can be appended by the caller
*/
private String getErrorWithDetails(
String errorMessage,
String subsystemName,
String yarnCgroupPath) {
return String.format("%s Subsystem:%s Mount points:%s User:%s Path:%s ",
errorMessage, subsystemName, mtabFile, System.getProperty("user.name"),
yarnCgroupPath);
}
@Override
public String createCGroup(CGroupController controller, String cGroupId)
throws ResourceHandlerException {
String path = getPathForCGroup(controller, cGroupId);
LOG.debug("createCgroup: {}", path);
if (!new File(path).mkdir()) {
throw new ResourceHandlerException("Failed to create cgroup at " + path);
}
return path;
}
/*
* Utility routine to print first line from cgroup tasks file
*/
private void logLineFromTasksFile(File cgf) {
String str;
if (LOG.isDebugEnabled()) {
try (BufferedReader inl =
new BufferedReader(new InputStreamReader(new FileInputStream(cgf
+ "/tasks"), StandardCharsets.UTF_8))) {
str = inl.readLine();
if (str != null) {
LOG.debug("First line in cgroup tasks file: {} {}", cgf, str);
}
} catch (IOException e) {
LOG.warn("Failed to read cgroup tasks file. ", e);
}
}
}
/**
* If tasks file is empty, delete the cgroup.
*
* @param cgf object referring to the cgroup to be deleted
* @return Boolean indicating whether cgroup was deleted
*/
private boolean checkAndDeleteCgroup(File cgf) throws InterruptedException {
boolean deleted = false;
// FileInputStream in = null;
if ( cgf.exists() ) {
try (FileInputStream in = new FileInputStream(cgf + "/tasks")) {
if (in.read() == -1) {
/*
* "tasks" file is empty, sleep a bit more and then try to delete the
* cgroup. Some versions of linux will occasionally panic due to a race
* condition in this area, hence the paranoia.
*/
Thread.sleep(deleteCGroupDelay);
deleted = cgf.delete();
if (!deleted) {
LOG.warn("Failed attempt to delete cgroup: " + cgf);
}
} else{
logLineFromTasksFile(cgf);
}
} catch (IOException e) {
LOG.warn("Failed to read cgroup tasks file. ", e);
}
} else {
LOG.info("Parent Cgroups directory {} does not exist. Skipping "
+ "deletion", cgf.getPath());
deleted = true;
}
return deleted;
}
@Override
public void deleteCGroup(CGroupController controller, String cGroupId)
throws ResourceHandlerException {
boolean deleted = false;
String cGroupPath = getPathForCGroup(controller, cGroupId);
LOG.debug("deleteCGroup: {}", cGroupPath);
long start = clock.getTime();
do {
try {
deleted = checkAndDeleteCgroup(new File(cGroupPath));
if (!deleted) {
Thread.sleep(deleteCGroupDelay);
}
} catch (InterruptedException ex) {
// NOP
}
} while (!deleted && (clock.getTime() - start) < deleteCGroupTimeout);
if (!deleted) {
LOG.warn(String.format("Unable to delete %s, tried to delete for %d ms",
cGroupPath, deleteCGroupTimeout));
}
}
@Override
public void updateCGroupParam(CGroupController controller, String cGroupId,
String param, String value) throws ResourceHandlerException {
String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
PrintWriter pw = null;
LOG.debug("updateCGroupParam for path: {} with value {}",
cGroupParamPath, value);
try {
File file = new File(cGroupParamPath);
Writer w = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8);
pw = new PrintWriter(w);
pw.write(value);
} catch (IOException e) {
throw new ResourceHandlerException(
String.format("Unable to write to %s with value: %s",
cGroupParamPath, value), e);
} finally {
if (pw != null) {
boolean hasError = pw.checkError();
pw.close();
if (hasError) {
throw new ResourceHandlerException(
String.format("PrintWriter unable to write to %s with value: %s",
cGroupParamPath, value));
}
if (pw.checkError()) {
throw new ResourceHandlerException(
String.format("Error while closing cgroup file %s",
cGroupParamPath));
}
}
}
}
@Override
public String getCGroupParam(CGroupController controller, String cGroupId,
String param) throws ResourceHandlerException {
String cGroupParamPath =
param.equals(CGROUP_PROCS_FILE) ?
getPathForCGroup(controller, cGroupId)
+ Path.SEPARATOR + param :
getPathForCGroupParam(controller, cGroupId, param);
try {
byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath));
return new String(contents, StandardCharsets.UTF_8).trim();
} catch (IOException e) {
throw new ResourceHandlerException(
"Unable to read from " + cGroupParamPath);
}
}
@Override
public String getCGroupMountPath() {
return this.cGroupsMountConfig.getMountPath();
}
@Override
public String toString() {
return CGroupsHandlerImpl.class.getName() + "{" +
"mtabFile='" + mtabFile + '\'' +
", cGroupPrefix='" + cGroupPrefix + '\'' +
", cGroupsMountConfig=" + cGroupsMountConfig +
", deleteCGroupTimeout=" + deleteCGroupTimeout +
", deleteCGroupDelay=" + deleteCGroupDelay +
'}';
protected void updateEnabledControllersInHierarchy(
File yarnHierarchy, CGroupController controller) {
// no-op in cgroup v1
}
}

View File

@ -0,0 +1,209 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Support for interacting with various CGroup v2 subsystems. Thread-safe.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class CGroupsV2HandlerImpl extends AbstractCGroupsHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CGroupsV2HandlerImpl.class);
private static final String CGROUP2_FSTYPE = "cgroup2";
/**
* Create cgroup v2 handler object.
* @param conf configuration
* @param privilegedOperationExecutor provides mechanisms to execute
* PrivilegedContainerOperations
* @param mtab mount file location
* @throws ResourceHandlerException if initialization failed
*/
CGroupsV2HandlerImpl(Configuration conf, PrivilegedOperationExecutor
privilegedOperationExecutor, String mtab)
throws ResourceHandlerException {
super(conf, privilegedOperationExecutor, mtab);
}
/**
* Create cgroup v2 handler object.
* @param conf configuration
* @param privilegedOperationExecutor provides mechanisms to execute
* PrivilegedContainerOperations
* @throws ResourceHandlerException if initialization failed
*/
CGroupsV2HandlerImpl(Configuration conf, PrivilegedOperationExecutor
privilegedOperationExecutor) throws ResourceHandlerException {
this(conf, privilegedOperationExecutor, MTAB_FILE);
}
@Override
public Set<String> getValidCGroups() {
return CGroupController.getValidV2CGroups();
}
@Override
protected List<CGroupController> getCGroupControllers() {
return Arrays.stream(CGroupController.values()).filter(CGroupController::isInV2)
.collect(Collectors.toList());
}
@Override
protected Map<String, Set<String>> parsePreConfiguredMountPath() throws IOException {
Map<String, Set<String>> controllerMappings = new HashMap<>();
String controllerPath = this.cGroupsMountConfig.getMountPath() +
Path.SEPARATOR + this.cGroupPrefix;
controllerMappings.put(this.cGroupsMountConfig.getMountPath(),
readControllersFile(controllerPath));
return controllerMappings;
}
@Override
protected Set<String> handleMtabEntry(String path, String type, String options)
throws IOException {
if (type.equals(CGROUP2_FSTYPE)) {
return readControllersFile(path);
}
return null;
}
@Override
protected void mountCGroupController(CGroupController controller) {
throw new UnsupportedOperationException("Mounting cgroup controllers is not supported in " +
"cgroup v2");
}
/**
* Parse the cgroup v2 controllers file (cgroup.controllers) to check the enabled controllers.
* @param cgroupPath path to the cgroup directory
* @return set of enabled and YARN supported controllers.
* @throws IOException if the file is not found or cannot be read
*/
public Set<String> readControllersFile(String cgroupPath) throws IOException {
File cgroupControllersFile = new File(cgroupPath + Path.SEPARATOR + CGROUP_CONTROLLERS_FILE);
if (!cgroupControllersFile.exists()) {
throw new IOException("No cgroup controllers file found in the directory specified: " +
cgroupPath);
}
String enabledControllers = FileUtils.readFileToString(cgroupControllersFile,
StandardCharsets.UTF_8);
Set<String> validCGroups = getValidCGroups();
Set<String> controllerSet =
new HashSet<>(Arrays.asList(enabledControllers.split(" ")));
// Collect the valid subsystem names
controllerSet.retainAll(validCGroups);
if (controllerSet.isEmpty()) {
LOG.warn("The following cgroup directory doesn't contain any supported controllers: " +
cgroupPath);
}
return controllerSet;
}
/**
* The cgroup.subtree_control file is used to enable controllers for a subtree of the cgroup
* hierarchy (the current level excluded).
* From the documentation: A read-write space separated values file which exists on all
* cgroups. Starts out empty. When read, it shows space separated list of the controllers which
* are enabled to control resource distribution from the cgroup to its children.
* Space separated list of controllers prefixed with '+' or '-'
* can be written to enable or disable controllers.
* Since YARN will create a sub-cgroup for each container, we need to enable the controllers
* for the subtree. Update the subtree_control file to enable subsequent container based cgroups
* to use the same controllers.
* If a cgroup.subtree_control file is present, but it doesn't contain all the controllers
* enabled in the cgroup.controllers file, this method will update the subtree_control file
* to include all the controllers.
* @param yarnHierarchy path to the yarn cgroup under which the container cgroups will be created
* @throws ResourceHandlerException if the controllers file cannot be updated
*/
@Override
protected void updateEnabledControllersInHierarchy(
File yarnHierarchy, CGroupController controller) throws ResourceHandlerException {
try {
Set<String> enabledControllers = readControllersFile(yarnHierarchy.getAbsolutePath());
if (!enabledControllers.contains(controller.getName())) {
throw new ResourceHandlerException(String.format(
"The controller %s is not enabled in the cgroup hierarchy: %s. Please enable it in " +
"in the %s/cgroup.subtree_control file.",
controller.getName(), yarnHierarchy.getAbsolutePath(),
yarnHierarchy.getParentFile().getAbsolutePath()));
}
File subtreeControlFile = new File(yarnHierarchy.getAbsolutePath()
+ Path.SEPARATOR + CGROUP_SUBTREE_CONTROL_FILE);
if (!subtreeControlFile.exists()) {
throw new ResourceHandlerException(
"No subtree control file found in the cgroup hierarchy: " +
yarnHierarchy.getAbsolutePath());
}
Writer w = new OutputStreamWriter(Files.newOutputStream(subtreeControlFile.toPath(),
StandardOpenOption.APPEND), StandardCharsets.UTF_8);
try(PrintWriter pw = new PrintWriter(w)) {
LOG.info("Appending the following controller to the cgroup.subtree_control file: {}, " +
"for the cgroup hierarchy: {}", controller.getName(),
yarnHierarchy.getAbsolutePath());
pw.write("+" + controller.getName());
if (pw.checkError()) {
throw new ResourceHandlerException("Failed to add the controller to the " +
"cgroup.subtree_control file in the cgroup hierarchy: " +
yarnHierarchy.getAbsolutePath());
}
}
} catch (IOException e) {
throw new ResourceHandlerException(
"Failed to update the cgroup.subtree_control file in the cgroup hierarchy: " +
yarnHierarchy.getAbsolutePath(), e);
}
}
}

View File

@ -82,6 +82,7 @@ private static CGroupsHandler getInitializedCGroupsHandler(Configuration conf)
if (cGroupsHandler == null) {
synchronized (CGroupsHandler.class) {
if (cGroupsHandler == null) {
// TODO determine cgroup version
cGroupsHandler = new CGroupsHandlerImpl(conf,
PrivilegedOperationExecutor.getInstance(conf));
LOG.debug("Value of CGroupsHandler is: {}", cGroupsHandler);
@ -377,7 +378,7 @@ public static Map<String, Set<String>> parseConfiguredCGroupPath(
Map<String, Set<String>> pathSubsystemMappings = new HashMap<>();
Set<String> validCGroups =
CGroupsHandler.CGroupController.getValidCGroups();
CGroupsHandler.CGroupController.getValidV1CGroups();
for (File candidate: list) {
Set<String> cgroupList =
new HashSet<>(Arrays.asList(candidate.getName().split(",")));

View File

@ -400,7 +400,7 @@ private Map<String, Set<String>> parseMtab() throws IOException {
Map<String, Set<String>> ret = new HashMap<String, Set<String>>();
BufferedReader in = null;
Set<String> validCgroups =
CGroupsHandler.CGroupController.getValidCGroups();
CGroupsHandler.CGroupController.getValidV1CGroups();
try {
FileInputStream fis = new FileInputStream(new File(getMtabFileName()));

View File

@ -0,0 +1,147 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.junit.After;
import org.junit.Before;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.security.Permission;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* Tests for the CGroups handler implementation.
*/
public abstract class TestCGroupsHandlerBase {
protected PrivilegedOperationExecutor privilegedOperationExecutorMock;
protected String tmpPath;
protected String hierarchy;
protected CGroupsHandler.CGroupController controller;
protected String controllerPath;
@Before
public void setup() {
privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
// Prepare test directory
tmpPath = System.getProperty("test.build.data") + "/cgroup";
File tmpDir = new File(tmpPath);
FileUtils.deleteQuietly(tmpDir);
assertTrue(tmpDir.mkdirs());
//no leading or trailing slashes here
hierarchy = "test-hadoop-yarn";
// Sample subsystem. Not used by all the tests
controller = CGroupsHandler.CGroupController.CPU;
controllerPath = getControllerFilePath(controller.getName());
}
@After
public void teardown() {
FileUtil.fullyDelete(new File(tmpPath));
}
protected abstract String getControllerFilePath(String controllerName);
/**
* Security manager simulating access denied.
*/
protected static class MockSecurityManagerDenyWrite extends SecurityManager {
@Override
public void checkPermission(Permission perm) {
if(perm.getActions().equals("write")) {
throw new SecurityException("Mock not allowed");
}
}
}
/**
* Create configuration to mount cgroups that do not exist.
* @return configuration object
*/
protected YarnConfiguration createMountConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, hierarchy);
conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, tmpPath);
return conf;
}
/**
* Create configuration where the cgroups are premounted.
* @param myHierarchy YARN cgroup
* @return configuration object
*/
protected Configuration createNoMountConfiguration(String myHierarchy) {
Configuration confNoMount = new Configuration();
confNoMount.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY,
myHierarchy);
confNoMount.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT,
false);
return confNoMount;
}
/**
* Create an empty mtab file. No cgroups are premounted
* @return mtab file
* @throws IOException could not create file
*/
protected File createEmptyMtabFile() throws IOException {
File emptyMtab = new File(tmpPath, "mtab");
assertTrue("New file should have been created", emptyMtab.createNewFile());
return emptyMtab;
}
/**
* Create a new file with supplied content.
* @param parentDir parent directory
* @param fileName name of the file
* @param content content to write in the file
* @return file created
* @throws IOException if file could not be created
*/
public File createFileWithContent(File parentDir, String fileName, String content)
throws IOException {
File fileToCreate = new File(parentDir, fileName);
if (!fileToCreate.exists()) {
if (!fileToCreate.createNewFile()) {
String message = "Could not create file " + fileToCreate.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter fWriter = new FileWriter(fileToCreate.getAbsoluteFile());
fWriter.write(content);
fWriter.close();
fileToCreate.deleteOnExit();
return fileToCreate;
}
}

View File

@ -24,15 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -40,16 +36,16 @@
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.security.Permission;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -57,90 +53,15 @@
/**
* Tests for the CGroups handler implementation.
*/
public class TestCGroupsHandlerImpl {
public class TestCGroupsHandlerImpl extends TestCGroupsHandlerBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestCGroupsHandlerImpl.class);
private PrivilegedOperationExecutor privilegedOperationExecutorMock;
private String tmpPath;
private String hierarchy;
private CGroupsHandler.CGroupController controller;
private String controllerPath;
@Before
public void setup() {
privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
// Prepare test directory
tmpPath = System.getProperty("test.build.data") + "/cgroups";
File tmpDir = new File(tmpPath);
FileUtils.deleteQuietly(tmpDir);
assertTrue(tmpDir.mkdirs());
//no leading or trailing slashes here
hierarchy = "test-hadoop-yarn";
// Sample subsystem. Not used by all the tests
controller = CGroupsHandler.CGroupController.NET_CLS;
controllerPath =
new File(new File(tmpPath, controller.getName()), hierarchy)
protected String getControllerFilePath(String controllerName) {
return new File(new File(tmpPath, controllerName), hierarchy)
.getAbsolutePath();
}
@After
public void teardown() {
FileUtil.fullyDelete(new File(tmpPath));
}
/**
* Security manager simulating access denied.
*/
private class MockSecurityManagerDenyWrite extends SecurityManager {
@Override
public void checkPermission(Permission perm) {
if(perm.getActions().equals("write")) {
throw new SecurityException("Mock not allowed");
}
}
}
/**
* Create configuration to mount cgroups that do not exist.
* @return configuration object
*/
private YarnConfiguration createMountConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, hierarchy);
conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, tmpPath);
return conf;
}
/**
* Create configuration where the cgroups are premounted.
* @param myHierarchy YARN cgroup
* @return configuration object
*/
private Configuration createNoMountConfiguration(String myHierarchy) {
Configuration confNoMount = new Configuration();
confNoMount.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY,
myHierarchy);
confNoMount.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT,
false);
return confNoMount;
}
/**
* Create an empty mtab file. No cgroups are premounted
* @return mtab file
* @throws IOException could not create file
*/
private File createEmptyCgroups() throws IOException {
File emptyMtab = new File(tmpPath, "mtab");
assertTrue("New file should have been created", emptyMtab.createNewFile());
return emptyMtab;
}
/**
* Create simulated cgroups mount point.
* @param parentDir cgroups mount point
@ -185,6 +106,7 @@ public static File createPremountedCgroups(File parentDir, boolean cpuAcct)
return mockMtab;
}
@Test
public void testMountController() throws IOException {
File parentDir = new File(tmpPath);
@ -193,7 +115,7 @@ public void testMountController() throws IOException {
//Since we enabled (deferred) cgroup controller mounting, no interactions
//should have occurred, with this mock
verifyZeroInteractions(privilegedOperationExecutorMock);
File emptyMtab = createEmptyCgroups();
File emptyMtab = createEmptyMtabFile();
try {
CGroupsHandler cGroupsHandler = new CGroupsHandlerImpl(
@ -225,12 +147,11 @@ public void testMountController() throws IOException {
verifyNoMoreInteractions(privilegedOperationExecutorMock);
} catch (PrivilegedOperationException e) {
LOG.error("Caught exception: " + e);
assertTrue("Unexpected PrivilegedOperationException from mock!",
false);
fail("Unexpected PrivilegedOperationException from mock!");
}
} catch (ResourceHandlerException e) {
LOG.error("Caught exception: " + e);
assertTrue("Unexpected ResourceHandler Exception!", false);
fail("Unexpected ResourceHandler Exception!");
}
}
@ -240,9 +161,9 @@ public void testCGroupPaths() throws IOException {
//in this test.
verifyZeroInteractions(privilegedOperationExecutorMock);
CGroupsHandler cGroupsHandler = null;
File mtab = createEmptyCgroups();
File mtab = createEmptyMtabFile();
// Lets manually create a path to (partially) simulate a controller mounted
// Let's manually create a path to (partially) simulate a controller mounted
// later in the test. This is required because the handler uses a mocked
// privileged operation executor
assertTrue("Sample subsystem should be created",
@ -254,9 +175,7 @@ public void testCGroupPaths() throws IOException {
cGroupsHandler.initializeCGroupController(controller);
} catch (ResourceHandlerException e) {
LOG.error("Caught exception: " + e);
assertTrue(
"Unexpected ResourceHandlerException when mounting controller!",
false);
fail("Unexpected ResourceHandlerException when mounting controller!");
}
String testCGroup = "container_01";
@ -283,7 +202,7 @@ public void testCGroupOperations() throws IOException {
//in this test.
verifyZeroInteractions(privilegedOperationExecutorMock);
CGroupsHandler cGroupsHandler = null;
File mtab = createEmptyCgroups();
File mtab = createEmptyMtabFile();
// Lets manually create a path to (partially) simulate a controller mounted
// later in the test. This is required because the handler uses a mocked
@ -341,11 +260,11 @@ public void testCGroupOperations() throws IOException {
//We can't really do a delete test here. Linux cgroups
//implementation provides additional semantics - the cgroup cannot be
//deleted if there are any tasks still running in the cgroup even if
//the user attempting the delete has the file permissions to do so - we
//the user attempting the deletion has the file permissions to do so - we
//cannot simulate that here. Even if we create a dummy 'tasks' file, we
//wouldn't be able to simulate the delete behavior we need, since a cgroup
//can be deleted using using 'rmdir' if the tasks file is empty. Such a
//delete is not possible with a regular non-empty directory.
//can be deleted using 'rmdir' if the tasks file is empty. Such a
//deletion is not possible with a regular non-empty directory.
} catch (ResourceHandlerException e) {
LOG.error("Caught exception: " + e);
Assert
@ -364,11 +283,15 @@ public void testMtabParsing() throws Exception {
// create mock cgroup
File mockMtabFile = createPremountedCgroups(parentDir, false);
CGroupsHandlerImpl cGroupsHandler = new CGroupsHandlerImpl(
createMountConfiguration(),
privilegedOperationExecutorMock, mockMtabFile.getAbsolutePath());
// Run mtabs parsing
Map<String, Set<String>> newMtab =
CGroupsHandlerImpl.parseMtab(mockMtabFile.getAbsolutePath());
cGroupsHandler.parseMtab(mockMtabFile.getAbsolutePath());
Map<CGroupsHandler.CGroupController, String> controllerPaths =
CGroupsHandlerImpl.initializeControllerPathsFromMtab(
cGroupsHandler.initializeControllerPathsFromMtab(
newMtab);
// Verify
@ -406,8 +329,7 @@ private void testPreMountedControllerInitialization(String myHierarchy)
""));
// Test that a missing yarn hierarchy will be created automatically
if (!cpuCgroupMountDir.equals(mountPoint)) {
assertTrue("Directory should be deleted",
!cpuCgroupMountDir.exists());
assertFalse("Directory should be deleted", cpuCgroupMountDir.exists());
}
cGroupsHandler.initializeCGroupController(
CGroupsHandler.CGroupController.CPU);
@ -431,8 +353,7 @@ private void testPreMountedControllerInitialization(String myHierarchy)
// Test that a non-accessible mount directory results in an exception
if (!cpuCgroupMountDir.equals(mountPoint)) {
assertTrue("Could not delete cgroups", cpuCgroupMountDir.delete());
assertTrue("Directory should be deleted",
!cpuCgroupMountDir.exists());
assertFalse("Directory should be deleted", cpuCgroupMountDir.exists());
}
assertTrue(mountPoint.setWritable(false));
try {
@ -451,8 +372,7 @@ private void testPreMountedControllerInitialization(String myHierarchy)
if (!cpuCgroupMountDir.equals(mountPoint)) {
Assert.assertFalse("Could not delete cgroups",
cpuCgroupMountDir.delete());
assertTrue("Directory should be deleted",
!cpuCgroupMountDir.exists());
assertFalse("Directory should be deleted", cpuCgroupMountDir.exists());
SecurityManager manager = System.getSecurityManager();
System.setSecurityManager(new MockSecurityManagerDenyWrite());
try {
@ -471,12 +391,10 @@ private void testPreMountedControllerInitialization(String myHierarchy)
if (!cpuCgroupMountDir.equals(mountPoint)) {
Assert.assertFalse("Could not delete cgroups",
cpuCgroupMountDir.delete());
assertTrue("Directory should be deleted",
!cpuCgroupMountDir.exists());
assertFalse("Directory should be deleted", cpuCgroupMountDir.exists());
}
FileUtils.deleteQuietly(mountPoint);
assertTrue("cgroups mount point should be deleted",
!mountPoint.exists());
assertFalse("cgroups mount point should be deleted", mountPoint.exists());
try {
cGroupsHandler.initializeCGroupController(
CGroupsHandler.CGroupController.CPU);

View File

@ -0,0 +1,277 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verifyZeroInteractions;
/**
* Tests for the CGroups handler implementation.
*/
public class TestCGroupsV2HandlerImpl extends TestCGroupsHandlerBase {
// Create a controller file in the unified hierarchy of cgroup v2
@Override
protected String getControllerFilePath(String controllerName) {
return new File(tmpPath, hierarchy).getAbsolutePath();
}
/*
* Create a mock mtab file with the following content:
* cgroup2 /path/to/parentDir cgroup2 rw,nosuid,nodev,noexec,relatime,nsdelegate,memory_recursiveprot 0 0
*
* Create the following cgroup v2 file hierarchy:
* parentDir
* ___________________________________________________
* / \ \
* cgroup.controllers cgroup.subtree_control test-hadoop-yarn (hierarchyDir)
* _________________
* / \
* cgroup.controllers cgroup.subtree_control
*/
public File createPremountedCgroups(File parentDir)
throws IOException {
String baseCgroup2Line =
"cgroup2 " + parentDir.getAbsolutePath()
+ " cgroup2 rw,nosuid,nodev,noexec,relatime,nsdelegate,memory_recursiveprot 0 0\n";
File mockMtab = createFileWithContent(parentDir, UUID.randomUUID().toString(), baseCgroup2Line);
String enabledControllers = "cpuset cpu io memory hugetlb pids rdma misc\n";
File controllersFile = createFileWithContent(parentDir, CGroupsHandler.CGROUP_CONTROLLERS_FILE,
enabledControllers);
File subtreeControlFile = new File(parentDir, CGroupsHandler.CGROUP_SUBTREE_CONTROL_FILE);
Assert.assertTrue("empty subtree_control file should be created",
subtreeControlFile.createNewFile());
File hierarchyDir = new File(parentDir, hierarchy);
if (!hierarchyDir.mkdirs()) {
String message = "Could not create directory " + hierarchyDir.getAbsolutePath();
throw new IOException(message);
}
hierarchyDir.deleteOnExit();
FileUtils.copyFile(controllersFile, new File(hierarchyDir,
CGroupsHandler.CGROUP_CONTROLLERS_FILE));
FileUtils.copyFile(subtreeControlFile, new File(hierarchyDir,
CGroupsHandler.CGROUP_SUBTREE_CONTROL_FILE));
return mockMtab;
}
@Test
public void testCGroupPaths() throws IOException, ResourceHandlerException {
verifyZeroInteractions(privilegedOperationExecutorMock);
File parentDir = new File(tmpPath);
File mtab = createPremountedCgroups(parentDir);
assertTrue("Sample subsystem should be created",
new File(controllerPath).exists());
CGroupsHandler cGroupsHandler = new CGroupsV2HandlerImpl(createNoMountConfiguration(hierarchy),
privilegedOperationExecutorMock, mtab.getAbsolutePath());
cGroupsHandler.initializeCGroupController(controller);
String testCGroup = "container_01";
String expectedPath =
controllerPath + Path.SEPARATOR + testCGroup;
String path = cGroupsHandler.getPathForCGroup(controller, testCGroup);
Assert.assertEquals(expectedPath, path);
String expectedPathTasks = expectedPath + Path.SEPARATOR
+ CGroupsHandler.CGROUP_PROCS_FILE;
path = cGroupsHandler.getPathForCGroupTasks(controller, testCGroup);
Assert.assertEquals(expectedPathTasks, path);
String param = CGroupsHandler.CGROUP_PARAM_CLASSID;
String expectedPathParam = expectedPath + Path.SEPARATOR
+ controller.getName() + "." + param;
path = cGroupsHandler.getPathForCGroupParam(controller, testCGroup, param);
Assert.assertEquals(expectedPathParam, path);
}
@Test(expected = UnsupportedOperationException.class)
public void testUnsupportedMountConfiguration() throws Exception {
//As per junit behavior, we expect a new mock object to be available
//in this test.
verifyZeroInteractions(privilegedOperationExecutorMock);
CGroupsHandler cGroupsHandler;
File mtab = createEmptyMtabFile();
assertTrue("Sample subsystem should be created",
new File(controllerPath).mkdirs());
cGroupsHandler = new CGroupsV2HandlerImpl(createMountConfiguration(),
privilegedOperationExecutorMock, mtab.getAbsolutePath());
cGroupsHandler.initializeCGroupController(controller);
}
@Test
public void testCGroupOperations() throws IOException, ResourceHandlerException {
verifyZeroInteractions(privilegedOperationExecutorMock);
File parentDir = new File(tmpPath);
File mtab = createPremountedCgroups(parentDir);
assertTrue("Sample subsystem should be created",
new File(controllerPath).exists());
CGroupsHandler cGroupsHandler = new CGroupsV2HandlerImpl(createNoMountConfiguration(hierarchy),
privilegedOperationExecutorMock, mtab.getAbsolutePath());
cGroupsHandler.initializeCGroupController(controller);
String testCGroup = "container_01";
String expectedPath = controllerPath
+ Path.SEPARATOR + testCGroup;
String path = cGroupsHandler.createCGroup(controller, testCGroup);
assertTrue(new File(expectedPath).exists());
Assert.assertEquals(expectedPath, path);
String param = "test_param";
String paramValue = "test_param_value";
cGroupsHandler
.updateCGroupParam(controller, testCGroup, param, paramValue);
String paramPath = expectedPath + Path.SEPARATOR + controller.getName()
+ "." + param;
File paramFile = new File(paramPath);
assertTrue(paramFile.exists());
Assert.assertEquals(paramValue, new String(Files.readAllBytes(
paramFile.toPath())));
Assert.assertEquals(paramValue,
cGroupsHandler.getCGroupParam(controller, testCGroup, param));
}
/**
* Tests whether mtab parsing works as expected with a valid hierarchy set.
* @throws Exception the test will fail
*/
@Test
public void testMtabParsing() throws Exception {
// Initialize mtab and cgroup dir
File parentDir = new File(tmpPath);
// create mock cgroup
File mockMtabFile = createPremountedCgroups(parentDir);
CGroupsV2HandlerImpl cGroupsHandler = new CGroupsV2HandlerImpl(
createMountConfiguration(),
privilegedOperationExecutorMock, mockMtabFile.getAbsolutePath());
// Run mtabs parsing
Map<String, Set<String>> newMtab =
cGroupsHandler.parseMtab(mockMtabFile.getAbsolutePath());
Map<CGroupsHandler.CGroupController, String> controllerPaths =
cGroupsHandler.initializeControllerPathsFromMtab(
newMtab);
// Verify
Assert.assertEquals(4, controllerPaths.size());
assertTrue(controllerPaths
.containsKey(CGroupsHandler.CGroupController.CPU));
assertTrue(controllerPaths
.containsKey(CGroupsHandler.CGroupController.MEMORY));
String cpuDir = controllerPaths.get(CGroupsHandler.CGroupController.CPU);
String memoryDir =
controllerPaths.get(CGroupsHandler.CGroupController.MEMORY);
Assert.assertEquals(parentDir.getAbsolutePath(), cpuDir);
Assert.assertEquals(parentDir.getAbsolutePath(), memoryDir);
}
@Test
public void testManualCgroupSetting() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, tmpPath);
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY,
"/hadoop-yarn");
File subCgroup = new File(tmpPath, "/hadoop-yarn");
Assert.assertTrue("temp dir should be created", subCgroup.mkdirs());
subCgroup.deleteOnExit();
String enabledControllers = "cpuset cpu io memory hugetlb pids rdma misc\n";
createFileWithContent(subCgroup, CGroupsHandler.CGROUP_CONTROLLERS_FILE, enabledControllers);
File subtreeControlFile = new File(subCgroup.getAbsolutePath(),
CGroupsHandler.CGROUP_SUBTREE_CONTROL_FILE);
Assert.assertTrue("empty subtree_control file should be created",
subtreeControlFile.createNewFile());
CGroupsV2HandlerImpl cGroupsHandler = new CGroupsV2HandlerImpl(conf, null);
cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.CPU);
Assert.assertEquals("CPU cgroup path was not set", subCgroup.getAbsolutePath(),
new File(cGroupsHandler.getPathForCGroup(
CGroupsHandler.CGroupController.CPU, "")).getAbsolutePath());
// Verify that the subtree control file was updated
String subtreeControllersEnabledString = FileUtils.readFileToString(subtreeControlFile,
StandardCharsets.UTF_8);
Assert.assertEquals("The newly added controller doesn't contain + sign",
1, StringUtils.countMatches(subtreeControllersEnabledString, "+"));
Assert.assertEquals("Controller is not enabled in subtree control file",
controller.getName(), subtreeControllersEnabledString.replace("+", "").trim());
cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.MEMORY);
subtreeControllersEnabledString = FileUtils.readFileToString(subtreeControlFile,
StandardCharsets.UTF_8);
Assert.assertEquals("The newly added controllers doesn't contain + signs",
2, StringUtils.countMatches(subtreeControllersEnabledString, "+"));
Set<String> subtreeControllersEnabled = new HashSet<>(Arrays.asList(
subtreeControllersEnabledString.replace("+", " ").trim().split(" ")));
Assert.assertEquals(2, subtreeControllersEnabled.size());
Assert.assertTrue("Controller is not enabled in subtree control file",
cGroupsHandler.getValidCGroups().containsAll(subtreeControllersEnabled));
// Test that the subtree control file is appended correctly
// even if some controllers are present
subtreeControlFile.delete();
createFileWithContent(subCgroup, CGroupsHandler.CGROUP_SUBTREE_CONTROL_FILE, "cpu io");
cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.MEMORY);
subtreeControllersEnabledString = FileUtils.readFileToString(subtreeControlFile,
StandardCharsets.UTF_8);
Assert.assertEquals("The newly added controller doesn't contain + sign",
1, StringUtils.countMatches(subtreeControllersEnabledString, "+"));
subtreeControllersEnabled = new HashSet<>(Arrays.asList(
subtreeControllersEnabledString.replace("+", " ").split(" ")));
Assert.assertEquals(3, subtreeControllersEnabled.size());
Assert.assertTrue("Controllers not enabled in subtree control file",
cGroupsHandler.getValidCGroups().containsAll(subtreeControllersEnabled));
}
}