YARN-11687. CGroupV2 resource calculator (#6835)

Co-authored-by: Benjamin Teke <brumi1024@users.noreply.github.com>
This commit is contained in:
K0K0V0K 2024-05-29 17:20:23 +02:00 committed by GitHub
parent 6c08e8e2aa
commit ccb8ff4360
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 724 additions and 597 deletions

View File

@ -20,8 +20,6 @@
import java.lang.reflect.Constructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
@ -37,8 +35,6 @@
@Public
@Evolving
public abstract class ResourceCalculatorProcessTree extends Configured {
static final Logger LOG = LoggerFactory
.getLogger(ResourceCalculatorProcessTree.class);
public static final int UNAVAILABLE = -1;
/**
@ -169,7 +165,6 @@ public float getCpuUsagePercent() {
*/
public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree(
String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) {
if (clazz != null) {
try {
Constructor <? extends ResourceCalculatorProcessTree> c = clazz.getConstructor(String.class);

View File

@ -559,6 +559,11 @@ public String getCGroupMountPath() {
return this.cGroupsMountConfig.getMountPath();
}
@Override
public String getCGroupV2MountPath() {
return this.cGroupsMountConfig.getV2MountPath();
}
@Override
public String toString() {
return CGroupsHandlerImpl.class.getName() + "{" +

View File

@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.CpuTimeTracker;
import org.apache.hadoop.util.SysInfoLinux;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.SystemClock;
/**
* Common code base for the CGroupsResourceCalculator implementations.
*/
public abstract class AbstractCGroupsResourceCalculator extends ResourceCalculatorProcessTree {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractCGroupsResourceCalculator.class);
private final String pid;
private final Clock clock = SystemClock.getInstance();
private final Map<String, String> stats = new ConcurrentHashMap<>();
private long jiffyLengthMs = SysInfoLinux.JIFFY_LENGTH_IN_MILLIS;
private CpuTimeTracker cpuTimeTracker;
private CGroupsHandler cGroupsHandler;
private String procFs = "/proc";
private final List<String> totalJiffiesKeys;
private final String rssMemoryKey;
private final String virtualMemoryKey;
protected AbstractCGroupsResourceCalculator(
String pid,
List<String> totalJiffiesKeys,
String rssMemoryKey,
String virtualMemoryKey
) {
super(pid);
this.pid = pid;
this.totalJiffiesKeys = totalJiffiesKeys;
this.rssMemoryKey = rssMemoryKey;
this.virtualMemoryKey = virtualMemoryKey;
}
@Override
public void initialize() throws YarnException {
cpuTimeTracker = new CpuTimeTracker(jiffyLengthMs);
cGroupsHandler = ResourceHandlerModule.getCGroupsHandler();
}
@Override
public long getCumulativeCpuTime() {
long totalJiffies = getTotalJiffies();
return jiffyLengthMs == UNAVAILABLE || totalJiffies == UNAVAILABLE
? UNAVAILABLE
: getTotalJiffies() * jiffyLengthMs;
}
@Override
public long getRssMemorySize(int olderThanAge) {
return 1 < olderThanAge ? UNAVAILABLE : getStat(rssMemoryKey);
}
@Override
public long getVirtualMemorySize(int olderThanAge) {
return 1 < olderThanAge ? UNAVAILABLE : getStat(virtualMemoryKey);
}
@Override
public String getProcessTreeDump() {
// We do not have a process tree in cgroups return just the pid for tracking
return pid;
}
@Override
public boolean checkPidPgrpidForMatch() {
// We do not have a process tree in cgroups returning default ok
return true;
}
@Override
public float getCpuUsagePercent() {
return cpuTimeTracker.getCpuTrackerUsagePercent();
}
@Override
public void updateProcessTree() {
stats.clear();
for (Path statFile : getCGroupFilesToLoadInStats()) {
try {
List<String> lines = fileToLines(statFile);
if (1 == lines.size()) {
addSingleLineToStat(statFile, lines.get(0));
} else if (1 < lines.size()) {
addMultiLineToStat(statFile, lines);
}
} catch (IOException e) {
LOG.debug(String.format("Failed to read cgroup file %s for pid %s", statFile, pid), e);
}
}
LOG.debug("After updateProcessTree the {} pid has stats {}", pid, stats);
cpuTimeTracker.updateElapsedJiffies(BigInteger.valueOf(getTotalJiffies()), clock.getTime());
}
private void addSingleLineToStat(Path file, String line) {
Path fileName = file.getFileName();
if (fileName != null) {
stats.put(fileName.toString(), line.trim());
}
}
private void addMultiLineToStat(Path file, List<String> lines) {
for (String line : lines) {
String[] parts = line.split(" ");
if (1 < parts.length) {
stats.put(file.getFileName() + "#" + parts[0], parts[1]);
}
}
}
private long getTotalJiffies() {
Long reduce = totalJiffiesKeys.stream()
.map(this::getStat)
.filter(statValue -> statValue != UNAVAILABLE)
.reduce(0L, Long::sum);
return reduce == 0 ? UNAVAILABLE : reduce;
}
private long getStat(String key) {
return Long.parseLong(stats.getOrDefault(key, String.valueOf(UNAVAILABLE)));
}
protected abstract List<Path> getCGroupFilesToLoadInStats();
protected List<String> readLinesFromCGroupFileFromProcDir() throws IOException {
// https://docs.kernel.org/admin-guide/cgroup-v2.html#processes
// https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v1/cgroups.html
Path cgroup = Paths.get(procFs, pid, "cgroup");
List<String> result = Arrays.asList(fileToString(cgroup).split(System.lineSeparator()));
LOG.debug("The {} pid has the following lines in the procfs cgroup file {}", pid, result);
return result;
}
protected String fileToString(Path path) throws IOException {
return FileUtils.readFileToString(path.toFile(), StandardCharsets.UTF_8).trim();
}
protected List<String> fileToLines(Path path) throws IOException {
return !path.toFile().exists() ? Collections.emptyList()
: Arrays.asList(FileUtils.readFileToString(path.toFile(), StandardCharsets.UTF_8)
.trim().split(System.lineSeparator()));
}
@VisibleForTesting
void setJiffyLengthMs(long jiffyLengthMs) {
this.jiffyLengthMs = jiffyLengthMs;
}
@VisibleForTesting
void setCpuTimeTracker(CpuTimeTracker cpuTimeTracker) {
this.cpuTimeTracker = cpuTimeTracker;
}
@VisibleForTesting
void setcGroupsHandler(CGroupsHandler cGroupsHandler) {
this.cGroupsHandler = cGroupsHandler;
}
@VisibleForTesting
void setProcFs(String procFs) {
this.procFs = procFs;
}
public CGroupsHandler getcGroupsHandler() {
return cGroupsHandler;
}
public String getPid() {
return pid;
}
}

View File

@ -239,4 +239,10 @@ String getCGroupParam(CGroupController controller, String cGroupId,
* @return parameter value as read from the parameter file
*/
String getCGroupMountPath();
/**
* Returns CGroupV2 Mount Path.
* @return parameter value as read from the parameter file
*/
String getCGroupV2MountPath();
}

View File

@ -18,338 +18,146 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.CpuTimeTracker;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.SysInfoLinux;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.SystemClock;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A cgroups file-system based Resource calculator without the process tree
* features.
* A Cgroup version 1 file-system based Resource calculator without the process tree features.
*
* CGroups has its limitations. It can only be enabled, if both CPU and memory
* cgroups are enabled with yarn.nodemanager.resource.cpu.enabled and
* yarn.nodemanager.resource.memory.enabled respectively. This means that
* memory limits are enforced by default. You can turn this off and keep
* memory reporting only with yarn.nodemanager.resource.memory.enforced.
* Warning: this implementation will not work properly
* when configured using the mapreduce.job.process-tree.class job property.
* Theoretically the ResourceCalculatorProcessTree can be configured using the
* mapreduce.job.process-tree.class job property, however it has a dependency on an
* instantiated ResourceHandlerModule, which is only initialised in the NodeManager process
* and not in the containers.
*
* Another limitation is virtual memory measurement. CGroups does not have the
* ability to measure virtual memory usage. This includes memory reserved but
* not used. CGroups measures used memory as sa sum of
* physical memory and swap usage. This will be returned in the virtual
* memory counters.
* If the real virtual memory is required please use the legacy procfs based
* resource calculator or CombinedResourceCalculator.
* Limitation:
* The ResourceCalculatorProcessTree class can be configured using the
* mapreduce.job.process-tree.class property within a MapReduce job.
* However, it is important to note that instances of ResourceCalculatorProcessTree operate
* within the context of a MapReduce task. This presents a limitation:
* these instances do not have access to the ResourceHandlerModule,
* which is only initialized within the NodeManager process
* and not within individual containers where MapReduce tasks execute.
* As a result, the current implementation of ResourceCalculatorProcessTree is incompatible
* with the mapreduce.job.process-tree.class property. This incompatibility arises
* because the ResourceHandlerModule is essential for managing and monitoring resource usage,
* and without it, the ResourceCalculatorProcessTree cannot function as intended
* within the confines of a MapReduce task. Therefore, any attempts to utilize this class
* through the mapreduce.job.process-tree.class property
* will not succeed under the current architecture.
*/
public class CGroupsResourceCalculator extends ResourceCalculatorProcessTree {
enum Result {
Continue,
Exit
}
protected static final Logger LOG = LoggerFactory
.getLogger(CGroupsResourceCalculator.class);
private static final String PROCFS = "/proc";
static final String CGROUP = "cgroup";
static final String CPU_STAT = "cpuacct.stat";
static final String MEM_STAT = "memory.usage_in_bytes";
static final String MEMSW_STAT = "memory.memsw.usage_in_bytes";
private static final String USER = "user ";
private static final String SYSTEM = "system ";
private static final Pattern CGROUP_FILE_FORMAT = Pattern.compile(
"^(\\d+):([^:]+):/(.*)$");
private final String procfsDir;
private CGroupsHandler cGroupsHandler;
private String pid;
private File cpuStat;
private File memStat;
private File memswStat;
private BigInteger processTotalJiffies;
private long processPhysicalMemory;
private long processVirtualMemory;
private final long jiffyLengthMs;
private final CpuTimeTracker cpuTimeTracker;
private Clock clock;
public class CGroupsResourceCalculator extends AbstractCGroupsResourceCalculator {
private static final Logger LOG = LoggerFactory.getLogger(CGroupsResourceCalculator.class);
/**
* Create resource calculator for all Yarn containers.
*/
public CGroupsResourceCalculator()
throws YarnException {
this(null, PROCFS, ResourceHandlerModule.getCGroupsHandler(),
SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS);
}
/**
* Create resource calculator for the container that has the specified pid.
* @param pid A pid from the cgroup or null for all containers
*/
public CGroupsResourceCalculator(String pid) {
this(pid, PROCFS, ResourceHandlerModule.getCGroupsHandler(),
SystemClock.getInstance(), SysInfoLinux.JIFFY_LENGTH_IN_MILLIS);
}
/**
* Create resource calculator for testing.
* @param pid A pid from the cgroup or null for all containers
* @param procfsDir Path to /proc or a mock /proc directory
* @param cGroupsHandler Initialized cgroups handler object
* @param clock A clock object
* @param jiffyLengthMs0 Jiffy length in milliseconds
*/
@VisibleForTesting
CGroupsResourceCalculator(String pid, String procfsDir,
CGroupsHandler cGroupsHandler,
Clock clock,
long jiffyLengthMs0) {
super(pid);
this.procfsDir = procfsDir;
this.cGroupsHandler = cGroupsHandler;
this.pid = pid != null && pid.equals("0") ? "1" : pid;
this.jiffyLengthMs = jiffyLengthMs0;
this.cpuTimeTracker =
new CpuTimeTracker(this.jiffyLengthMs);
this.clock = clock;
this.processTotalJiffies = BigInteger.ZERO;
this.processPhysicalMemory = UNAVAILABLE;
this.processVirtualMemory = UNAVAILABLE;
}
@Override
public void initialize() throws YarnException {
if (!CGroupsResourceCalculator.isAvailable()) {
throw new YarnException("CGroupsResourceCalculator is not available");
}
setCGroupFilePaths();
}
@Override
public float getCpuUsagePercent() {
LOG.debug("Process {} jiffies:{}", pid, processTotalJiffies);
return cpuTimeTracker.getCpuTrackerUsagePercent();
}
@Override
public long getCumulativeCpuTime() {
if (jiffyLengthMs < 0) {
return UNAVAILABLE;
}
return processTotalJiffies.longValue() * jiffyLengthMs;
}
@Override
public long getRssMemorySize(int olderThanAge) {
if (olderThanAge > 1) {
return UNAVAILABLE;
}
return processPhysicalMemory;
}
@Override
public long getVirtualMemorySize(int olderThanAge) {
if (olderThanAge > 1) {
return UNAVAILABLE;
}
return processVirtualMemory;
}
@Override
public void updateProcessTree() {
try {
this.processTotalJiffies = readTotalProcessJiffies();
cpuTimeTracker.updateElapsedJiffies(processTotalJiffies,
clock.getTime());
} catch (YarnException e) {
LOG.warn("Failed to parse " + pid, e);
}
processPhysicalMemory = getMemorySize(memStat);
if (memswStat.exists()) {
processVirtualMemory = getMemorySize(memswStat);
} else {
LOG.debug("Swap cgroups monitoring is not compiled into the kernel {}",
memswStat.getAbsolutePath());
}
}
@Override
public String getProcessTreeDump() {
// We do not have a process tree in cgroups return just the pid for tracking
return pid;
}
@Override
public boolean checkPidPgrpidForMatch() {
// We do not have a process tree in cgroups returning default ok
return true;
}
/**
* Checks if the CGroupsResourceCalculator is available on this system.
* This assumes that Linux container executor is already initialized.
* <a href="https://docs.kernel.org/admin-guide/cgroup-v1/cpuacct.html">DOC</a>
*
* ...
* cpuacct.stat file lists a few statistics which further divide the CPU time obtained
* by the cgroup into user and system times.
* Currently the following statistics are supported:
* - user: Time spent by tasks of the cgroup in user mode.
* - system: Time spent by tasks of the cgroup in kernel mode.
* user and system are in USER_HZ unit.
* ...
*
* <a href="https://litux.nl/mirror/kerneldevelopment/0672327201/ch10lev1sec3.html">DOC</a>
*
* ...
* In kernels earlier than 2.6, changing the value of HZ resulted in user-space anomalies.
* This happened because values were exported to user-space in units of ticks-per-second.
* As these interfaces became permanent, applications grew to rely on a specific value of HZ.
* Consequently, changing HZ would scale various exported values
* by some constantwithout user-space knowing!
* Uptime would read 20 hours when it was in fact two!
*
* To prevent such problems, the kernel needs to scale all exported jiffies values.
* It does this by defining USER_HZ, which is the HZ value that user-space expects. On x86,
* because HZ was historically 100, USER_HZ is 100. The macro jiffies_to_clock_t()
* is then used to scale a tick count in terms of HZ to a tick count in terms of USER_HZ.
* The macro used depends on whether USER_HZ and HZ are integer multiples of themselves.
* ...
*
* @return true if CGroupsResourceCalculator is available. False otherwise.
*/
public static boolean isAvailable() {
private static final String CPU_STAT = "cpuacct.stat";
/**
* <a href="https://docs.kernel.org/admin-guide/cgroup-v1/memory.html#usage-in-bytes">DOC</a>
*
* ...
* For efficiency, as other kernel components, memory cgroup uses some optimization
* to avoid unnecessary cacheline false sharing.
* usage_in_bytes is affected by the method
* and doesnt show exact value of memory (and swap) usage,
* its a fuzz value for efficient access. (Of course, when necessary, its synchronized.)
* ...
*
*/
private static final String MEM_STAT = "memory.usage_in_bytes";
private static final String MEMSW_STAT = "memory.memsw.usage_in_bytes";
public CGroupsResourceCalculator(String pid) {
super(
pid,
Arrays.asList(CPU_STAT + "#user", CPU_STAT + "#system"),
MEM_STAT,
MEMSW_STAT
);
}
@Override
protected List<Path> getCGroupFilesToLoadInStats() {
List<Path> result = new ArrayList<>();
try {
if (!Shell.LINUX) {
LOG.info("CGroupsResourceCalculator currently is supported only on "
+ "Linux.");
return false;
String cpuRelative = getCGroupRelativePath(CGroupsHandler.CGroupController.CPUACCT);
if (cpuRelative != null) {
File cpuDir = new File(getcGroupsHandler().getControllerPath(
CGroupsHandler.CGroupController.CPUACCT), cpuRelative);
result.add(Paths.get(cpuDir.getAbsolutePath(), CPU_STAT));
}
if (ResourceHandlerModule.getCGroupsHandler() == null ||
ResourceHandlerModule.getCpuResourceHandler() == null ||
ResourceHandlerModule.getMemoryResourceHandler() == null) {
LOG.info("CGroupsResourceCalculator requires enabling CGroups" +
"cpu and memory");
return false;
}
} catch (SecurityException se) {
LOG.warn("Failed to get Operating System name. " + se);
return false;
} catch (IOException e) {
LOG.debug("Exception while looking for CPUACCT controller for pid: " + getPid(), e);
}
return true;
}
private long getMemorySize(File cgroupUsageFile) {
long[] mem = new long[1];
try {
processFile(cgroupUsageFile, (String line) -> {
mem[0] = Long.parseLong(line);
return Result.Exit;
});
return mem[0];
} catch (YarnException e) {
LOG.warn("Failed to parse cgroups " + memswStat, e);
}
return UNAVAILABLE;
}
private BigInteger readTotalProcessJiffies() throws YarnException {
final BigInteger[] totalCPUTimeJiffies = new BigInteger[1];
totalCPUTimeJiffies[0] = BigInteger.ZERO;
processFile(cpuStat, (String line) -> {
if (line.startsWith(USER)) {
totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add(
new BigInteger(line.substring(USER.length())));
String memoryRelative = getCGroupRelativePath(CGroupsHandler.CGroupController.MEMORY);
if (memoryRelative != null) {
File memDir = new File(getcGroupsHandler().getControllerPath(
CGroupsHandler.CGroupController.MEMORY), memoryRelative);
result.add(Paths.get(memDir.getAbsolutePath(), MEM_STAT));
result.add(Paths.get(memDir.getAbsolutePath(), MEMSW_STAT));
}
if (line.startsWith(SYSTEM)) {
totalCPUTimeJiffies[0] = totalCPUTimeJiffies[0].add(
new BigInteger(line.substring(SYSTEM.length())));
}
return Result.Continue;
});
return totalCPUTimeJiffies[0];
}
private String getCGroupRelativePath(
CGroupsHandler.CGroupController controller)
throws YarnException {
if (pid == null) {
return cGroupsHandler.getRelativePathForCGroup("");
} else {
return getCGroupRelativePathForPid(controller);
} catch (IOException e) {
LOG.debug("Exception while looking for MEMORY controller for pid: " + getPid(), e);
}
return result;
}
private String getCGroupRelativePathForPid(
CGroupsHandler.CGroupController controller)
throws YarnException {
File pidCgroupFile = new File(new File(procfsDir, pid), CGROUP);
String[] result = new String[1];
processFile(pidCgroupFile, (String line)->{
Matcher m = CGROUP_FILE_FORMAT.matcher(line);
boolean mat = m.find();
if (mat) {
if (m.group(2).contains(controller.getName())) {
// Instead of returning the full path we compose it
// based on the last item as the container id
// This helps to avoid confusion within a privileged Docker container
// where the path is referred in /proc/<pid>/cgroup as
// /docker/<dcontainerid>/hadoop-yarn/<containerid>
// but it is /hadoop-yarn/<containerid> in the cgroups hierarchy
String cgroupPath = m.group(3);
if (cgroupPath != null) {
String cgroup =
new File(cgroupPath).toPath().getFileName().toString();
result[0] = cGroupsHandler.getRelativePathForCGroup(cgroup);
} else {
LOG.warn("Invalid cgroup path for " + pidCgroupFile);
}
return Result.Exit;
}
} else {
LOG.warn(
"Unexpected: cgroup file is not in the expected format"
+ " for process with pid " + pid);
}
return Result.Continue;
});
if (result[0] == null) {
throw new YarnException(controller.getName() + " CGroup for pid " + pid +
" not found " + pidCgroupFile);
}
return result[0];
}
private void processFile(File file, Function<String, Result> processLine)
throws YarnException {
// Read "procfsDir/<pid>/stat" file - typically /proc/<pid>/stat
try (InputStreamReader fReader = new InputStreamReader(
new FileInputStream(file), StandardCharsets.UTF_8)) {
try (BufferedReader in = new BufferedReader(fReader)) {
try {
String str;
while ((str = in.readLine()) != null) {
Result result = processLine.apply(str);
if (result == Result.Exit) {
return;
}
}
} catch (IOException io) {
throw new YarnException("Error reading the stream " + io, io);
private String getCGroupRelativePath(CGroupsHandler.CGroupController controller)
throws IOException {
for (String line : readLinesFromCGroupFileFromProcDir()) {
// example line: 6:cpuacct,cpu:/yarn/container_1
String[] parts = line.split(":");
if (parts[1].contains(controller.getName())) {
String cgroupPath = parts[2];
Path fileName = new File(cgroupPath).toPath().getFileName();
if (fileName != null) {
return getcGroupsHandler().getRelativePathForCGroup(fileName.toString());
}
}
} catch (IOException f) {
throw new YarnException("The process vanished in the interim " + pid, f);
}
LOG.debug("No {} controller found for pid {}", controller, getPid());
return null;
}
void setCGroupFilePaths() throws YarnException {
if (cGroupsHandler == null) {
throw new YarnException("CGroups handler is not initialized");
}
File cpuDir = new File(
cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.CPUACCT),
getCGroupRelativePath(CGroupsHandler.CGroupController.CPUACCT));
File memDir = new File(
cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.MEMORY),
getCGroupRelativePath(CGroupsHandler.CGroupController.MEMORY));
cpuStat = new File(cpuDir, CPU_STAT);
memStat = new File(memDir, MEM_STAT);
memswStat = new File(memDir, MEMSW_STAT);
}
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
/**
* A Cgroup version 2 file-system based Resource calculator without the process tree features.
*
* Warning: this implementation will not work properly when configured
* using the mapreduce.job.process-tree.class job property.
* Theoretically the ResourceCalculatorProcessTree can be configured
* using the mapreduce.job.process-tree.class job property, however it
* has a dependency on an instantiated ResourceHandlerModule,
* which is only initialised in the NodeManager process and not in the containers.
*
* Limitation:
* The ResourceCalculatorProcessTree class can be configured using the
* mapreduce.job.process-tree.class property within a MapReduce job.
* However, it is important to note that instances of ResourceCalculatorProcessTree operate
* within the context of a MapReduce task. This presents a limitation:
* these instances do not have access to the ResourceHandlerModule,
* which is only initialized within the NodeManager process
* and not within individual containers where MapReduce tasks execute.
* As a result, the current implementation of ResourceCalculatorProcessTree is incompatible
* with the mapreduce.job.process-tree.class property. This incompatibility arises
* because the ResourceHandlerModule is essential for managing and monitoring resource usage,
* and without it, the ResourceCalculatorProcessTree cannot function as intended
* within the confines of a MapReduce task. Therefore, any attempts to utilize this class
* through the mapreduce.job.process-tree.class property
* will not succeed under the current architecture.
*/
public class CGroupsV2ResourceCalculator extends AbstractCGroupsResourceCalculator {
private static final Logger LOG = LoggerFactory.getLogger(CGroupsV2ResourceCalculator.class);
/**
* <a href="https://docs.kernel.org/admin-guide/cgroup-v2.html#cpu-interface-files">DOC</a>
*
* ...
* cpu.stat
* A read-only flat-keyed file. This file exists whether the controller is enabled or not.
* It always reports the following three stats:
* - usage_usec
* - user_usec
* - system_usec
* ...
*
*/
private static final String CPU_STAT = "cpu.stat#usage_usec";
/**
* <a href="https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files">DOC</a>
*
* ...
* memory.stat
* A read-only flat-keyed file which exists on non-root cgroups.
* This breaks down the cgroups memory footprint into different types of memory,
* type-specific details, and other information on the state
* and past events of the memory management system.
* All memory amounts are in bytes.
* ...
* anon
* Amount of memory used in anonymous mappings such as brk(), sbrk(), and mmap(MAP_ANONYMOUS)
* ...
*
*/
private static final String MEM_STAT = "memory.stat#anon";
/**
* <a href="https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files">DOC</a>
*
* ...
* memory.swap.current
* A read-only single value file which exists on non-root cgroups.
* The total amount of swap currently being used by the cgroup and its descendants.
* ...
*
*/
private static final String MEMSW_STAT = "memory.swap.current";
public CGroupsV2ResourceCalculator(String pid) {
super(
pid,
Collections.singletonList(CPU_STAT),
MEM_STAT,
MEMSW_STAT
);
}
@Override
protected List<Path> getCGroupFilesToLoadInStats() {
List<Path> result = new ArrayList<>();
try (Stream<Path> cGroupFiles = Files.list(getCGroupPath())){
cGroupFiles.forEach(result::add);
} catch (IOException e) {
LOG.debug("Failed to list cgroup files for pid: " + getPid(), e);
}
LOG.debug("Found cgroup files for pid {} is {}", getPid(), result);
return result;
}
private Path getCGroupPath() throws IOException {
return Paths.get(
getcGroupsHandler().getCGroupV2MountPath(),
StringUtils.substringAfterLast(readLinesFromCGroupFileFromProcDir().get(0), ":")
);
}
}

View File

@ -18,8 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
@ -29,80 +30,68 @@
* it is backward compatible with procfs in terms of virtual memory usage.
*/
public class CombinedResourceCalculator extends ResourceCalculatorProcessTree {
protected static final Logger LOG = LoggerFactory
.getLogger(CombinedResourceCalculator.class);
private ProcfsBasedProcessTree procfs;
private CGroupsResourceCalculator cgroup;
private final List<ResourceCalculatorProcessTree> resourceCalculators;
private final ProcfsBasedProcessTree procfsBasedProcessTree;
public CombinedResourceCalculator(String pid) {
super(pid);
procfs = new ProcfsBasedProcessTree(pid);
cgroup = new CGroupsResourceCalculator(pid);
this.procfsBasedProcessTree = new ProcfsBasedProcessTree(pid);
this.resourceCalculators = Arrays.asList(
new CGroupsV2ResourceCalculator(pid),
new CGroupsResourceCalculator(pid),
procfsBasedProcessTree
);
}
@Override
public void initialize() throws YarnException {
procfs.initialize();
cgroup.initialize();
for (ResourceCalculatorProcessTree calculator : resourceCalculators) {
calculator.initialize();
}
}
@Override
public void updateProcessTree() {
procfs.updateProcessTree();
cgroup.updateProcessTree();
resourceCalculators.stream().parallel()
.forEach(ResourceCalculatorProcessTree::updateProcessTree);
}
@Override
public String getProcessTreeDump() {
return procfs.getProcessTreeDump();
}
@Override
public float getCpuUsagePercent() {
float cgroupUsage = cgroup.getCpuUsagePercent();
if (LOG.isDebugEnabled()) {
float procfsUsage = procfs.getCpuUsagePercent();
LOG.debug("CPU Comparison:" + procfsUsage + " " + cgroupUsage);
LOG.debug("Jiffy Comparison:" +
procfs.getCumulativeCpuTime() + " " +
cgroup.getCumulativeCpuTime());
}
return cgroupUsage;
return procfsBasedProcessTree.getProcessTreeDump();
}
@Override
public boolean checkPidPgrpidForMatch() {
return procfs.checkPidPgrpidForMatch();
}
@Override
public long getCumulativeCpuTime() {
if (LOG.isDebugEnabled()) {
LOG.debug("CPU Comparison:" +
procfs.getCumulativeCpuTime() + " " +
cgroup.getCumulativeCpuTime());
}
return cgroup.getCumulativeCpuTime();
}
@Override
public long getRssMemorySize(int olderThanAge) {
if (LOG.isDebugEnabled()) {
LOG.debug("MEM Comparison:" +
procfs.getRssMemorySize(olderThanAge) + " " +
cgroup.getRssMemorySize(olderThanAge));
}
return cgroup.getRssMemorySize(olderThanAge);
return procfsBasedProcessTree.checkPidPgrpidForMatch();
}
@Override
public long getVirtualMemorySize(int olderThanAge) {
if (LOG.isDebugEnabled()) {
LOG.debug("VMEM Comparison:" +
procfs.getVirtualMemorySize(olderThanAge) + " " +
cgroup.getVirtualMemorySize(olderThanAge));
}
return procfs.getVirtualMemorySize(olderThanAge);
return procfsBasedProcessTree.getVirtualMemorySize(olderThanAge);
}
@Override
public long getRssMemorySize(int olderThanAge) {
return resourceCalculators.stream()
.map(calculator -> calculator.getRssMemorySize(olderThanAge))
.filter(result -> UNAVAILABLE < result)
.findAny().orElse((long) UNAVAILABLE);
}
@Override
public long getCumulativeCpuTime() {
return resourceCalculators.stream()
.map(ResourceCalculatorProcessTree::getCumulativeCpuTime)
.filter(result -> UNAVAILABLE < result)
.findAny().orElse((long) UNAVAILABLE);
}
@Override
public float getCpuUsagePercent() {
return resourceCalculators.stream()
.map(ResourceCalculatorProcessTree::getCpuUsagePercent)
.filter(result -> UNAVAILABLE < result)
.findAny().orElse((float) UNAVAILABLE);
}
}

View File

@ -295,7 +295,7 @@ private boolean isResourceCalculatorAvailable() {
+ "{} is disabled.", this.getClass().getName());
return false;
}
if (getResourceCalculatorProcessTree("0") == null) {
if (getResourceCalculatorProcessTree("1") == null) {
LOG.info("ResourceCalculatorProcessTree is unavailable on this system. "
+ "{} is disabled.", this.getClass().getName());
return false;

View File

@ -18,258 +18,124 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.junit.Assert;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.util.CpuTimeTracker;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit test for CGroupsResourceCalculator.
*/
public class TestCGroupsResourceCalculator {
private ControlledClock clock = new ControlledClock();
private CGroupsHandler cGroupsHandler = mock(CGroupsHandler.class);
private String basePath = "/tmp/" + this.getClass().getName();
private Path root;
public TestCGroupsResourceCalculator() {
when(cGroupsHandler.getRelativePathForCGroup("container_1"))
.thenReturn("/yarn/container_1");
when(cGroupsHandler.getRelativePathForCGroup("")).thenReturn("/yarn/");
@Before
public void before() throws IOException {
root = Files.createTempDirectory("TestCGroupsResourceCalculator");
}
@Test(expected = YarnException.class)
public void testPidNotFound() throws Exception {
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", ".", cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
Assert.assertEquals("Expected exception", null, calculator);
@After
public void after() throws IOException {
FileUtils.deleteDirectory(root.toFile());
}
@Test(expected = YarnException.class)
@Test
public void testNoMemoryCGgroupMount() throws Exception {
File procfs = new File(basePath + "/1234");
Assert.assertTrue("Setup error", procfs.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"7:devices:/yarn/container_1\n" +
"6:cpuacct,cpu:/yarn/container_1\n" +
"5:pids:/yarn/container_1\n", StandardCharsets.UTF_8);
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
Assert.assertEquals("Expected exception", null, calculator);
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
writeToFile("proc/41/cgroup",
"7:devices:/yarn/container_1",
"6:cpuacct,cpu:/yarn/container_1",
"5:pids:/yarn/container_1"
);
CGroupsResourceCalculator calculator = createCalculator();
calculator.updateProcessTree();
assertEquals(-1, calculator.getVirtualMemorySize());
}
@Test
public void testCGgroupNotFound() throws Exception {
File procfs = new File(basePath + "/1234");
Assert.assertTrue("Setup error", procfs.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"7:devices:/yarn/container_1\n" +
"6:cpuacct,cpu:/yarn/container_1\n" +
"5:pids:/yarn/container_1\n" +
"4:memory:/yarn/container_1\n", StandardCharsets.UTF_8);
writeToFile("proc/41/cgroup",
"7:devices:/yarn/container_1",
"6:cpuacct,cpu:/yarn/container_1",
"5:pids:/yarn/container_1",
"4:memory:/yarn/container_1"
);
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
Assert.assertEquals("cgroups should be missing",
(long)ResourceCalculatorProcessTree.UNAVAILABLE,
calculator.getRssMemorySize(0));
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
CGroupsResourceCalculator calculator = createCalculator();
calculator.updateProcessTree();
assertEquals(-1, calculator.getCumulativeCpuTime());
}
@Test
public void testCPUParsing() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctContainerDir =
new File(cgcpuacctDir, "/yarn/container_1");
File procfs = new File(basePath + "/1234");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.CPUACCT)).
thenReturn(cgcpuacctDir.getAbsolutePath());
Assert.assertTrue("Setup error", procfs.mkdirs());
Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"7:devices:/yarn/container_1\n" +
"6:cpuacct,cpu:/yarn/container_1\n" +
"5:pids:/yarn/container_1\n" +
"4:memory:/yarn/container_1\n", StandardCharsets.UTF_8);
FileUtils.writeStringToFile(
new File(cgcpuacctContainerDir, CGroupsResourceCalculator.CPU_STAT),
"Can you handle this?\n" +
"user 5415\n" +
"system 3632", StandardCharsets.UTF_8);
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
Assert.assertEquals("Incorrect CPU usage",
90470,
calculator.getCumulativeCpuTime());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
public void testParsing() throws Exception {
writeToFile("proc/41/cgroup",
"7:devices:/yarn/container_1",
"6:cpuacct,cpu:/yarn/container_1",
"5:pids:/yarn/container_1",
"4:memory:/yarn/container_1"
);
writeToFile("mount/cgroup/yarn/container_1/cpuacct.stat",
"Can you handle this?",
"user 5415",
"system 3632"
);
CGroupsResourceCalculator calculator = createCalculator();
calculator.updateProcessTree();
assertEquals(90470, calculator.getCumulativeCpuTime());
writeToFile("mount/cgroup/yarn/container_1/memory.usage_in_bytes",
"418496512"
);
calculator.updateProcessTree();
assertEquals(418496512, calculator.getRssMemorySize());
assertEquals(-1, calculator.getVirtualMemorySize());
writeToFile("mount/cgroup/yarn/container_1/memory.memsw.usage_in_bytes",
"418496513"
);
calculator.updateProcessTree();
assertEquals(418496512, calculator.getRssMemorySize());
assertEquals(418496513, calculator.getVirtualMemorySize());
}
@Test
public void testMemoryParsing() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctContainerDir =
new File(cgcpuacctDir, "/yarn/container_1");
File cgmemoryDir =
new File(basePath + "/memory");
File cgMemoryContainerDir =
new File(cgmemoryDir, "/yarn/container_1");
File procfs = new File(basePath + "/1234");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.MEMORY)).
thenReturn(cgmemoryDir.getAbsolutePath());
Assert.assertTrue("Setup error", procfs.mkdirs());
Assert.assertTrue("Setup error", cgcpuacctContainerDir.mkdirs());
Assert.assertTrue("Setup error", cgMemoryContainerDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(procfs, CGroupsResourceCalculator.CGROUP),
"6:cpuacct,cpu:/yarn/container_1\n" +
"4:memory:/yarn/container_1\n", StandardCharsets.UTF_8);
FileUtils.writeStringToFile(
new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEM_STAT),
"418496512\n", StandardCharsets.UTF_8);
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
"1234", basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
// Test the case where memsw is not available (Ubuntu)
Assert.assertEquals("Incorrect memory usage",
418496512,
calculator.getRssMemorySize());
Assert.assertEquals("Incorrect swap usage",
(long)ResourceCalculatorProcessTree.UNAVAILABLE,
calculator.getVirtualMemorySize());
// Test the case where memsw is available
FileUtils.writeStringToFile(
new File(cgMemoryContainerDir, CGroupsResourceCalculator.MEMSW_STAT),
"418496513\n", StandardCharsets.UTF_8);
calculator.updateProcessTree();
Assert.assertEquals("Incorrect swap usage",
418496513,
calculator.getVirtualMemorySize());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
private CGroupsResourceCalculator createCalculator() {
CGroupsResourceCalculator calculator = new CGroupsResourceCalculator("41");
calculator.setCpuTimeTracker(mock(CpuTimeTracker.class));
calculator.setcGroupsHandler(mock(CGroupsHandler.class));
when(calculator.getcGroupsHandler().getRelativePathForCGroup("container_1"))
.thenReturn("/yarn/container_1");
when(calculator.getcGroupsHandler().getRelativePathForCGroup(""))
.thenReturn("/yarn/");
when(calculator.getcGroupsHandler().getControllerPath(any()))
.thenReturn(root.resolve("mount/cgroup").toString());
calculator.setProcFs(root.toString() + "/proc/");
calculator.setJiffyLengthMs(10);
return calculator;
}
@Test
public void testCPUParsingRoot() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctRootDir =
new File(cgcpuacctDir, "/yarn");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.CPUACCT)).
thenReturn(cgcpuacctDir.getAbsolutePath());
Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(cgcpuacctRootDir, CGroupsResourceCalculator.CPU_STAT),
"user 5415\n" +
"system 3632", StandardCharsets.UTF_8);
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
null, basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
Assert.assertEquals("Incorrect CPU usage",
90470,
calculator.getCumulativeCpuTime());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
}
@Test
public void testMemoryParsingRoot() throws Exception {
File cgcpuacctDir =
new File(basePath + "/cgcpuacct");
File cgcpuacctRootDir =
new File(cgcpuacctDir, "/yarn");
File cgmemoryDir =
new File(basePath + "/memory");
File cgMemoryRootDir =
new File(cgmemoryDir, "/yarn");
File procfs = new File(basePath + "/1234");
when(cGroupsHandler.getControllerPath(
CGroupsHandler.CGroupController.MEMORY)).
thenReturn(cgmemoryDir.getAbsolutePath());
Assert.assertTrue("Setup error", procfs.mkdirs());
Assert.assertTrue("Setup error", cgcpuacctRootDir.mkdirs());
Assert.assertTrue("Setup error", cgMemoryRootDir.mkdirs());
try {
FileUtils.writeStringToFile(
new File(cgMemoryRootDir, CGroupsResourceCalculator.MEM_STAT),
"418496512\n", StandardCharsets.UTF_8);
CGroupsResourceCalculator calculator =
new CGroupsResourceCalculator(
null, basePath,
cGroupsHandler, clock, 10);
calculator.setCGroupFilePaths();
calculator.updateProcessTree();
// Test the case where memsw is not available (Ubuntu)
Assert.assertEquals("Incorrect memory usage",
418496512,
calculator.getRssMemorySize());
Assert.assertEquals("Incorrect swap usage",
(long)ResourceCalculatorProcessTree.UNAVAILABLE,
calculator.getVirtualMemorySize());
// Test the case where memsw is available
FileUtils.writeStringToFile(
new File(cgMemoryRootDir, CGroupsResourceCalculator.MEMSW_STAT),
"418496513\n", StandardCharsets.UTF_8);
calculator.updateProcessTree();
Assert.assertEquals("Incorrect swap usage",
418496513,
calculator.getVirtualMemorySize());
} finally {
FileUtils.deleteDirectory(new File(basePath));
}
private void writeToFile(String path, String... lines) throws IOException {
FileUtils.writeStringToFile(
root.resolve(path).toFile(),
Arrays.stream(lines).collect(Collectors.joining(System.lineSeparator())),
StandardCharsets.UTF_8);
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.util.CpuTimeTracker;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit test for CGroupsV2ResourceCalculator.
*/
public class TestCGroupsV2ResourceCalculator {
private Path root;
@Before
public void before() throws IOException {
root = Files.createTempDirectory("TestCGroupsV2ResourceCalculator");
}
@After
public void after() throws IOException {
FileUtils.deleteDirectory(root.toFile());
}
@Test
public void testPidNotFound() {
CGroupsV2ResourceCalculator calculator = createCalculator();
calculator.updateProcessTree();
assertEquals(-1, calculator.getRssMemorySize(), 0L);
}
@Test
public void readFiles() throws IOException {
Files.createDirectories(root.resolve("proc/42"));
Files.createDirectories(root.resolve("mount/cgroup2/yarn/container_1"));
writeToFile("proc/42/cgroup",
"0::/container_1");
writeToFile("mount/cgroup2/yarn/container_1/memory.stat",
"anon 22000",
"slab 1774128");
writeToFile("mount/cgroup2/yarn/container_1/memory.swap.current",
"11000");
writeToFile("mount/cgroup2/yarn/container_1/cpu.stat",
"usage_usec 333",
"meaning_of_life 42");
CGroupsV2ResourceCalculator calculator = createCalculator();
when(calculator.getcGroupsHandler().getCGroupV2MountPath())
.thenReturn(root.resolve("mount/cgroup2/yarn").toString());
when(calculator.getcGroupsHandler().getRelativePathForCGroup(eq("/container_1")))
.thenReturn("container_1");
calculator.updateProcessTree();
assertEquals(333000L, calculator.getCumulativeCpuTime(), 0L);
assertEquals(22000L, calculator.getRssMemorySize(), 0L);
assertEquals(11000L, calculator.getVirtualMemorySize(), 0L);
assertEquals(-1L, calculator.getRssMemorySize(2), 0L);
assertEquals(-1L, calculator.getVirtualMemorySize(2), 0L);
}
private CGroupsV2ResourceCalculator createCalculator() {
CGroupsV2ResourceCalculator calculator = new CGroupsV2ResourceCalculator("42");
calculator.setCpuTimeTracker(mock(CpuTimeTracker.class));
calculator.setcGroupsHandler(mock(CGroupsHandler.class));
calculator.setProcFs(root.toString() + "/proc/");
calculator.setJiffyLengthMs(1_000);
return calculator;
}
private void writeToFile(String path, String... lines) throws IOException {
FileUtils.writeStringToFile(
root.resolve(path).toFile(),
Arrays.stream(lines).collect(Collectors.joining(System.lineSeparator())),
StandardCharsets.UTF_8);
}
}

View File

@ -98,7 +98,7 @@ public void testCompareResults()
new ProcfsBasedProcessTree(Long.toString(getPid()));
CGroupsResourceCalculator cgroupsCalculator =
new CGroupsResourceCalculator(Long.toString(getPid()));
cgroupsCalculator.setCGroupFilePaths();
cgroupsCalculator.initialize();
for (int i = 0; i < 5; ++i) {
Thread.sleep(3000);