diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java
new file mode 100644
index 0000000000..92d33d9722
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+public abstract class AbstractCGroupsCpuResourceHandler implements CpuResourceHandler {
+
+ static final Logger LOG =
+ LoggerFactory.getLogger(AbstractCGroupsCpuResourceHandler.class);
+
+ protected CGroupsHandler cGroupsHandler;
+ private boolean strictResourceUsageMode = false;
+ private float yarnProcessors;
+ private int nodeVCores;
+ private static final CGroupsHandler.CGroupController CPU =
+ CGroupsHandler.CGroupController.CPU;
+
+ @VisibleForTesting
+ static final int MAX_QUOTA_US = 1000 * 1000;
+ @VisibleForTesting
+ static final int MIN_PERIOD_US = 1000;
+
+ AbstractCGroupsCpuResourceHandler(CGroupsHandler cGroupsHandler) {
+ this.cGroupsHandler = cGroupsHandler;
+ }
+
+ @Override
+ public List bootstrap(Configuration conf)
+ throws ResourceHandlerException {
+ return bootstrap(
+ ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
+ }
+
+ @VisibleForTesting
+ List bootstrap(
+ ResourceCalculatorPlugin plugin, Configuration conf)
+ throws ResourceHandlerException {
+ this.strictResourceUsageMode = conf.getBoolean(
+ YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+ YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
+ this.cGroupsHandler.initializeCGroupController(CPU);
+ nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
+
+ // cap overall usage to the number of cores allocated to YARN
+ yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
+ int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
+ boolean existingCpuLimits;
+ existingCpuLimits = cpuLimitExists(
+ cGroupsHandler.getPathForCGroup(CPU, ""));
+
+ if (systemProcessors != (int) yarnProcessors) {
+ LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
+ int[] limits = getOverallLimits(yarnProcessors);
+ updateCgroupMaxCpuLimit("", String.valueOf(limits[1]), String.valueOf(limits[0]));
+ } else if (existingCpuLimits) {
+ LOG.info("Removing CPU constraints for YARN containers.");
+ updateCgroupMaxCpuLimit("", String.valueOf(-1), null);
+ }
+ return null;
+ }
+
+ protected abstract void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period)
+ throws ResourceHandlerException;
+ protected abstract boolean cpuLimitExists(String path) throws ResourceHandlerException;
+
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ public static int[] getOverallLimits(float yarnProcessors) {
+
+ int[] ret = new int[2];
+
+ if (yarnProcessors < 0.01f) {
+ throw new IllegalArgumentException("Number of processors can't be <= 0.");
+ }
+
+ int quotaUS = MAX_QUOTA_US;
+ int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
+ if (yarnProcessors < 1.0f) {
+ periodUS = MAX_QUOTA_US;
+ quotaUS = (int) (periodUS * yarnProcessors);
+ if (quotaUS < MIN_PERIOD_US) {
+ LOG.warn("The quota calculated for the cgroup was too low."
+ + " The minimum value is " + MIN_PERIOD_US
+ + ", calculated value is " + quotaUS
+ + ". Setting quota to minimum value.");
+ quotaUS = MIN_PERIOD_US;
+ }
+ }
+
+ // cfs_period_us can't be less than 1000 microseconds
+ // if the value of periodUS is less than 1000, we can't really use cgroups
+ // to limit cpu
+ if (periodUS < MIN_PERIOD_US) {
+ LOG.warn("The period calculated for the cgroup was too low."
+ + " The minimum value is " + MIN_PERIOD_US
+ + ", calculated value is " + periodUS
+ + ". Using all available CPU.");
+ periodUS = MAX_QUOTA_US;
+ quotaUS = -1;
+ }
+
+ ret[0] = periodUS;
+ ret[1] = quotaUS;
+ return ret;
+ }
+
+ @Override
+ public List preStart(Container container)
+ throws ResourceHandlerException {
+ String cgroupId = container.getContainerId().toString();
+ cGroupsHandler.createCGroup(CPU, cgroupId);
+ updateContainer(container);
+ List ret = new ArrayList<>();
+ ret.add(new PrivilegedOperation(
+ PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+ PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
+ .getPathForCGroupTasks(CPU, cgroupId)));
+ return ret;
+ }
+
+ @Override
+ public List reacquireContainer(ContainerId containerId)
+ throws ResourceHandlerException {
+ return null;
+ }
+
+ @Override
+ public List updateContainer(Container container)
+ throws ResourceHandlerException {
+ Resource containerResource = container.getResource();
+ String cgroupId = container.getContainerId().toString();
+ File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId));
+ if (cgroup.exists()) {
+ try {
+ int containerVCores = containerResource.getVirtualCores();
+ ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
+ if (id != null && id.getExecutionType() ==
+ ExecutionType.OPPORTUNISTIC) {
+ updateCgroupCpuWeight(cgroupId, getOpportunisticCpuWeight());
+ } else {
+ updateCgroupCpuWeight(cgroupId, getCpuWeightByContainerVcores(containerVCores));
+ }
+ if (strictResourceUsageMode) {
+ if (nodeVCores != containerVCores) {
+ float containerCPU =
+ (containerVCores * yarnProcessors) / (float) nodeVCores;
+ int[] limits = getOverallLimits(containerCPU);
+ updateCgroupMaxCpuLimit(cgroupId, String.valueOf(limits[1]), String.valueOf(limits[0]));
+ }
+ }
+ } catch (ResourceHandlerException re) {
+ cGroupsHandler.deleteCGroup(CPU, cgroupId);
+ LOG.warn("Could not update cgroup for container", re);
+ throw re;
+ }
+ }
+ return null;
+ }
+
+ protected abstract int getOpportunisticCpuWeight();
+ protected abstract int getCpuWeightByContainerVcores(int containerVcores);
+ protected abstract void updateCgroupCpuWeight(String cgroupId, int weight)
+ throws ResourceHandlerException;
+
+ @Override
+ public List postComplete(ContainerId containerId)
+ throws ResourceHandlerException {
+ cGroupsHandler.deleteCGroup(CPU, containerId.toString());
+ return null;
+ }
+
+ @Override public List teardown()
+ throws ResourceHandlerException {
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return AbstractCGroupsCpuResourceHandler.class.getName();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java
index 272f04ce77..a8f528a209 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java
@@ -414,9 +414,10 @@ private String getErrorWithDetails(
public String createCGroup(CGroupController controller, String cGroupId)
throws ResourceHandlerException {
String path = getPathForCGroup(controller, cGroupId);
+ File cgroup = new File(path);
LOG.debug("createCgroup: {}", path);
- if (!new File(path).mkdir()) {
+ if (!cgroup.exists() && !cgroup.mkdir()) {
throw new ResourceHandlerException("Failed to create cgroup at " + path);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java
index 865d2b19fd..b2829ae0f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java
@@ -134,7 +134,7 @@ public List preStart(Container container)
.createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId);
try {
cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO,
- cgroupId, CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, DEFAULT_WEIGHT);
+ cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT, DEFAULT_WEIGHT);
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
cgroupId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java
index f724b8803d..54686fddb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java
@@ -18,28 +18,14 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
-import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
-import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
-import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.classification.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
/**
* An implementation for using CGroups to restrict CPU usage on Linux. The
@@ -58,208 +44,62 @@
*/
@InterfaceStability.Unstable
@InterfaceAudience.Private
-public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler {
-
- static final Logger LOG =
- LoggerFactory.getLogger(CGroupsCpuResourceHandlerImpl.class);
-
- private CGroupsHandler cGroupsHandler;
- private boolean strictResourceUsageMode = false;
- private float yarnProcessors;
- private int nodeVCores;
+public class CGroupsCpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler {
private static final CGroupsHandler.CGroupController CPU =
CGroupsHandler.CGroupController.CPU;
- @VisibleForTesting
- static final int MAX_QUOTA_US = 1000 * 1000;
- @VisibleForTesting
- static final int MIN_PERIOD_US = 1000;
@VisibleForTesting
static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 2;
+
CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
- this.cGroupsHandler = cGroupsHandler;
+ super(cGroupsHandler);
}
@Override
- public List bootstrap(Configuration conf)
- throws ResourceHandlerException {
- return bootstrap(
- ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
+ protected void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period) throws ResourceHandlerException {
+ if (quota != null) {
+ cGroupsHandler
+ .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_QUOTA_US, quota);
+ }
+ if (period != null) {
+ cGroupsHandler
+ .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_PERIOD_US, period);
+ }
}
- @VisibleForTesting
- List bootstrap(
- ResourceCalculatorPlugin plugin, Configuration conf)
- throws ResourceHandlerException {
- this.strictResourceUsageMode = conf.getBoolean(
- YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
- YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
- this.cGroupsHandler.initializeCGroupController(CPU);
- nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
+ @Override
+ protected int getOpportunisticCpuWeight() {
+ return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC;
+ }
+ protected int getCpuWeightByContainerVcores(int containerVCores) {
+ return containerVCores * CPU_DEFAULT_WEIGHT;
+ }
- // cap overall usage to the number of cores allocated to YARN
- yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
- int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
- boolean existingCpuLimits;
+ @Override
+ protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException {
+ cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES,
+ String.valueOf(weight));
+ }
+
+ @Override
+ public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException {
try {
- existingCpuLimits =
- cpuLimitsExist(cGroupsHandler.getPathForCGroup(CPU, ""));
- } catch (IOException ie) {
- throw new ResourceHandlerException(ie);
+ return checkCgroupV1CPULimitExists(cgroupPath);
+ } catch (IOException e) {
+ throw new ResourceHandlerException("Failed to check CPU limit", e);
}
- if (systemProcessors != (int) yarnProcessors) {
- LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
- int[] limits = getOverallLimits(yarnProcessors);
- cGroupsHandler
- .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_PERIOD_US,
- String.valueOf(limits[0]));
- cGroupsHandler
- .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
- String.valueOf(limits[1]));
- } else if (existingCpuLimits) {
- LOG.info("Removing CPU constraints for YARN containers.");
- cGroupsHandler
- .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
- String.valueOf(-1));
- }
- return null;
}
@InterfaceAudience.Private
- public static boolean cpuLimitsExist(String path)
- throws IOException {
+ public static boolean checkCgroupV1CPULimitExists(String path) throws IOException {
File quotaFile = new File(path,
CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US);
if (quotaFile.exists()) {
String contents = FileUtils.readFileToString(quotaFile, StandardCharsets.UTF_8);
- int quotaUS = Integer.parseInt(contents.trim());
- if (quotaUS != -1) {
- return true;
- }
+ return Integer.parseInt(contents.trim()) != -1;
}
return false;
}
-
- @VisibleForTesting
- @InterfaceAudience.Private
- public static int[] getOverallLimits(float yarnProcessors) {
-
- int[] ret = new int[2];
-
- if (yarnProcessors < 0.01f) {
- throw new IllegalArgumentException("Number of processors can't be <= 0.");
- }
-
- int quotaUS = MAX_QUOTA_US;
- int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
- if (yarnProcessors < 1.0f) {
- periodUS = MAX_QUOTA_US;
- quotaUS = (int) (periodUS * yarnProcessors);
- if (quotaUS < MIN_PERIOD_US) {
- LOG.warn("The quota calculated for the cgroup was too low."
- + " The minimum value is " + MIN_PERIOD_US
- + ", calculated value is " + quotaUS
- + ". Setting quota to minimum value.");
- quotaUS = MIN_PERIOD_US;
- }
- }
-
- // cfs_period_us can't be less than 1000 microseconds
- // if the value of periodUS is less than 1000, we can't really use cgroups
- // to limit cpu
- if (periodUS < MIN_PERIOD_US) {
- LOG.warn("The period calculated for the cgroup was too low."
- + " The minimum value is " + MIN_PERIOD_US
- + ", calculated value is " + periodUS
- + ". Using all available CPU.");
- periodUS = MAX_QUOTA_US;
- quotaUS = -1;
- }
-
- ret[0] = periodUS;
- ret[1] = quotaUS;
- return ret;
- }
-
- @Override
- public List preStart(Container container)
- throws ResourceHandlerException {
- String cgroupId = container.getContainerId().toString();
- cGroupsHandler.createCGroup(CPU, cgroupId);
- updateContainer(container);
- List ret = new ArrayList<>();
- ret.add(new PrivilegedOperation(
- PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
- PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
- .getPathForCGroupTasks(CPU, cgroupId)));
- return ret;
- }
-
- @Override
- public List reacquireContainer(ContainerId containerId)
- throws ResourceHandlerException {
- return null;
- }
-
- @Override
- public List updateContainer(Container container)
- throws ResourceHandlerException {
- Resource containerResource = container.getResource();
- String cgroupId = container.getContainerId().toString();
- File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId));
- if (cgroup.exists()) {
- try {
- int containerVCores = containerResource.getVirtualCores();
- ContainerTokenIdentifier id = container.getContainerTokenIdentifier();
- if (id != null && id.getExecutionType() ==
- ExecutionType.OPPORTUNISTIC) {
- cGroupsHandler
- .updateCGroupParam(CPU, cgroupId,
- CGroupsHandler.CGROUP_CPU_SHARES,
- String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC));
- } else {
- int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
- cGroupsHandler
- .updateCGroupParam(CPU, cgroupId,
- CGroupsHandler.CGROUP_CPU_SHARES,
- String.valueOf(cpuShares));
- }
- if (strictResourceUsageMode) {
- if (nodeVCores != containerVCores) {
- float containerCPU =
- (containerVCores * yarnProcessors) / (float) nodeVCores;
- int[] limits = getOverallLimits(containerCPU);
- cGroupsHandler.updateCGroupParam(CPU, cgroupId,
- CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0]));
- cGroupsHandler.updateCGroupParam(CPU, cgroupId,
- CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1]));
- }
- }
- } catch (ResourceHandlerException re) {
- cGroupsHandler.deleteCGroup(CPU, cgroupId);
- LOG.warn("Could not update cgroup for container", re);
- throw re;
- }
- }
- return null;
- }
-
- @Override
- public List postComplete(ContainerId containerId)
- throws ResourceHandlerException {
- cGroupsHandler.deleteCGroup(CPU, containerId.toString());
- return null;
- }
-
- @Override public List teardown()
- throws ResourceHandlerException {
- return null;
- }
-
- @Override
- public String toString() {
- return CGroupsCpuResourceHandlerImpl.class.getName();
- }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
index 73e1443a2e..b8b4b2b7e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
@@ -122,11 +122,12 @@ public static Set getValidV2CGroups() {
// v2 specific params
String CGROUP_CONTROLLERS_FILE = "cgroup.controllers";
String CGROUP_SUBTREE_CONTROL_FILE = "cgroup.subtree_control";
+ String CGROUP_CPU_MAX = "max";
// present in v1 and v2
String CGROUP_PROCS_FILE = "cgroup.procs";
String CGROUP_PARAM_CLASSID = "classid";
- String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
+ String CGROUP_PARAM_WEIGHT = "weight";
/**
* Mounts or initializes a cgroup controller.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java
new file mode 100644
index 0000000000..97456ad7d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * An implementation for using CGroups V2 to restrict CPU usage on Linux. The
+ * implementation supports 3 different controls - restrict usage of all YARN
+ * containers, restrict relative usage of individual YARN containers and
+ * restrict usage of individual YARN containers. Admins can set the overall CPU
+ * to be used by all YARN containers - this is implemented by setting
+ * cpu.max to the value desired. If strict resource usage mode is not enabled,
+ * cpu.weight is set for individual containers - this prevents containers from
+ * exceeding the overall limit for YARN containers but individual containers
+ * can use as much of the CPU as available(under the YARN limit). If strict
+ * resource usage is enabled, then container can only use the percentage of
+ * CPU allocated to them and this is again implemented using cpu.max.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+public class CGroupsV2CpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler {
+ private static final CGroupsHandler.CGroupController CPU =
+ CGroupsHandler.CGroupController.CPU;
+
+ @VisibleForTesting
+ static final int CPU_DEFAULT_WEIGHT = 100; // cgroup v2 default
+ static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 1;
+ static final int CPU_MAX_WEIGHT = 10000;
+ static final String NO_LIMIT = "max";
+
+
+ CGroupsV2CpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
+ super(cGroupsHandler);
+ }
+
+ @Override
+ protected void updateCgroupMaxCpuLimit(String cgroupId, String max, String period)
+ throws ResourceHandlerException {
+ // The cpu.max file in cgroup v2 is a read-write two value file which exists on
+ // non-root cgroups. The default is “max 100000”.
+ // It is the maximum bandwidth limit. It’s in the following format:
+ // $MAX $PERIOD
+ // which indicates that the group may consume up to $MAX in each $PERIOD duration.
+ // “max” for $MAX indicates no limit. If only one number is written, $MAX is updated.
+ String currentCpuMax = cGroupsHandler.getCGroupParam(CPU, cgroupId,
+ CGroupsHandler.CGROUP_CPU_MAX);
+
+ if (currentCpuMax == null) {
+ currentCpuMax = "";
+ }
+
+ String[] currentCpuMaxArray = currentCpuMax.split(" ");
+ String maxToSet = max != null ? max : currentCpuMaxArray[0];
+ maxToSet = maxToSet.equals("-1") ? NO_LIMIT : maxToSet;
+ String periodToSet = period != null ? period : currentCpuMaxArray[1];
+ cGroupsHandler
+ .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_MAX,
+ maxToSet + " " + periodToSet);
+ }
+
+ @Override
+ protected int getOpportunisticCpuWeight() {
+ return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC;
+ }
+ protected int getCpuWeightByContainerVcores(int containerVCores) {
+ return Math.min(containerVCores * CPU_DEFAULT_WEIGHT, CPU_MAX_WEIGHT);
+ }
+
+ @Override
+ protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException {
+ cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT,
+ String.valueOf(weight));
+ }
+
+ @Override
+ public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException {
+ String globalCpuMaxLimit = cGroupsHandler.getCGroupParam(CPU, "",
+ CGroupsHandler.CGROUP_CPU_MAX);
+ if (globalCpuMaxLimit == null) {
+ return false;
+ }
+ String[] cpuMaxLimitArray = globalCpuMaxLimit.split(" ");
+
+ return !cpuMaxLimitArray[0].equals(NO_LIMIT);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
index 037d4cf170..74d6f8a528 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
@@ -175,7 +175,7 @@ void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US,
String.valueOf(limits[1]));
- } else if (CGroupsCpuResourceHandlerImpl.cpuLimitsExist(
+ } else if (CGroupsCpuResourceHandlerImpl.checkCgroupV1CPULimitExists(
pathForCgroup(CONTROLLER_CPU, ""))) {
LOG.info("Removing CPU constraints for YARN containers.");
updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java
index e8ec6fa83b..65a5c667b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java
@@ -77,7 +77,7 @@ public void testPreStart() throws Exception {
CGroupsHandler.CGroupController.BLKIO, id);
verify(mockCGroupsHandler, times(1)).updateCGroupParam(
CGroupsHandler.CGroupController.BLKIO, id,
- CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT,
+ CGroupsHandler.CGROUP_PARAM_WEIGHT,
CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT);
Assert.assertNotNull(ret);
Assert.assertEquals(1, ret.size());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java
new file mode 100644
index 0000000000..1d77d8adc6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java
@@ -0,0 +1,313 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestCGroupsV2CpuResourceHandlerImpl {
+
+ private CGroupsHandler mockCGroupsHandler;
+ private CGroupsV2CpuResourceHandlerImpl cGroupsCpuResourceHandler;
+ private ResourceCalculatorPlugin plugin;
+ final int numProcessors = 4;
+
+ @Before
+ public void setup() {
+ mockCGroupsHandler = mock(CGroupsHandler.class);
+ when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn(".");
+ cGroupsCpuResourceHandler =
+ new CGroupsV2CpuResourceHandlerImpl(mockCGroupsHandler);
+
+ plugin = mock(ResourceCalculatorPlugin.class);
+ Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
+ Mockito.doReturn(numProcessors).when(plugin).getNumCores();
+ }
+
+ @Test
+ public void testBootstrap() throws Exception {
+ Configuration conf = new YarnConfiguration();
+
+ List ret =
+ cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+ verify(mockCGroupsHandler, times(1))
+ .initializeCGroupController(CGroupsHandler.CGroupController.CPU);
+ verify(mockCGroupsHandler, times(0))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+ CGroupsHandler.CGROUP_CPU_MAX, "");
+ Assert.assertNull(ret);
+ }
+
+ @Test
+ public void testBootstrapLimits() throws Exception {
+ Configuration conf = new YarnConfiguration();
+
+ int cpuPerc = 80;
+ conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
+ cpuPerc);
+ int period = (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100) / (cpuPerc
+ * numProcessors);
+ String cpuMaxValue = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " + period;
+ List ret =
+ cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+ verify(mockCGroupsHandler, times(1))
+ .initializeCGroupController(CGroupsHandler.CGroupController.CPU);
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+ CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue);
+ Assert.assertNull(ret);
+ }
+
+ @Test
+ public void testBootstrapExistingLimits() throws Exception {
+ Configuration conf = new YarnConfiguration();
+
+ when(mockCGroupsHandler
+ .getCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+ CGroupsHandler.CGROUP_CPU_MAX))
+ .thenReturn("100 100000");
+
+ List ret =
+ cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+ verify(mockCGroupsHandler, times(1))
+ .initializeCGroupController(CGroupsHandler.CGroupController.CPU);
+ verify(mockCGroupsHandler, times(2))
+ .getCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+ CGroupsHandler.CGROUP_CPU_MAX);
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+ CGroupsHandler.CGROUP_CPU_MAX, "max 100000");
+ Assert.assertNull(ret);
+ }
+
+ @Test
+ public void testPreStart() throws Exception {
+ String id = "container_01_01";
+ String path = "test-path/" + id;
+ ContainerId mockContainerId = mock(ContainerId.class);
+ when(mockContainerId.toString()).thenReturn(id);
+ Container mockContainer = mock(Container.class);
+ when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+ when(mockCGroupsHandler
+ .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+ .thenReturn(path);
+ when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 2));
+
+ List ret =
+ cGroupsCpuResourceHandler.preStart(mockContainer);
+ verify(mockCGroupsHandler, times(1))
+ .createCGroup(CGroupsHandler.CGroupController.CPU, id);
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+ CGroupsHandler.CGROUP_PARAM_WEIGHT, String
+ .valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * 2)); // 2 vcores
+
+ // don't set cpu.max
+ verify(mockCGroupsHandler, never())
+ .updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id),
+ eq(CGroupsHandler.CGROUP_CPU_MAX), anyString());
+
+ validatePrivilegedOperationList(ret, path);
+ }
+
+ @Test
+ public void testPreStartStrictUsage() throws Exception {
+ String id = "container_01_01";
+ String path = "test-path/" + id;
+ ContainerId mockContainerId = mock(ContainerId.class);
+ when(mockContainerId.toString()).thenReturn(id);
+ Container mockContainer = mock(Container.class);
+ when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+ when(mockCGroupsHandler
+ .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+ .thenReturn(path);
+ when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 1));
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(
+ YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+ true);
+
+ cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+
+ int defaultVCores = 8;
+ float share = (float) numProcessors / (float) defaultVCores;
+ List ret =
+ cGroupsCpuResourceHandler.preStart(mockContainer);
+
+ verify(mockCGroupsHandler, times(1))
+ .createCGroup(CGroupsHandler.CGroupController.CPU, id);
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+ CGroupsHandler.CGROUP_PARAM_WEIGHT,
+ String.valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT));
+
+ // set quota and period
+ String cpuMaxValue = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share) +
+ " " + CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US;
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+ CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue);
+
+ validatePrivilegedOperationList(ret, path);
+ }
+
+ @Test
+ public void testPreStartRestrictedContainers() throws Exception {
+ String id = "container_01_01";
+ String path = "test-path/" + id;
+ int defaultVCores = 8;
+ Configuration conf = new YarnConfiguration();
+ conf.setBoolean(
+ YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+ true);
+ int cpuPerc = 75;
+ conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
+ cpuPerc);
+
+ cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+
+ String maxCpuLimit = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " +
+ CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100 / (cpuPerc * numProcessors);
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+ CGroupsHandler.CGROUP_CPU_MAX, maxCpuLimit);
+
+ float yarnCores = (float) (cpuPerc * numProcessors) / 100;
+ int[] containerVCores = { 2, 4 };
+ for (int cVcores : containerVCores) {
+ ContainerId mockContainerId = mock(ContainerId.class);
+ when(mockContainerId.toString()).thenReturn(id);
+ Container mockContainer = mock(Container.class);
+ when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+ when(mockCGroupsHandler
+ .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+ .thenReturn(path);
+ when(mockContainer.getResource())
+ .thenReturn(Resource.newInstance(1024, cVcores));
+ when(mockCGroupsHandler
+ .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+ .thenReturn(path);
+
+ float share = (cVcores * yarnCores) / defaultVCores;
+ int quotaUS;
+ int periodUS;
+ if (share > 1.0f) {
+ quotaUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US;
+ periodUS =
+ (int) ((float) CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US / share);
+ } else {
+ quotaUS = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share);
+ periodUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US;
+ }
+
+ cGroupsCpuResourceHandler.preStart(mockContainer);
+
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+ CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf(
+ CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * cVcores));
+
+ // set cpu.max
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+ CGroupsHandler.CGROUP_CPU_MAX, quotaUS + " " + periodUS);
+ }
+ }
+
+ @Test
+ public void testReacquireContainer() throws Exception {
+ ContainerId containerIdMock = mock(ContainerId.class);
+ Assert.assertNull(
+ cGroupsCpuResourceHandler.reacquireContainer(containerIdMock));
+ }
+
+ @Test
+ public void testPostComplete() throws Exception {
+ String id = "container_01_01";
+ ContainerId mockContainerId = mock(ContainerId.class);
+ when(mockContainerId.toString()).thenReturn(id);
+ Assert.assertNull(cGroupsCpuResourceHandler.postComplete(mockContainerId));
+ verify(mockCGroupsHandler, times(1))
+ .deleteCGroup(CGroupsHandler.CGroupController.CPU, id);
+ }
+
+ @Test
+ public void testTeardown() throws Exception {
+ Assert.assertNull(cGroupsCpuResourceHandler.teardown());
+ }
+
+ @Test
+ public void testOpportunistic() throws Exception {
+ Configuration conf = new YarnConfiguration();
+
+ cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+
+ ContainerTokenIdentifier tokenId = mock(ContainerTokenIdentifier.class);
+ when(tokenId.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC);
+ Container container = mock(Container.class);
+ String id = "container_01_01";
+ ContainerId mockContainerId = mock(ContainerId.class);
+ when(mockContainerId.toString()).thenReturn(id);
+ when(container.getContainerId()).thenReturn(mockContainerId);
+ when(container.getContainerTokenIdentifier()).thenReturn(tokenId);
+ when(container.getResource()).thenReturn(Resource.newInstance(1024, 2));
+
+ cGroupsCpuResourceHandler.preStart(container);
+ verify(mockCGroupsHandler, times(1))
+ .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+ CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf(
+ CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT_OPPORTUNISTIC));
+ }
+
+ private void validatePrivilegedOperationList(List ops, String path) {
+ Assert.assertNotNull(ops);
+ Assert.assertEquals(1, ops.size());
+ PrivilegedOperation op = ops.get(0);
+ Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+ op.getOperationType());
+ List args = op.getArguments();
+ Assert.assertEquals(1, args.size());
+ Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
+ args.get(0));
+ }
+}