From 399299104c5837ed0edd42be027bc7dfc45c2bfe Mon Sep 17 00:00:00 2001 From: Benjamin Teke Date: Fri, 26 Apr 2024 15:00:00 +0200 Subject: [PATCH] YARN-11674. Add CPUResourceHandler for cgroup v2. (#6751) --- .../AbstractCGroupsCpuResourceHandler.java | 219 ++++++++++++ .../resources/AbstractCGroupsHandler.java | 3 +- .../CGroupsBlkioResourceHandlerImpl.java | 2 +- .../CGroupsCpuResourceHandlerImpl.java | 226 ++----------- .../linux/resources/CGroupsHandler.java | 3 +- .../CGroupsV2CpuResourceHandlerImpl.java | 105 ++++++ .../util/CgroupsLCEResourcesHandler.java | 2 +- .../TestCGroupsBlkioResourceHandlerImpl.java | 2 +- .../TestCGroupsV2CpuResourceHandlerImpl.java | 313 ++++++++++++++++++ 9 files changed, 677 insertions(+), 198 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java new file mode 100644 index 0000000000..92d33d9722 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsCpuResourceHandler.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +@InterfaceStability.Unstable +@InterfaceAudience.Private +public abstract class AbstractCGroupsCpuResourceHandler implements CpuResourceHandler { + + static final Logger LOG = + LoggerFactory.getLogger(AbstractCGroupsCpuResourceHandler.class); + + protected CGroupsHandler cGroupsHandler; + private boolean strictResourceUsageMode = false; + private float yarnProcessors; + private int nodeVCores; + private static final CGroupsHandler.CGroupController CPU = + CGroupsHandler.CGroupController.CPU; + + @VisibleForTesting + static final int MAX_QUOTA_US = 1000 * 1000; + @VisibleForTesting + static final int MIN_PERIOD_US = 1000; + + AbstractCGroupsCpuResourceHandler(CGroupsHandler cGroupsHandler) { + this.cGroupsHandler = cGroupsHandler; + } + + @Override + public List bootstrap(Configuration conf) + throws ResourceHandlerException { + return bootstrap( + ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf); + } + + @VisibleForTesting + List bootstrap( + ResourceCalculatorPlugin plugin, Configuration conf) + throws ResourceHandlerException { + this.strictResourceUsageMode = conf.getBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, + YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE); + this.cGroupsHandler.initializeCGroupController(CPU); + nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf); + + // cap overall usage to the number of cores allocated to YARN + yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf); + int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf); + boolean existingCpuLimits; + existingCpuLimits = cpuLimitExists( + cGroupsHandler.getPathForCGroup(CPU, "")); + + if (systemProcessors != (int) yarnProcessors) { + LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); + int[] limits = getOverallLimits(yarnProcessors); + updateCgroupMaxCpuLimit("", String.valueOf(limits[1]), String.valueOf(limits[0])); + } else if (existingCpuLimits) { + LOG.info("Removing CPU constraints for YARN containers."); + updateCgroupMaxCpuLimit("", String.valueOf(-1), null); + } + return null; + } + + protected abstract void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period) + throws ResourceHandlerException; + protected abstract boolean cpuLimitExists(String path) throws ResourceHandlerException; + + + @VisibleForTesting + @InterfaceAudience.Private + public static int[] getOverallLimits(float yarnProcessors) { + + int[] ret = new int[2]; + + if (yarnProcessors < 0.01f) { + throw new IllegalArgumentException("Number of processors can't be <= 0."); + } + + int quotaUS = MAX_QUOTA_US; + int periodUS = (int) (MAX_QUOTA_US / yarnProcessors); + if (yarnProcessors < 1.0f) { + periodUS = MAX_QUOTA_US; + quotaUS = (int) (periodUS * yarnProcessors); + if (quotaUS < MIN_PERIOD_US) { + LOG.warn("The quota calculated for the cgroup was too low." + + " The minimum value is " + MIN_PERIOD_US + + ", calculated value is " + quotaUS + + ". Setting quota to minimum value."); + quotaUS = MIN_PERIOD_US; + } + } + + // cfs_period_us can't be less than 1000 microseconds + // if the value of periodUS is less than 1000, we can't really use cgroups + // to limit cpu + if (periodUS < MIN_PERIOD_US) { + LOG.warn("The period calculated for the cgroup was too low." + + " The minimum value is " + MIN_PERIOD_US + + ", calculated value is " + periodUS + + ". Using all available CPU."); + periodUS = MAX_QUOTA_US; + quotaUS = -1; + } + + ret[0] = periodUS; + ret[1] = quotaUS; + return ret; + } + + @Override + public List preStart(Container container) + throws ResourceHandlerException { + String cgroupId = container.getContainerId().toString(); + cGroupsHandler.createCGroup(CPU, cgroupId); + updateContainer(container); + List ret = new ArrayList<>(); + ret.add(new PrivilegedOperation( + PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler + .getPathForCGroupTasks(CPU, cgroupId))); + return ret; + } + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + return null; + } + + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + Resource containerResource = container.getResource(); + String cgroupId = container.getContainerId().toString(); + File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId)); + if (cgroup.exists()) { + try { + int containerVCores = containerResource.getVirtualCores(); + ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); + if (id != null && id.getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + updateCgroupCpuWeight(cgroupId, getOpportunisticCpuWeight()); + } else { + updateCgroupCpuWeight(cgroupId, getCpuWeightByContainerVcores(containerVCores)); + } + if (strictResourceUsageMode) { + if (nodeVCores != containerVCores) { + float containerCPU = + (containerVCores * yarnProcessors) / (float) nodeVCores; + int[] limits = getOverallLimits(containerCPU); + updateCgroupMaxCpuLimit(cgroupId, String.valueOf(limits[1]), String.valueOf(limits[0])); + } + } + } catch (ResourceHandlerException re) { + cGroupsHandler.deleteCGroup(CPU, cgroupId); + LOG.warn("Could not update cgroup for container", re); + throw re; + } + } + return null; + } + + protected abstract int getOpportunisticCpuWeight(); + protected abstract int getCpuWeightByContainerVcores(int containerVcores); + protected abstract void updateCgroupCpuWeight(String cgroupId, int weight) + throws ResourceHandlerException; + + @Override + public List postComplete(ContainerId containerId) + throws ResourceHandlerException { + cGroupsHandler.deleteCGroup(CPU, containerId.toString()); + return null; + } + + @Override public List teardown() + throws ResourceHandlerException { + return null; + } + + @Override + public String toString() { + return AbstractCGroupsCpuResourceHandler.class.getName(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java index 272f04ce77..a8f528a209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/AbstractCGroupsHandler.java @@ -414,9 +414,10 @@ private String getErrorWithDetails( public String createCGroup(CGroupController controller, String cGroupId) throws ResourceHandlerException { String path = getPathForCGroup(controller, cGroupId); + File cgroup = new File(path); LOG.debug("createCgroup: {}", path); - if (!new File(path).mkdir()) { + if (!cgroup.exists() && !cgroup.mkdir()) { throw new ResourceHandlerException("Failed to create cgroup at " + path); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java index 865d2b19fd..b2829ae0f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java @@ -134,7 +134,7 @@ public List preStart(Container container) .createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId); try { cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO, - cgroupId, CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, DEFAULT_WEIGHT); + cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT, DEFAULT_WEIGHT); } catch (ResourceHandlerException re) { cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java index f724b8803d..54686fddb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java @@ -18,28 +18,14 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; -import org.apache.hadoop.classification.VisibleForTesting; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; -import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; -import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.classification.VisibleForTesting; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; /** * An implementation for using CGroups to restrict CPU usage on Linux. The @@ -58,208 +44,62 @@ */ @InterfaceStability.Unstable @InterfaceAudience.Private -public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler { - - static final Logger LOG = - LoggerFactory.getLogger(CGroupsCpuResourceHandlerImpl.class); - - private CGroupsHandler cGroupsHandler; - private boolean strictResourceUsageMode = false; - private float yarnProcessors; - private int nodeVCores; +public class CGroupsCpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler { private static final CGroupsHandler.CGroupController CPU = CGroupsHandler.CGroupController.CPU; - @VisibleForTesting - static final int MAX_QUOTA_US = 1000 * 1000; - @VisibleForTesting - static final int MIN_PERIOD_US = 1000; @VisibleForTesting static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 2; + CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) { - this.cGroupsHandler = cGroupsHandler; + super(cGroupsHandler); } @Override - public List bootstrap(Configuration conf) - throws ResourceHandlerException { - return bootstrap( - ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf); + protected void updateCgroupMaxCpuLimit(String cgroupId, String quota, String period) throws ResourceHandlerException { + if (quota != null) { + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_QUOTA_US, quota); + } + if (period != null) { + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_PERIOD_US, period); + } } - @VisibleForTesting - List bootstrap( - ResourceCalculatorPlugin plugin, Configuration conf) - throws ResourceHandlerException { - this.strictResourceUsageMode = conf.getBoolean( - YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, - YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE); - this.cGroupsHandler.initializeCGroupController(CPU); - nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf); + @Override + protected int getOpportunisticCpuWeight() { + return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC; + } + protected int getCpuWeightByContainerVcores(int containerVCores) { + return containerVCores * CPU_DEFAULT_WEIGHT; + } - // cap overall usage to the number of cores allocated to YARN - yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf); - int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf); - boolean existingCpuLimits; + @Override + protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException { + cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES, + String.valueOf(weight)); + } + + @Override + public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException { try { - existingCpuLimits = - cpuLimitsExist(cGroupsHandler.getPathForCGroup(CPU, "")); - } catch (IOException ie) { - throw new ResourceHandlerException(ie); + return checkCgroupV1CPULimitExists(cgroupPath); + } catch (IOException e) { + throw new ResourceHandlerException("Failed to check CPU limit", e); } - if (systemProcessors != (int) yarnProcessors) { - LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); - int[] limits = getOverallLimits(yarnProcessors); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_PERIOD_US, - String.valueOf(limits[0])); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US, - String.valueOf(limits[1])); - } else if (existingCpuLimits) { - LOG.info("Removing CPU constraints for YARN containers."); - cGroupsHandler - .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US, - String.valueOf(-1)); - } - return null; } @InterfaceAudience.Private - public static boolean cpuLimitsExist(String path) - throws IOException { + public static boolean checkCgroupV1CPULimitExists(String path) throws IOException { File quotaFile = new File(path, CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US); if (quotaFile.exists()) { String contents = FileUtils.readFileToString(quotaFile, StandardCharsets.UTF_8); - int quotaUS = Integer.parseInt(contents.trim()); - if (quotaUS != -1) { - return true; - } + return Integer.parseInt(contents.trim()) != -1; } return false; } - - @VisibleForTesting - @InterfaceAudience.Private - public static int[] getOverallLimits(float yarnProcessors) { - - int[] ret = new int[2]; - - if (yarnProcessors < 0.01f) { - throw new IllegalArgumentException("Number of processors can't be <= 0."); - } - - int quotaUS = MAX_QUOTA_US; - int periodUS = (int) (MAX_QUOTA_US / yarnProcessors); - if (yarnProcessors < 1.0f) { - periodUS = MAX_QUOTA_US; - quotaUS = (int) (periodUS * yarnProcessors); - if (quotaUS < MIN_PERIOD_US) { - LOG.warn("The quota calculated for the cgroup was too low." - + " The minimum value is " + MIN_PERIOD_US - + ", calculated value is " + quotaUS - + ". Setting quota to minimum value."); - quotaUS = MIN_PERIOD_US; - } - } - - // cfs_period_us can't be less than 1000 microseconds - // if the value of periodUS is less than 1000, we can't really use cgroups - // to limit cpu - if (periodUS < MIN_PERIOD_US) { - LOG.warn("The period calculated for the cgroup was too low." - + " The minimum value is " + MIN_PERIOD_US - + ", calculated value is " + periodUS - + ". Using all available CPU."); - periodUS = MAX_QUOTA_US; - quotaUS = -1; - } - - ret[0] = periodUS; - ret[1] = quotaUS; - return ret; - } - - @Override - public List preStart(Container container) - throws ResourceHandlerException { - String cgroupId = container.getContainerId().toString(); - cGroupsHandler.createCGroup(CPU, cgroupId); - updateContainer(container); - List ret = new ArrayList<>(); - ret.add(new PrivilegedOperation( - PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, - PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler - .getPathForCGroupTasks(CPU, cgroupId))); - return ret; - } - - @Override - public List reacquireContainer(ContainerId containerId) - throws ResourceHandlerException { - return null; - } - - @Override - public List updateContainer(Container container) - throws ResourceHandlerException { - Resource containerResource = container.getResource(); - String cgroupId = container.getContainerId().toString(); - File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId)); - if (cgroup.exists()) { - try { - int containerVCores = containerResource.getVirtualCores(); - ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); - if (id != null && id.getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); - } else { - int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(cpuShares)); - } - if (strictResourceUsageMode) { - if (nodeVCores != containerVCores) { - float containerCPU = - (containerVCores * yarnProcessors) / (float) nodeVCores; - int[] limits = getOverallLimits(containerCPU); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0])); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1])); - } - } - } catch (ResourceHandlerException re) { - cGroupsHandler.deleteCGroup(CPU, cgroupId); - LOG.warn("Could not update cgroup for container", re); - throw re; - } - } - return null; - } - - @Override - public List postComplete(ContainerId containerId) - throws ResourceHandlerException { - cGroupsHandler.deleteCGroup(CPU, containerId.toString()); - return null; - } - - @Override public List teardown() - throws ResourceHandlerException { - return null; - } - - @Override - public String toString() { - return CGroupsCpuResourceHandlerImpl.class.getName(); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java index 73e1443a2e..b8b4b2b7e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java @@ -122,11 +122,12 @@ public static Set getValidV2CGroups() { // v2 specific params String CGROUP_CONTROLLERS_FILE = "cgroup.controllers"; String CGROUP_SUBTREE_CONTROL_FILE = "cgroup.subtree_control"; + String CGROUP_CPU_MAX = "max"; // present in v1 and v2 String CGROUP_PROCS_FILE = "cgroup.procs"; String CGROUP_PARAM_CLASSID = "classid"; - String CGROUP_PARAM_BLKIO_WEIGHT = "weight"; + String CGROUP_PARAM_WEIGHT = "weight"; /** * Mounts or initializes a cgroup controller. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java new file mode 100644 index 0000000000..97456ad7d2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsV2CpuResourceHandlerImpl.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * An implementation for using CGroups V2 to restrict CPU usage on Linux. The + * implementation supports 3 different controls - restrict usage of all YARN + * containers, restrict relative usage of individual YARN containers and + * restrict usage of individual YARN containers. Admins can set the overall CPU + * to be used by all YARN containers - this is implemented by setting + * cpu.max to the value desired. If strict resource usage mode is not enabled, + * cpu.weight is set for individual containers - this prevents containers from + * exceeding the overall limit for YARN containers but individual containers + * can use as much of the CPU as available(under the YARN limit). If strict + * resource usage is enabled, then container can only use the percentage of + * CPU allocated to them and this is again implemented using cpu.max. + */ +@InterfaceStability.Unstable +@InterfaceAudience.Private +public class CGroupsV2CpuResourceHandlerImpl extends AbstractCGroupsCpuResourceHandler { + private static final CGroupsHandler.CGroupController CPU = + CGroupsHandler.CGroupController.CPU; + + @VisibleForTesting + static final int CPU_DEFAULT_WEIGHT = 100; // cgroup v2 default + static final int CPU_DEFAULT_WEIGHT_OPPORTUNISTIC = 1; + static final int CPU_MAX_WEIGHT = 10000; + static final String NO_LIMIT = "max"; + + + CGroupsV2CpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) { + super(cGroupsHandler); + } + + @Override + protected void updateCgroupMaxCpuLimit(String cgroupId, String max, String period) + throws ResourceHandlerException { + // The cpu.max file in cgroup v2 is a read-write two value file which exists on + // non-root cgroups. The default is “max 100000”. + // It is the maximum bandwidth limit. It’s in the following format: + // $MAX $PERIOD + // which indicates that the group may consume up to $MAX in each $PERIOD duration. + // “max” for $MAX indicates no limit. If only one number is written, $MAX is updated. + String currentCpuMax = cGroupsHandler.getCGroupParam(CPU, cgroupId, + CGroupsHandler.CGROUP_CPU_MAX); + + if (currentCpuMax == null) { + currentCpuMax = ""; + } + + String[] currentCpuMaxArray = currentCpuMax.split(" "); + String maxToSet = max != null ? max : currentCpuMaxArray[0]; + maxToSet = maxToSet.equals("-1") ? NO_LIMIT : maxToSet; + String periodToSet = period != null ? period : currentCpuMaxArray[1]; + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_MAX, + maxToSet + " " + periodToSet); + } + + @Override + protected int getOpportunisticCpuWeight() { + return CPU_DEFAULT_WEIGHT_OPPORTUNISTIC; + } + protected int getCpuWeightByContainerVcores(int containerVCores) { + return Math.min(containerVCores * CPU_DEFAULT_WEIGHT, CPU_MAX_WEIGHT); + } + + @Override + protected void updateCgroupCpuWeight(String cgroupId, int weight) throws ResourceHandlerException { + cGroupsHandler.updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_PARAM_WEIGHT, + String.valueOf(weight)); + } + + @Override + public boolean cpuLimitExists(String cgroupPath) throws ResourceHandlerException { + String globalCpuMaxLimit = cGroupsHandler.getCGroupParam(CPU, "", + CGroupsHandler.CGROUP_CPU_MAX); + if (globalCpuMaxLimit == null) { + return false; + } + String[] cpuMaxLimitArray = globalCpuMaxLimit.split(" "); + + return !cpuMaxLimitArray[0].equals(NO_LIMIT); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index 037d4cf170..74d6f8a528 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -175,7 +175,7 @@ void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin) String.valueOf(limits[0])); updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1])); - } else if (CGroupsCpuResourceHandlerImpl.cpuLimitsExist( + } else if (CGroupsCpuResourceHandlerImpl.checkCgroupV1CPULimitExists( pathForCgroup(CONTROLLER_CPU, ""))) { LOG.info("Removing CPU constraints for YARN containers."); updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java index e8ec6fa83b..65a5c667b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsBlkioResourceHandlerImpl.java @@ -77,7 +77,7 @@ public void testPreStart() throws Exception { CGroupsHandler.CGroupController.BLKIO, id); verify(mockCGroupsHandler, times(1)).updateCGroupParam( CGroupsHandler.CGroupController.BLKIO, id, - CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, + CGroupsHandler.CGROUP_PARAM_WEIGHT, CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT); Assert.assertNotNull(ret); Assert.assertEquals(1, ret.size()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java new file mode 100644 index 0000000000..1d77d8adc6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsV2CpuResourceHandlerImpl.java @@ -0,0 +1,313 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestCGroupsV2CpuResourceHandlerImpl { + + private CGroupsHandler mockCGroupsHandler; + private CGroupsV2CpuResourceHandlerImpl cGroupsCpuResourceHandler; + private ResourceCalculatorPlugin plugin; + final int numProcessors = 4; + + @Before + public void setup() { + mockCGroupsHandler = mock(CGroupsHandler.class); + when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn("."); + cGroupsCpuResourceHandler = + new CGroupsV2CpuResourceHandlerImpl(mockCGroupsHandler); + + plugin = mock(ResourceCalculatorPlugin.class); + Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); + Mockito.doReturn(numProcessors).when(plugin).getNumCores(); + } + + @Test + public void testBootstrap() throws Exception { + Configuration conf = new YarnConfiguration(); + + List ret = + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + verify(mockCGroupsHandler, times(1)) + .initializeCGroupController(CGroupsHandler.CGroupController.CPU); + verify(mockCGroupsHandler, times(0)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, ""); + Assert.assertNull(ret); + } + + @Test + public void testBootstrapLimits() throws Exception { + Configuration conf = new YarnConfiguration(); + + int cpuPerc = 80; + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + cpuPerc); + int period = (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100) / (cpuPerc + * numProcessors); + String cpuMaxValue = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " + period; + List ret = + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + verify(mockCGroupsHandler, times(1)) + .initializeCGroupController(CGroupsHandler.CGroupController.CPU); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue); + Assert.assertNull(ret); + } + + @Test + public void testBootstrapExistingLimits() throws Exception { + Configuration conf = new YarnConfiguration(); + + when(mockCGroupsHandler + .getCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX)) + .thenReturn("100 100000"); + + List ret = + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + verify(mockCGroupsHandler, times(1)) + .initializeCGroupController(CGroupsHandler.CGroupController.CPU); + verify(mockCGroupsHandler, times(2)) + .getCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, "max 100000"); + Assert.assertNull(ret); + } + + @Test + public void testPreStart() throws Exception { + String id = "container_01_01"; + String path = "test-path/" + id; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 2)); + + List ret = + cGroupsCpuResourceHandler.preStart(mockContainer); + verify(mockCGroupsHandler, times(1)) + .createCGroup(CGroupsHandler.CGroupController.CPU, id); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, String + .valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * 2)); // 2 vcores + + // don't set cpu.max + verify(mockCGroupsHandler, never()) + .updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id), + eq(CGroupsHandler.CGROUP_CPU_MAX), anyString()); + + validatePrivilegedOperationList(ret, path); + } + + @Test + public void testPreStartStrictUsage() throws Exception { + String id = "container_01_01"; + String path = "test-path/" + id; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 1)); + Configuration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, + true); + + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + + int defaultVCores = 8; + float share = (float) numProcessors / (float) defaultVCores; + List ret = + cGroupsCpuResourceHandler.preStart(mockContainer); + + verify(mockCGroupsHandler, times(1)) + .createCGroup(CGroupsHandler.CGroupController.CPU, id); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, + String.valueOf(CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT)); + + // set quota and period + String cpuMaxValue = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share) + + " " + CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US; + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_CPU_MAX, cpuMaxValue); + + validatePrivilegedOperationList(ret, path); + } + + @Test + public void testPreStartRestrictedContainers() throws Exception { + String id = "container_01_01"; + String path = "test-path/" + id; + int defaultVCores = 8; + Configuration conf = new YarnConfiguration(); + conf.setBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, + true); + int cpuPerc = 75; + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + cpuPerc); + + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + + String maxCpuLimit = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US + " " + + CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * 100 / (cpuPerc * numProcessors); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "", + CGroupsHandler.CGROUP_CPU_MAX, maxCpuLimit); + + float yarnCores = (float) (cpuPerc * numProcessors) / 100; + int[] containerVCores = { 2, 4 }; + for (int cVcores : containerVCores) { + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Container mockContainer = mock(Container.class); + when(mockContainer.getContainerId()).thenReturn(mockContainerId); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(1024, cVcores)); + when(mockCGroupsHandler + .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id)) + .thenReturn(path); + + float share = (cVcores * yarnCores) / defaultVCores; + int quotaUS; + int periodUS; + if (share > 1.0f) { + quotaUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US; + periodUS = + (int) ((float) CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US / share); + } else { + quotaUS = (int) (CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US * share); + periodUS = CGroupsV2CpuResourceHandlerImpl.MAX_QUOTA_US; + } + + cGroupsCpuResourceHandler.preStart(mockContainer); + + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf( + CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * cVcores)); + + // set cpu.max + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_CPU_MAX, quotaUS + " " + periodUS); + } + } + + @Test + public void testReacquireContainer() throws Exception { + ContainerId containerIdMock = mock(ContainerId.class); + Assert.assertNull( + cGroupsCpuResourceHandler.reacquireContainer(containerIdMock)); + } + + @Test + public void testPostComplete() throws Exception { + String id = "container_01_01"; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + Assert.assertNull(cGroupsCpuResourceHandler.postComplete(mockContainerId)); + verify(mockCGroupsHandler, times(1)) + .deleteCGroup(CGroupsHandler.CGroupController.CPU, id); + } + + @Test + public void testTeardown() throws Exception { + Assert.assertNull(cGroupsCpuResourceHandler.teardown()); + } + + @Test + public void testOpportunistic() throws Exception { + Configuration conf = new YarnConfiguration(); + + cGroupsCpuResourceHandler.bootstrap(plugin, conf); + + ContainerTokenIdentifier tokenId = mock(ContainerTokenIdentifier.class); + when(tokenId.getExecutionType()).thenReturn(ExecutionType.OPPORTUNISTIC); + Container container = mock(Container.class); + String id = "container_01_01"; + ContainerId mockContainerId = mock(ContainerId.class); + when(mockContainerId.toString()).thenReturn(id); + when(container.getContainerId()).thenReturn(mockContainerId); + when(container.getContainerTokenIdentifier()).thenReturn(tokenId); + when(container.getResource()).thenReturn(Resource.newInstance(1024, 2)); + + cGroupsCpuResourceHandler.preStart(container); + verify(mockCGroupsHandler, times(1)) + .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id, + CGroupsHandler.CGROUP_PARAM_WEIGHT, String.valueOf( + CGroupsV2CpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); + } + + private void validatePrivilegedOperationList(List ops, String path) { + Assert.assertNotNull(ops); + Assert.assertEquals(1, ops.size()); + PrivilegedOperation op = ops.get(0); + Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + op.getOperationType()); + List args = op.getArguments(); + Assert.assertEquals(1, args.size()); + Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path, + args.get(0)); + } +}