From a100be685cc4521e9949589948219231aa5d2733 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 22 Apr 2015 17:26:13 -0700 Subject: [PATCH] YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing network bandwidth traffic originating from YARN containers Contributed by Sidharta Seethana. --- hadoop-yarn-project/CHANGES.txt | 4 + .../hadoop/yarn/conf/YarnConfiguration.java | 38 +- .../nodemanager/LinuxContainerExecutor.java | 100 ++- .../OutboundBandwidthResourceHandler.java | 29 + .../resources/ResourceHandlerModule.java | 128 ++++ .../TrafficControlBandwidthHandlerImpl.java | 281 ++++++++ .../linux/resources/TrafficController.java | 650 ++++++++++++++++++ .../resources/TestResourceHandlerModule.java | 78 +++ ...estTrafficControlBandwidthHandlerImpl.java | 231 +++++++ .../resources/TestTrafficController.java | 327 +++++++++ 10 files changed, 1864 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 975db66d5c..21ef32d5c5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -96,6 +96,10 @@ Release 2.8.0 - UNRELEASED YARN-3225. New parameter of CLI for decommissioning node gracefully in RMAdmin CLI. (Devaraj K via junping_du) + YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing + network bandwidth traffic originating from YARN containers (Sidharta Seethana + via vinodkv) + IMPROVEMENTS YARN-1880. 
Cleanup TestApplicationClientProtocolOnHA diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 253ae08be9..a7f485d01c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -822,7 +822,43 @@ private static void addDeprecatedKeys() { NM_PREFIX + "resource.percentage-physical-cpu-limit"; public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = 100; - + + + public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network."; + + /** This setting controls if resource handling for network bandwidth is enabled **/ + /* Work in progress: This configuration parameter may be changed/removed in the future */ + @Private + public static final String NM_NETWORK_RESOURCE_ENABLED = + NM_NETWORK_RESOURCE_PREFIX + "enabled"; + /** Network as a resource is disabled by default **/ + @Private + public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false; + + /** Specifies the interface to be used for applying network throttling rules **/ + /* Work in progress: This configuration parameter may be changed/removed in the future */ + @Private + public static final String NM_NETWORK_RESOURCE_INTERFACE = + NM_NETWORK_RESOURCE_PREFIX + "interface"; + @Private + public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0"; + + /** Specifies the total available outbound bandwidth on the node **/ + /* Work in progress: This configuration parameter may be changed/removed in the future */ + @Private + public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = + NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit"; + @Private + public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000; + + /** Specifies the total outbound bandwidth available to YARN containers. defaults to + * NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified. 
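+   * For example, setting outbound-bandwidth-mbit to 1000 and
+   * outbound-bandwidth-yarn-mbit to 700 caps YARN containers at 700 mbit in
+   * aggregate, while the remaining 300 mbit stays available to other
+   * traffic on the node.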
+ */ + /* Work in progress: This configuration parameter may be changed/removed in the future */ + @Private + public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT = + NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit"; + /** NM Webapp address.**/ public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; public static final int DEFAULT_NM_WEBAPP_PORT = 8042; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index d6e6894974..f8da958543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -43,7 +43,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -60,6 +66,7 @@ public class LinuxContainerExecutor extends ContainerExecutor { private boolean containerSchedPriorityIsSet = false; private int containerSchedPriorityAdjustment = 0; private boolean containerLimitUsers; + private ResourceHandler resourceHandlerChain; @Override public void setConf(Configuration conf) { @@ -189,7 +196,20 @@ public void init() throws IOException { throw new IOException("Linux container executor not configured properly" + " (error=" + exitCode + ")", e); } - + + try { + Configuration conf = super.getConf(); + + resourceHandlerChain = ResourceHandlerModule + .getConfiguredResourceHandlerChain(conf); + if (resourceHandlerChain != null) { + resourceHandlerChain.bootstrap(conf); + } + } catch (ResourceHandlerException e) { + LOG.error("Failed to bootstrap configured resource subsystems! 
", e); + throw new IOException("Failed to bootstrap configured resource subsystems!"); + } + resourcesHandler.init(this); } @@ -268,6 +288,51 @@ public int launchContainer(Container container, container.getResource()); String resourcesOptions = resourcesHandler.getResourcesOption( containerId); + String tcCommandFile = null; + + try { + if (resourceHandlerChain != null) { + List ops = resourceHandlerChain + .preStart(container); + + if (ops != null) { + List resourceOps = new ArrayList<>(); + + resourceOps.add(new PrivilegedOperation + (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + resourcesOptions)); + + for (PrivilegedOperation op : ops) { + switch (op.getOperationType()) { + case ADD_PID_TO_CGROUP: + resourceOps.add(op); + break; + case TC_MODIFY_STATE: + tcCommandFile = op.getArguments().get(0); + break; + default: + LOG.warn("PrivilegedOperation type unsupported in launch: " + + op.getOperationType()); + } + } + + if (resourceOps.size() > 1) { + //squash resource operations + try { + PrivilegedOperation operation = PrivilegedOperationExecutor + .squashCGroupOperations(resourceOps); + resourcesOptions = operation.getArguments().get(0); + } catch (PrivilegedOperationException e) { + LOG.error("Failed to squash cgroup operations!", e); + throw new ResourceHandlerException("Failed to squash cgroup operations!"); + } + } + } + } + } catch (ResourceHandlerException e) { + LOG.error("ResourceHandlerChain.preStart() failed!", e); + throw new IOException("ResourceHandlerChain.preStart() failed!"); + } ShellCommandExecutor shExec = null; @@ -286,6 +351,11 @@ public int launchContainer(Container container, StringUtils.join(",", localDirs), StringUtils.join(",", logDirs), resourcesOptions)); + + if (tcCommandFile != null) { + command.add(tcCommandFile); + } + String[] commandArray = command.toArray(new String[command.size()]); shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd container.getLaunchContext().getEnvironment()); // sanitized env @@ -334,6 +404,15 @@ public int launchContainer(Container container, return exitCode; } finally { resourcesHandler.postExecute(containerId); + + try { + if (resourceHandlerChain != null) { + resourceHandlerChain.postComplete(containerId); + } + } catch (ResourceHandlerException e) { + LOG.warn("ResourceHandlerChain.postComplete failed for " + + "containerId: " + containerId + ". 
Exception: " + e); + } } if (LOG.isDebugEnabled()) { LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:"); @@ -346,9 +425,28 @@ public int launchContainer(Container container, public int reacquireContainer(String user, ContainerId containerId) throws IOException, InterruptedException { try { + //Resource handler chain needs to reacquire container state + //as well + if (resourceHandlerChain != null) { + try { + resourceHandlerChain.reacquireContainer(containerId); + } catch (ResourceHandlerException e) { + LOG.warn("ResourceHandlerChain.reacquireContainer failed for " + + "containerId: " + containerId + " Exception: " + e); + } + } + return super.reacquireContainer(user, containerId); } finally { resourcesHandler.postExecute(containerId); + if (resourceHandlerChain != null) { + try { + resourceHandlerChain.postComplete(containerId); + } catch (ResourceHandlerException e) { + LOG.warn("ResourceHandlerChain.postComplete failed for " + + "containerId: " + containerId + " Exception: " + e); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java new file mode 100644 index 0000000000..c814c89be1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java @@ -0,0 +1,29 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface OutboundBandwidthResourceHandler extends ResourceHandler { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java new file mode 100644 index 0000000000..30fc951def --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java @@ -0,0 +1,128 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; + +import java.util.ArrayList; +import java.util.List; + +/** + * Provides mechanisms to get various resource handlers - cpu, memory, network, + * disk etc., - based on configuration + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ResourceHandlerModule { + private volatile static ResourceHandlerChain resourceHandlerChain; + + /** + * This specific implementation might provide resource management as well + * as resource metrics functionality. We need to ensure that the same + * instance is used for both. 
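+   * The handler instances below are therefore kept in volatile statics and
+   * created lazily via double-checked locking.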
+ */ + private volatile static TrafficControlBandwidthHandlerImpl + trafficControlBandwidthHandler; + private volatile static CGroupsHandler cGroupsHandler; + + /** + * Returns an initialized, thread-safe CGroupsHandler instance + */ + public static CGroupsHandler getCGroupsHandler(Configuration conf) + throws ResourceHandlerException { + if (cGroupsHandler == null) { + synchronized (CGroupsHandler.class) { + if (cGroupsHandler == null) { + cGroupsHandler = new CGroupsHandlerImpl(conf, + PrivilegedOperationExecutor.getInstance(conf)); + } + } + } + + return cGroupsHandler; + } + + private static TrafficControlBandwidthHandlerImpl + getTrafficControlBandwidthHandler(Configuration conf) + throws ResourceHandlerException { + if (conf.getBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED, + YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_ENABLED)) { + if (trafficControlBandwidthHandler == null) { + synchronized (OutboundBandwidthResourceHandler.class) { + if (trafficControlBandwidthHandler == null) { + trafficControlBandwidthHandler = new + TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor + .getInstance(conf), getCGroupsHandler(conf), + new TrafficController(conf, PrivilegedOperationExecutor + .getInstance(conf))); + } + } + } + + return trafficControlBandwidthHandler; + } else { + return null; + } + } + + public static OutboundBandwidthResourceHandler + getOutboundBandwidthResourceHandler(Configuration conf) + throws ResourceHandlerException { + return getTrafficControlBandwidthHandler(conf); + } + + private static void addHandlerIfNotNull(List handlerList, + ResourceHandler handler) { + if (handler != null) { + handlerList.add(handler); + } + } + + private static void initializeConfiguredResourceHandlerChain( + Configuration conf) throws ResourceHandlerException { + ArrayList handlerList = new ArrayList<>(); + + addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf)); + resourceHandlerChain = new ResourceHandlerChain(handlerList); + } + + public static ResourceHandlerChain getConfiguredResourceHandlerChain + (Configuration conf) throws ResourceHandlerException { + if (resourceHandlerChain == null) { + synchronized (ResourceHandlerModule.class) { + if (resourceHandlerChain == null) { + initializeConfiguredResourceHandlerChain(conf); + } + } + } + + if (resourceHandlerChain.getResourceHandlerList().size() != 0) { + return resourceHandlerChain; + } else { + return null; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java new file mode 100644 index 0000000000..a0327a2730 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java @@ -0,0 +1,281 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.util.SystemClock; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TrafficControlBandwidthHandlerImpl + implements OutboundBandwidthResourceHandler { + + private static final Log LOG = LogFactory + .getLog(TrafficControlBandwidthHandlerImpl.class); + //In the absence of 'scheduling' support, we'll 'infer' the guaranteed + //outbound bandwidth for each container based on this number. This will + //likely go away once we add support on the RM for this resource type. + private static final int MAX_CONTAINER_COUNT = 50; + + private final PrivilegedOperationExecutor privilegedOperationExecutor; + private final CGroupsHandler cGroupsHandler; + private final TrafficController trafficController; + private final ConcurrentHashMap containerIdClassIdMap; + + private Configuration conf; + private String device; + private boolean strictMode; + private int containerBandwidthMbit; + private int rootBandwidthMbit; + private int yarnBandwidthMbit; + + public TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor + privilegedOperationExecutor, CGroupsHandler cGroupsHandler, + TrafficController trafficController) { + this.privilegedOperationExecutor = privilegedOperationExecutor; + this.cGroupsHandler = cGroupsHandler; + this.trafficController = trafficController; + this.containerIdClassIdMap = new ConcurrentHashMap<>(); + } + + /** + * Bootstrapping 'outbound-bandwidth' resource handler - mounts net_cls + * controller and bootstraps a traffic control bandwidth shaping hierarchy + * @param configuration yarn configuration in use + * @return (potentially empty) list of privileged operations to execute. + * @throws ResourceHandlerException + */ + + @Override + public List bootstrap(Configuration configuration) + throws ResourceHandlerException { + conf = configuration; + //We'll do this inline for the time being - since this is a one time + //operation. 
At some point, LCE code can be refactored to batch mount + //operations across multiple controllers - cpu, net_cls, blkio etc + cGroupsHandler + .mountCGroupController(CGroupsHandler.CGroupController.NET_CLS); + device = conf.get(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE, + YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE); + strictMode = configuration.getBoolean(YarnConfiguration + .NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, YarnConfiguration + .DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE); + rootBandwidthMbit = conf.getInt(YarnConfiguration + .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, YarnConfiguration + .DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT); + yarnBandwidthMbit = conf.getInt(YarnConfiguration + .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, rootBandwidthMbit); + containerBandwidthMbit = (int) Math.ceil((double) yarnBandwidthMbit / + MAX_CONTAINER_COUNT); + + StringBuffer logLine = new StringBuffer("strict mode is set to :") + .append(strictMode).append(System.lineSeparator()); + + if (strictMode) { + logLine.append("container bandwidth will be capped to soft limit.") + .append(System.lineSeparator()); + } else { + logLine.append( + "containers will be allowed to use spare YARN bandwidth.") + .append(System.lineSeparator()); + } + + logLine + .append("containerBandwidthMbit soft limit (in mbit/sec) is set to : ") + .append(containerBandwidthMbit); + + LOG.info(logLine); + trafficController.bootstrap(device, rootBandwidthMbit, yarnBandwidthMbit); + + return null; + } + + /** + * Pre-start hook for 'outbound-bandwidth' resource. A cgroup is created + * and a net_cls classid is generated and written to a cgroup file. A + * traffic control shaping rule is created in order to limit outbound + * bandwidth utilization. + * @param container Container being launched + * @return privileged operations for some cgroups/tc operations. + * @throws ResourceHandlerException + */ + @Override + public List preStart(Container container) + throws ResourceHandlerException { + String containerIdStr = container.getContainerId().toString(); + int classId = trafficController.getNextClassId(); + String classIdStr = trafficController.getStringForNetClsClassId(classId); + + cGroupsHandler.createCGroup(CGroupsHandler.CGroupController + .NET_CLS, + containerIdStr); + cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS, + containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr); + containerIdClassIdMap.put(container.getContainerId(), classId); + + //Now create a privileged operation in order to update the tasks file with + //the pid of the running container process (root of process tree). This can + //only be done at the time of launching the container, in a privileged + //executable. + String tasksFile = cGroupsHandler.getPathForCGroupTasks( + CGroupsHandler.CGroupController.NET_CLS, containerIdStr); + String opArg = new StringBuffer(PrivilegedOperation.CGROUP_ARG_PREFIX) + .append(tasksFile).toString(); + List ops = new ArrayList<>(); + + ops.add(new PrivilegedOperation( + PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg)); + + //Create a privileged operation to create a tc rule for this container + //We'll return this to the calling (Linux) Container Executor + //implementation for batching optimizations so that we don't fork/exec + //additional times during container launch. 
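+    //With default settings the batch written to the temp command file holds
+    //a single line of the form (the exact numbers depend on configuration):
+    //  class add dev eth0 parent 42:3 classid 42:<classId> htb rate 20mbit ceil 1000mbit
+    //In strict mode the ceil is lowered to the container rate itself.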
+ TrafficController.BatchBuilder builder = trafficController.new + BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE); + + builder.addContainerClass(classId, containerBandwidthMbit, strictMode); + ops.add(builder.commitBatchToTempFile()); + + return ops; + } + + /** + * Reacquires state for a container - reads the classid from the cgroup + * being used for the container being reacquired + * @param containerId if of the container being reacquired. + * @return (potentially empty) list of privileged operations + * @throws ResourceHandlerException + */ + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + String containerIdStr = containerId.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to reacquire classId for container: " + + containerIdStr); + } + + String classIdStrFromFile = cGroupsHandler.getCGroupParam( + CGroupsHandler.CGroupController.NET_CLS, containerIdStr, + CGroupsHandler.CGROUP_PARAM_CLASSID); + int classId = trafficController + .getClassIdFromFileContents(classIdStrFromFile); + + LOG.info("Reacquired containerId -> classId mapping: " + containerIdStr + + " -> " + classId); + containerIdClassIdMap.put(containerId, classId); + + return null; + } + + /** + * Returns total bytes sent per container to be used for metrics tracking + * purposes. + * @return a map of containerId to bytes sent + * @throws ResourceHandlerException + */ + public Map getBytesSentPerContainer() + throws ResourceHandlerException { + Map classIdStats = trafficController.readStats(); + Map containerIdStats = new HashMap<>(); + + for (Map.Entry entry : containerIdClassIdMap + .entrySet()) { + ContainerId containerId = entry.getKey(); + Integer classId = entry.getValue(); + Integer bytesSent = classIdStats.get(classId); + + if (bytesSent == null) { + LOG.warn("No bytes sent metric found for container: " + containerId + + " with classId: " + classId); + continue; + } + containerIdStats.put(containerId, bytesSent); + } + + return containerIdStats; + } + + /** + * Cleanup operations once container is completed - deletes cgroup and + * removes traffic shaping rule(s). + * @param containerId of the container that was completed. + * @return + * @throws ResourceHandlerException + */ + @Override + public List postComplete(ContainerId containerId) + throws ResourceHandlerException { + LOG.info("postComplete for container: " + containerId.toString()); + cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS, + containerId.toString()); + + Integer classId = containerIdClassIdMap.get(containerId); + + if (classId != null) { + PrivilegedOperation op = trafficController.new + BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE) + .deleteContainerClass(classId).commitBatchToTempFile(); + + try { + privilegedOperationExecutor.executePrivilegedOperation(op, false); + trafficController.releaseClassId(classId); + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to delete tc rule for classId: " + classId); + throw new ResourceHandlerException( + "Failed to delete tc rule for classId:" + classId); + } + } else { + LOG.warn("Not cleaning up tc rules. 
classId unknown for container: " + + containerId.toString()); + } + + return null; + } + + @Override + public List teardown() + throws ResourceHandlerException { + if (LOG.isDebugEnabled()) { + LOG.debug("teardown(): Nothing to do"); + } + + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java new file mode 100644 index 0000000000..e33cea45bf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java @@ -0,0 +1,650 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; + +import java.io.*; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Wrapper around the 'tc' tool. Provides access to a very specific subset of + * the functionality provided by the tc tool. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable class TrafficController { + private static final Log LOG = LogFactory.getLog(TrafficController.class); + private static final int ROOT_QDISC_HANDLE = 42; + private static final int ZERO_CLASS_ID = 0; + private static final int ROOT_CLASS_ID = 1; + /** Traffic shaping class used for all unclassified traffic */ + private static final int DEFAULT_CLASS_ID = 2; + /** Traffic shaping class used for all YARN traffic */ + private static final int YARN_ROOT_CLASS_ID = 3; + /** Classes 0-3 are used already. We need to ensure that container classes + * do not collide with these classids. 
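+   * Container classes therefore start at 4; combined with the fixed qdisc
+   * handle (42) they map to net_cls classid strings of the form 0x0042NNNN.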
+ */ + private static final int MIN_CONTAINER_CLASS_ID = 4; + /** This is the number of distinct (container) traffic shaping classes + * that are supported */ + private static final int MAX_CONTAINER_CLASSES = 1024; + + private static final String MBIT_SUFFIX = "mbit"; + private static final String TMP_FILE_PREFIX = "tc."; + private static final String TMP_FILE_SUFFIX = ".cmds"; + + /** Root queuing discipline attached to the root of the interface */ + private static final String FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT = + "qdisc add dev %s root handle %d: htb default %s"; + /** Specifies a cgroup/classid based filter - based on the classid associated + * with the outbound packet, the corresponding traffic shaping rule is used + * . Please see tc documentation for additional details. + */ + private static final String FORMAT_FILTER_CGROUP_ADD_TO_PARENT = + "filter add dev %s parent %d: protocol ip prio 10 handle 1: cgroup"; + /** Standard format for adding a traffic shaping class to a parent, with + * the specified bandwidth limits + */ + private static final String FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES = + "class add dev %s parent %d:%d classid %d:%d htb rate %s ceil %s"; + /** Standard format to delete a traffic shaping class */ + private static final String FORMAT_DELETE_CLASS = + "class del dev %s classid %d:%d"; + /** Format of the classid that is to be used with the net_cls cgroup. Needs + * to be of the form 0xAAAABBBB */ + private static final String FORMAT_NET_CLS_CLASS_ID = "0x%04d%04d"; + /** Commands to read the qdsic(s)/filter(s)/class(es) associated with an + * interface + */ + private static final String FORMAT_READ_STATE = + "qdisc show dev %1$s%n" + + "filter show dev %1$s%n" + + "class show dev %1$s"; + private static final String FORMAT_READ_CLASSES = "class show dev %s"; + /** Delete a qdisc and all its children - classes/filters etc */ + private static final String FORMAT_WIPE_STATE = + "qdisc del dev %s parent root"; + + private final Configuration conf; + //Used to store the set of classids in use for container classes + private final BitSet classIdSet; + private final PrivilegedOperationExecutor privilegedOperationExecutor; + + private String tmpDirPath; + private String device; + private int rootBandwidthMbit; + private int yarnBandwidthMbit; + private int defaultClassBandwidthMbit; + + TrafficController(Configuration conf, PrivilegedOperationExecutor exec) { + this.conf = conf; + this.classIdSet = new BitSet(MAX_CONTAINER_CLASSES); + this.privilegedOperationExecutor = exec; + } + + /** + * Bootstrap tc configuration + */ + public void bootstrap(String device, int rootBandwidthMbit, int + yarnBandwidthMbit) + throws ResourceHandlerException { + if (device == null) { + throw new ResourceHandlerException("device cannot be null!"); + } + + String tmpDirBase = conf.get("hadoop.tmp.dir"); + if (tmpDirBase == null) { + throw new ResourceHandlerException("hadoop.tmp.dir not set!"); + } + tmpDirPath = tmpDirBase + "/nm-tc-rules"; + + File tmpDir = new File(tmpDirPath); + if (!(tmpDir.exists() || tmpDir.mkdirs())) { + LOG.warn("Unable to create directory: " + tmpDirPath); + throw new ResourceHandlerException("Unable to create directory: " + + tmpDirPath); + } + + this.device = device; + this.rootBandwidthMbit = rootBandwidthMbit; + this.yarnBandwidthMbit = yarnBandwidthMbit; + defaultClassBandwidthMbit = (rootBandwidthMbit - yarnBandwidthMbit) <= 0 + ? 
rootBandwidthMbit : (rootBandwidthMbit - yarnBandwidthMbit); + + boolean recoveryEnabled = conf.getBoolean(YarnConfiguration + .NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); + String state = null; + + if (!recoveryEnabled) { + LOG.info("NM recovery is not enabled. We'll wipe tc state before proceeding."); + } else { + //NM recovery enabled - run a state check + state = readState(); + if (checkIfAlreadyBootstrapped(state)) { + LOG.info("TC configuration is already in place. Not wiping state."); + + //We already have the list of existing container classes, if any + //that were created after bootstrapping + reacquireContainerClasses(state); + return; + } else { + LOG.info("TC configuration is incomplete. Wiping tc state before proceeding"); + } + } + + wipeState(); //start over in case preview bootstrap was incomplete + initializeState(); + } + + private void initializeState() throws ResourceHandlerException { + LOG.info("Initializing tc state."); + + BatchBuilder builder = new BatchBuilder(PrivilegedOperation. + OperationType.TC_MODIFY_STATE) + .addRootQDisc() + .addCGroupFilter() + .addClassToRootQDisc(rootBandwidthMbit) + .addDefaultClass(defaultClassBandwidthMbit, rootBandwidthMbit) + //yarn bandwidth is capped with rate = ceil + .addYARNRootClass(yarnBandwidthMbit, yarnBandwidthMbit); + PrivilegedOperation op = builder.commitBatchToTempFile(); + + try { + privilegedOperationExecutor.executePrivilegedOperation(op, false); + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to bootstrap outbound bandwidth configuration"); + + throw new ResourceHandlerException( + "Failed to bootstrap outbound bandwidth configuration", e); + } + } + + /** + * Function to check if the interface in use has already been fully + * bootstrapped with the required tc configuration + * + * @return boolean indicating the result of the check + */ + private boolean checkIfAlreadyBootstrapped(String state) + throws ResourceHandlerException { + List regexes = new ArrayList<>(); + + //root qdisc + regexes.add(String.format("^qdisc htb %d: root(.)*$", + ROOT_QDISC_HANDLE)); + //cgroup filter + regexes.add(String.format("^filter parent %d: protocol ip " + + "(.)*cgroup(.)*$", ROOT_QDISC_HANDLE)); + //root, default and yarn classes + regexes.add(String.format("^class htb %d:%d root(.)*$", + ROOT_QDISC_HANDLE, ROOT_CLASS_ID)); + regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$", + ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID)); + regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$", + ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE, + ROOT_CLASS_ID)); + + for (String regex : regexes) { + Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE); + + if (pattern.matcher(state).find()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Matched regex: " + regex); + } + } else { + String logLine = new StringBuffer("Failed to match regex: ") + .append(regex).append(" Current state: ").append(state).toString(); + LOG.warn(logLine); + return false; + } + } + + LOG.info("Bootstrap check succeeded"); + + return true; + } + + private String readState() throws ResourceHandlerException { + //Sample state output: + // qdisc htb 42: root refcnt 2 r2q 10 default 2 direct_packets_stat 0 + // filter parent 42: protocol ip pref 10 cgroup handle 0x1 + // + // filter parent 42: protocol ip pref 10 cgroup handle 0x1 + // + // class htb 42:1 root rate 10000Kbit ceil 10000Kbit burst 1600b cburst 1600b + // class htb 42:2 parent 42:1 prio 0 rate 3000Kbit ceil 
10000Kbit burst 1599b cburst 1600b + // class htb 42:3 parent 42:1 prio 0 rate 7000Kbit ceil 7000Kbit burst 1598b cburst 1598b + + BatchBuilder builder = new BatchBuilder(PrivilegedOperation. + OperationType.TC_READ_STATE) + .readState(); + PrivilegedOperation op = builder.commitBatchToTempFile(); + + try { + String output = + privilegedOperationExecutor.executePrivilegedOperation(op, true); + + if (LOG.isDebugEnabled()) { + LOG.debug("TC state: %n" + output); + } + + return output; + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to bootstrap outbound bandwidth rules"); + throw new ResourceHandlerException( + "Failed to bootstrap outbound bandwidth rules", e); + } + } + + private void wipeState() throws ResourceHandlerException { + BatchBuilder builder = new BatchBuilder(PrivilegedOperation. + OperationType.TC_MODIFY_STATE) + .wipeState(); + PrivilegedOperation op = builder.commitBatchToTempFile(); + + try { + LOG.info("Wiping tc state."); + privilegedOperationExecutor.executePrivilegedOperation(op, false); + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to wipe tc state. This could happen if the interface" + + " is already in its default state. Ignoring."); + //Ignoring this exception. This could happen if the interface is already + //in its default state. For this reason we don't throw a + //ResourceHandlerException here. + } + } + + /** + * Parses the current state looks for classids already in use + */ + private void reacquireContainerClasses(String state) { + //At this point we already have already successfully passed + //checkIfAlreadyBootstrapped() - so we know that at least the + //root classes are in place. + String tcClassesStr = state.substring(state.indexOf("class")); + //one class per line - the results of the split will need to trimmed + String[] tcClasses = Pattern.compile("$", Pattern.MULTILINE) + .split(tcClassesStr); + Pattern tcClassPattern = Pattern.compile(String.format( + "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE)); + + synchronized (classIdSet) { + for (String tcClassSplit : tcClasses) { + String tcClass = tcClassSplit.trim(); + + if (!tcClass.isEmpty()) { + Matcher classMatcher = tcClassPattern.matcher(tcClass); + if (classMatcher.matches()) { + int classId = Integer.parseInt(classMatcher.group(1)); + if (classId >= MIN_CONTAINER_CLASS_ID) { + classIdSet.set(classId - MIN_CONTAINER_CLASS_ID); + LOG.info("Reacquired container classid: " + classId); + } + } else { + LOG.warn("Unable to match classid in string:" + tcClass); + } + } + } + } + } + + public Map readStats() throws ResourceHandlerException { + BatchBuilder builder = new BatchBuilder(PrivilegedOperation. 
+ OperationType.TC_READ_STATS) + .readClasses(); + PrivilegedOperation op = builder.commitBatchToTempFile(); + + try { + String output = + privilegedOperationExecutor.executePrivilegedOperation(op, true); + + if (LOG.isDebugEnabled()) { + LOG.debug("TC stats output:" + output); + } + + Map classIdBytesStats = parseStatsString(output); + + if (LOG.isDebugEnabled()) { + LOG.debug("classId -> bytes sent %n" + classIdBytesStats); + } + + return classIdBytesStats; + } catch (PrivilegedOperationException e) { + LOG.warn("Failed to get tc stats"); + throw new ResourceHandlerException("Failed to get tc stats", e); + } + } + + private Map parseStatsString(String stats) { + //Example class stats segment (multiple present in tc output) + // class htb 42:4 parent 42:3 prio 0 rate 1000Kbit ceil 7000Kbit burst1600b cburst 1598b + // Sent 77921300 bytes 52617 pkt (dropped 0, overlimits 0 requeues 0) + // rate 6973Kbit 589pps backlog 0b 39p requeues 0 + // lended: 3753 borrowed: 22514 giants: 0 + // tokens: -122164 ctokens: -52488 + + String[] lines = Pattern.compile("$", Pattern.MULTILINE) + .split(stats); + Pattern tcClassPattern = Pattern.compile(String.format( + "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE)); + Pattern bytesPattern = Pattern.compile("Sent (\\d+) bytes.*"); + + int currentClassId = -1; + Map containerClassIdStats = new HashMap<>(); + + for (String lineSplit : lines) { + String line = lineSplit.trim(); + + if (!line.isEmpty()) { + //Check if we encountered a stats segment for a container class + Matcher classMatcher = tcClassPattern.matcher(line); + if (classMatcher.matches()) { + int classId = Integer.parseInt(classMatcher.group(1)); + if (classId >= MIN_CONTAINER_CLASS_ID) { + currentClassId = classId; + continue; + } + } + + //Check if we encountered a stats line + Matcher bytesMatcher = bytesPattern.matcher(line); + if (bytesMatcher.matches()) { + //we found at least one class segment + if (currentClassId != -1) { + int bytes = Integer.parseInt(bytesMatcher.group(1)); + containerClassIdStats.put(currentClassId, bytes); + } else { + LOG.warn("Matched a 'bytes sent' line outside of a class stats " + + "segment : " + line); + } + continue; + } + + //skip other kinds of non-empty lines - since we aren't interested in + //them. + } + } + + return containerClassIdStats; + } + + /** + * Returns a formatted string for attaching a qdisc to the root of the + * device/interface. Additional qdisc + * parameters can be supplied - for example, the default 'class' to use for + * incoming packets + */ + private String getStringForAddRootQDisc() { + return String.format(FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT, device, + ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID); + } + + /** + * Returns a formatted string for a filter that matches packets based on the + * presence of net_cls classids + */ + private String getStringForaAddCGroupFilter() { + return String.format(FORMAT_FILTER_CGROUP_ADD_TO_PARENT, device, + ROOT_QDISC_HANDLE); + } + + /** + * Get the next available classid. 
This has to be released post container + * complete + */ + public int getNextClassId() throws ResourceHandlerException { + synchronized (classIdSet) { + int index = classIdSet.nextClearBit(0); + if (index >= MAX_CONTAINER_CLASSES) { + throw new ResourceHandlerException("Reached max container classes: " + + MAX_CONTAINER_CLASSES); + } + classIdSet.set(index); + return (index + MIN_CONTAINER_CLASS_ID); + } + } + + public void releaseClassId(int classId) throws ResourceHandlerException { + synchronized (classIdSet) { + int index = classId - MIN_CONTAINER_CLASS_ID; + if (index < 0 || index >= MAX_CONTAINER_CLASSES) { + throw new ResourceHandlerException("Invalid incoming classId: " + + classId); + } + classIdSet.clear(index); + } + } + + /** + * Returns a formatted string representing the given classId including a + * handle + */ + public String getStringForNetClsClassId(int classId) { + return String.format(FORMAT_NET_CLS_CLASS_ID, ROOT_QDISC_HANDLE, classId); + } + + /** + * A value read out of net_cls.classid file is in decimal form. We need to + * convert to 32-bit/8 digit hex, extract the lower 16-bit/four digits + * as an int + */ + public int getClassIdFromFileContents(String input) { + //convert from decimal back to fixed size hex form + //e.g 4325381 -> 00420005 + String classIdStr = String.format("%08x", Integer.parseInt(input)); + + if (LOG.isDebugEnabled()) { + LOG.debug("ClassId hex string : " + classIdStr); + } + + //extract and return 4 digits + //e.g 00420005 -> 0005 + return Integer.parseInt(classIdStr.substring(4)); + } + + /** + * Adds a tc class to qdisc at root + */ + private String getStringForAddClassToRootQDisc(int rateMbit) { + String rateMbitStr = rateMbit + MBIT_SUFFIX; + //example : "class add dev eth0 parent 42:0 classid 42:1 htb rate 1000mbit + // ceil 1000mbit" + return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device, + ROOT_QDISC_HANDLE, ZERO_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID, + rateMbitStr, rateMbitStr); + } + + private String getStringForAddDefaultClass(int rateMbit, int ceilMbit) { + String rateMbitStr = rateMbit + MBIT_SUFFIX; + String ceilMbitStr = ceilMbit + MBIT_SUFFIX; + //example : "class add dev eth0 parent 42:1 classid 42:2 htb rate 300mbit + // ceil 1000mbit" + return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device, + ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID, + rateMbitStr, ceilMbitStr); + } + + private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit) { + String rateMbitStr = rateMbit + MBIT_SUFFIX; + String ceilMbitStr = ceilMbit + MBIT_SUFFIX; + //example : "class add dev eth0 parent 42:1 classid 42:3 htb rate 700mbit + // ceil 1000mbit" + return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device, + ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, + rateMbitStr, ceilMbitStr); + } + + private String getStringForAddContainerClass(int classId, int rateMbit, int + ceilMbit) { + String rateMbitStr = rateMbit + MBIT_SUFFIX; + String ceilMbitStr = ceilMbit + MBIT_SUFFIX; + //example : "class add dev eth0 parent 42:99 classid 42:99 htb rate 50mbit + // ceil 700mbit" + return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device, + ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE, classId, + rateMbitStr, ceilMbitStr); + } + + private String getStringForDeleteContainerClass(int classId) { + //example "class del dev eth0 classid 42:7" + return String.format(FORMAT_DELETE_CLASS, device, ROOT_QDISC_HANDLE, + classId); + } + + private 
String getStringForReadState() { + return String.format(FORMAT_READ_STATE, device); + } + + private String getStringForReadClasses() { + return String.format(FORMAT_READ_CLASSES, device); + } + + private String getStringForWipeState() { + return String.format(FORMAT_WIPE_STATE, device); + } + + public class BatchBuilder { + final PrivilegedOperation operation; + final List commands; + + public BatchBuilder(PrivilegedOperation.OperationType opType) + throws ResourceHandlerException { + switch (opType) { + case TC_MODIFY_STATE: + case TC_READ_STATE: + case TC_READ_STATS: + operation = new PrivilegedOperation(opType, (String) null); + commands = new ArrayList<>(); + break; + default: + throw new ResourceHandlerException("Not a tc operation type : " + + opType); + } + } + + private BatchBuilder addRootQDisc() { + commands.add(getStringForAddRootQDisc()); + return this; + } + + private BatchBuilder addCGroupFilter() { + commands.add(getStringForaAddCGroupFilter()); + return this; + } + + private BatchBuilder addClassToRootQDisc(int rateMbit) { + commands.add(getStringForAddClassToRootQDisc(rateMbit)); + return this; + } + + private BatchBuilder addDefaultClass(int rateMbit, int ceilMbit) { + commands.add(getStringForAddDefaultClass(rateMbit, ceilMbit)); + return this; + } + + private BatchBuilder addYARNRootClass(int rateMbit, int ceilMbit) { + commands.add(getStringForAddYARNRootClass(rateMbit, ceilMbit)); + return this; + } + + public BatchBuilder addContainerClass(int classId, int rateMbit, boolean + strictMode) { + int ceilMbit; + + if (strictMode) { + ceilMbit = rateMbit; + } else { + ceilMbit = yarnBandwidthMbit; + } + + commands.add(getStringForAddContainerClass(classId, rateMbit, ceilMbit)); + return this; + } + + public BatchBuilder deleteContainerClass(int classId) { + commands.add(getStringForDeleteContainerClass(classId)); + return this; + } + + private BatchBuilder readState() { + commands.add(getStringForReadState()); + return this; + } + + //We'll read all classes, but use a different tc operation type + //when reading stats for all these classes. Stats are fetched using a + //different tc cli option (-s). + + private BatchBuilder readClasses() { + //We'll read all classes, but use a different tc operation type + //for reading stats for all these classes. Stats are fetched using a + //different tc cli option (-s). 
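+      //The generated batch file holds a single command: "class show dev <device>"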
+ commands.add(getStringForReadClasses()); + return this; + } + + private BatchBuilder wipeState() { + commands.add(getStringForWipeState()); + return this; + } + + public PrivilegedOperation commitBatchToTempFile() + throws ResourceHandlerException { + try { + File tcCmds = File.createTempFile(TMP_FILE_PREFIX, TMP_FILE_SUFFIX, new + File(tmpDirPath)); + Writer writer = new OutputStreamWriter(new FileOutputStream(tcCmds), + "UTF-8"); + PrintWriter printWriter = new PrintWriter(writer); + + for (String command : commands) { + printWriter.println(command); + } + + printWriter.close(); + operation.appendArgs(tcCmds.getAbsolutePath()); + + return operation; + } catch (IOException e) { + LOG.warn("Failed to create or write to temporary file in dir: " + + tmpDirPath); + throw new ResourceHandlerException( + "Failed to create or write to temporary file in dir: " + + tmpDirPath); + } + } + } //end BatchBuilder +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java new file mode 100644 index 0000000000..939dfe7f72 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java @@ -0,0 +1,78 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class TestResourceHandlerModule { + private static final Log LOG = LogFactory. 
+ getLog(TestResourceHandlerModule.class); + Configuration emptyConf; + Configuration networkEnabledConf; + + @Before + public void setup() { + emptyConf = new YarnConfiguration(); + networkEnabledConf = new YarnConfiguration(); + + networkEnabledConf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED, + true); + //We need to bypass mtab parsing for figuring out cgroups mount locations + networkEnabledConf.setBoolean(YarnConfiguration + .NM_LINUX_CONTAINER_CGROUPS_MOUNT, true); + } + + @Test + public void testOutboundBandwidthHandler() { + try { + //This resourceHandler should be non-null only if network as a resource + //is explicitly enabled + OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule + .getOutboundBandwidthResourceHandler(emptyConf); + Assert.assertNull(resourceHandler); + + //When network as a resource is enabled this should be non-null + resourceHandler = ResourceHandlerModule + .getOutboundBandwidthResourceHandler(networkEnabledConf); + Assert.assertNotNull(resourceHandler); + + //Ensure that outbound bandwidth resource handler is present in the chain + ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule + .getConfiguredResourceHandlerChain(networkEnabledConf); + List resourceHandlers = resourceHandlerChain + .getResourceHandlerList(); + //Exactly one resource handler in chain + Assert.assertEquals(resourceHandlers.size(), 1); + //Same instance is expected to be in the chain. + Assert.assertTrue(resourceHandlers.get(0) == resourceHandler); + } catch (ResourceHandlerException e) { + Assert.fail("Unexpected ResourceHandlerException: " + e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java new file mode 100644 index 0000000000..50ad6b9abd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java @@ -0,0 +1,231 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.util.List;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestTrafficControlBandwidthHandlerImpl {
+  private static final Log LOG =
+      LogFactory.getLog(TestTrafficControlBandwidthHandlerImpl.class);
+  private static final int ROOT_BANDWIDTH_MBIT = 100;
+  private static final int YARN_BANDWIDTH_MBIT = 70;
+  private static final int TEST_CLASSID = 100;
+  private static final String TEST_CLASSID_STR = "42:100";
+  private static final String TEST_CONTAINER_ID_STR = "container_01";
+  private static final String TEST_TASKS_FILE = "testTasksFile";
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+  private CGroupsHandler cGroupsHandlerMock;
+  private TrafficController trafficControllerMock;
+  private Configuration conf;
+  private String tmpPath;
+  private String device;
+  ContainerId containerIdMock;
+  Container containerMock;
+
+  @Before
+  public void setup() {
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    cGroupsHandlerMock = mock(CGroupsHandler.class);
+    trafficControllerMock = mock(TrafficController.class);
+    conf = new YarnConfiguration();
+    tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
+        ('/').append("hadoop.tmp.dir").toString();
+    device = YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE;
+    containerIdMock = mock(ContainerId.class);
+    containerMock = mock(Container.class);
+    when(containerIdMock.toString()).thenReturn(TEST_CONTAINER_ID_STR);
+    //mock returning a mock - an angel died somewhere.
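+    //The handler under test derives the per-container cgroup name from
+    //containerId.toString(), so the container mock must hand back the
+    //containerId mock stubbed above to return TEST_CONTAINER_ID_STR.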
+    when(containerMock.getContainerId()).thenReturn(containerIdMock);
+
+    conf.setInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, ROOT_BANDWIDTH_MBIT);
+    conf.setInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, YARN_BANDWIDTH_MBIT);
+    conf.set("hadoop.tmp.dir", tmpPath);
+    //In these tests, we'll only use TrafficController with recovery disabled
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+  }
+
+  @Test
+  public void testBootstrap() {
+    TrafficControlBandwidthHandlerImpl handlerImpl = new
+        TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+        cGroupsHandlerMock, trafficControllerMock);
+
+    try {
+      handlerImpl.bootstrap(conf);
+      verify(cGroupsHandlerMock).mountCGroupController(
+          eq(CGroupsHandler.CGroupController.NET_CLS));
+      verifyNoMoreInteractions(cGroupsHandlerMock);
+      verify(trafficControllerMock).bootstrap(eq(device),
+          eq(ROOT_BANDWIDTH_MBIT),
+          eq(YARN_BANDWIDTH_MBIT));
+      verifyNoMoreInteractions(trafficControllerMock);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected ResourceHandlerException!");
+    }
+  }
+
+  @Test
+  public void testLifeCycle() {
+    TrafficController trafficControllerSpy = spy(new TrafficController(conf,
+        privilegedOperationExecutorMock));
+    TrafficControlBandwidthHandlerImpl handlerImpl = new
+        TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+        cGroupsHandlerMock, trafficControllerSpy);
+
+    try {
+      handlerImpl.bootstrap(conf);
+      testPreStart(trafficControllerSpy, handlerImpl);
+      testPostComplete(trafficControllerSpy, handlerImpl);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected ResourceHandlerException!");
+    }
+  }
+
+  private void testPreStart(TrafficController trafficControllerSpy,
+      TrafficControlBandwidthHandlerImpl handlerImpl) throws
+      ResourceHandlerException {
+    //This is not the cleanest of solutions - but since we are testing the
+    //preStart/postComplete lifecycle, we don't have a different way of
+    //handling this - we don't keep track of the number of invocations by
+    //a class we are not testing here (TrafficController)
+    //So, we'll reset this mock. This is not a problem with other mocks.
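+    //bootstrap() above already pushed the wipe/init tc commands through the
+    //real TrafficController spy and, with it, the executor mock - resetting
+    //the mock keeps those invocations out of the verifications below.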
+    reset(privilegedOperationExecutorMock);
+
+    doReturn(TEST_CLASSID).when(trafficControllerSpy).getNextClassId();
+    doReturn(TEST_CLASSID_STR).when(trafficControllerSpy)
+        .getStringForNetClsClassId(TEST_CLASSID);
+    when(cGroupsHandlerMock.getPathForCGroupTasks(CGroupsHandler
+        .CGroupController.NET_CLS, TEST_CONTAINER_ID_STR)).thenReturn(
+        TEST_TASKS_FILE);
+
+    List<PrivilegedOperation> ops = handlerImpl.preStart(containerMock);
+
+    //Ensure that the cgroup is created and updated as expected
+    verify(cGroupsHandlerMock).createCGroup(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+    verify(cGroupsHandlerMock).updateCGroupParam(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR),
+        eq(CGroupsHandler.CGROUP_PARAM_CLASSID), eq(TEST_CLASSID_STR));
+
+    //Now check the privileged operations being returned
+    //We expect two operations - one for adding pid to tasks file and another
+    //for a tc modify operation
+    Assert.assertEquals(2, ops.size());
+
+    //Verify that the add pid op is correct
+    PrivilegedOperation addPidOp = ops.get(0);
+    String expectedAddPidOpArg = PrivilegedOperation.CGROUP_ARG_PREFIX +
+        TEST_TASKS_FILE;
+    List<String> addPidOpArgs = addPidOp.getArguments();
+
+    Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        addPidOp.getOperationType());
+    Assert.assertEquals(1, addPidOpArgs.size());
+    Assert.assertEquals(expectedAddPidOpArg, addPidOpArgs.get(0));
+
+    //Verify that the tc modify op is correct
+    PrivilegedOperation tcModifyOp = ops.get(1);
+    List<String> tcModifyOpArgs = tcModifyOp.getArguments();
+
+    Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+        tcModifyOp.getOperationType());
+    Assert.assertEquals(1, tcModifyOpArgs.size());
+    //verify that the tc command file exists
+    Assert.assertTrue(new File(tcModifyOpArgs.get(0)).exists());
+  }
+
+  private void testPostComplete(TrafficController trafficControllerSpy,
+      TrafficControlBandwidthHandlerImpl handlerImpl) throws
+      ResourceHandlerException {
+    //This is not the cleanest of solutions - but since we are testing the
+    //preStart/postComplete lifecycle, we don't have a different way of
+    //handling this - we don't keep track of the number of invocations by
+    //a class we are not testing here (TrafficController)
+    //So, we'll reset this mock. This is not a problem with other mocks.
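+    //postComplete() is expected to delete the container's net_cls cgroup,
+    //execute the tc class removal directly via the privileged operation
+    //executor and return no further operations - that is what we verify below.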
+    reset(privilegedOperationExecutorMock);
+
+    List<PrivilegedOperation> ops = handlerImpl.postComplete(containerIdMock);
+
+    verify(cGroupsHandlerMock).deleteCGroup(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+
+    try {
+      //capture privileged op argument and ensure it is correct
+      ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      verify(privilegedOperationExecutorMock)
+          .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+      List<String> args = opCaptor.getValue().getArguments();
+
+      Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          opCaptor.getValue().getOperationType());
+      Assert.assertEquals(1, args.size());
+      //ensure that tc command file exists
+      Assert.assertTrue(new File(args.get(0)).exists());
+
+      verify(trafficControllerSpy).releaseClassId(TEST_CLASSID);
+    } catch (PrivilegedOperationException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.fail("Unexpected PrivilegedOperationException from mock!");
+    }
+
+    //We don't expect any operations to be returned here
+    Assert.assertNull(ops);
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
new file mode 100644
index 0000000000..7ea7135111
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java
@@ -0,0 +1,327 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestTrafficController {
+  private static final Log LOG = LogFactory.getLog(TestTrafficController.class);
+  private static final int ROOT_BANDWIDTH_MBIT = 100;
+  private static final int YARN_BANDWIDTH_MBIT = 70;
+  private static final int CONTAINER_BANDWIDTH_MBIT = 10;
+
+  //These constants are closely tied to the implementation of TrafficController
+  //and will have to be modified in tandem with any related TrafficController
+  //changes.
+  private static final String DEVICE = "eth0";
+  private static final String WIPE_STATE_CMD = "qdisc del dev eth0 parent root";
+  private static final String ADD_ROOT_QDISC_CMD =
+      "qdisc add dev eth0 root handle 42: htb default 2";
+  private static final String ADD_CGROUP_FILTER_CMD =
+      "filter add dev eth0 parent 42: protocol ip prio 10 handle 1: cgroup";
+  private static final String ADD_ROOT_CLASS_CMD =
+      "class add dev eth0 parent 42:0 classid 42:1 htb rate 100mbit ceil 100mbit";
+  private static final String ADD_DEFAULT_CLASS_CMD =
+      "class add dev eth0 parent 42:1 classid 42:2 htb rate 30mbit ceil 100mbit";
+  private static final String ADD_YARN_CLASS_CMD =
+      "class add dev eth0 parent 42:1 classid 42:3 htb rate 70mbit ceil 70mbit";
+  private static final String DEFAULT_TC_STATE_EXAMPLE =
+      "qdisc pfifo_fast 0: root refcnt 2 bands 3 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1";
+  private static final String READ_QDISC_CMD = "qdisc show dev eth0";
+  private static final String READ_FILTER_CMD = "filter show dev eth0";
+  private static final String READ_CLASS_CMD = "class show dev eth0";
+  private static final int MIN_CONTAINER_CLASS_ID = 4;
+  private static final String FORMAT_CONTAINER_CLASS_STR = "0x0042%04d";
+  private static final String FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE =
+      "class add dev eth0 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit";
+  private static final String FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE =
+      "class del dev eth0 classid 42:%d";
+
+  private static final int TEST_CLASS_ID = 97;
+  //decimal form of 0x00420097 - when reading a classid file, it is read out
+  //as decimal
+  private static final String TEST_CLASS_ID_DECIMAL_STR = "4325527";
+
+  private Configuration conf;
+  private String tmpPath;
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+
+  @Before
+  public void setup() {
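+    //hadoop.tmp.dir is pointed under test.build.data so that the tc batch
+    //command files generated by these tests land in a directory that
+    //teardown() can delete wholesale.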
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    conf = new YarnConfiguration();
+    tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
+        ('/').append("hadoop.tmp.dir").toString();
+
+    conf.set("hadoop.tmp.dir", tmpPath);
+  }
+
+  private void verifyTrafficControlOperation(PrivilegedOperation op,
+      PrivilegedOperation.OperationType expectedOpType,
+      List<String> expectedTcCmds)
+      throws IOException {
+    //Verify that the optype matches
+    Assert.assertEquals(expectedOpType, op.getOperationType());
+
+    List<String> args = op.getArguments();
+
+    //Verify that arg count is always 1 (tc command file) for a tc operation
+    Assert.assertEquals(1, args.size());
+
+    File tcCmdsFile = new File(args.get(0));
+
+    //Verify that command file exists
+    Assert.assertTrue(tcCmdsFile.exists());
+
+    List<String> tcCmds = Files.readAllLines(tcCmdsFile.toPath(),
+        Charset.forName("UTF-8"));
+
+    //Verify that the number of commands is the same as expected and verify
+    //that each command is the same, in sequence
+    Assert.assertEquals(expectedTcCmds.size(), tcCmds.size());
+    for (int i = 0; i < tcCmds.size(); ++i) {
+      Assert.assertEquals(expectedTcCmds.get(i), tcCmds.get(i));
+    }
+  }
+
+  @Test
+  public void testBootstrapRecoveryDisabled() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      //NM_RECOVERY_DISABLED - so we expect two privileged operation executions
+      //one for wiping tc state - a second for initializing state
+      verify(privilegedOperationExecutorMock, times(2))
+          .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+      //Now verify that the two operations were correct
+      List<PrivilegedOperation> ops = opCaptor.getAllValues();
+
+      verifyTrafficControlOperation(ops.get(0),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(WIPE_STATE_CMD));
+
+      verifyTrafficControlOperation(ops.get(1),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+              ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+    } catch (ResourceHandlerException | PrivilegedOperationException |
+        IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: " +
+          e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testBootstrapRecoveryEnabled() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    try {
+      //Return a default tc state when attempting to read state
+      when(privilegedOperationExecutorMock.executePrivilegedOperation(
+          any(PrivilegedOperation.class), eq(true)))
+          .thenReturn(DEFAULT_TC_STATE_EXAMPLE);
+
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      ArgumentCaptor<PrivilegedOperation> readOpCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      //NM_RECOVERY_ENABLED - so we expect three privileged operation executions
+      //1) read tc state 2) wipe tc state 3) init tc state
+      //First, verify read op
+      verify(privilegedOperationExecutorMock, times(1))
+          .executePrivilegedOperation(readOpCaptor.capture(), eq(true));
+      List<PrivilegedOperation> readOps = readOpCaptor.getAllValues();
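+      //The single read op should batch the three "show dev eth0" commands
+      //(qdisc, filter, class) used to snapshot existing tc state before the
+      //wipe and re-initialization verified next.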
+      verifyTrafficControlOperation(readOps.get(0),
+          PrivilegedOperation.OperationType.TC_READ_STATE,
+          Arrays.asList(READ_QDISC_CMD, READ_FILTER_CMD, READ_CLASS_CMD));
+
+      ArgumentCaptor<PrivilegedOperation> writeOpCaptor = ArgumentCaptor
+          .forClass(PrivilegedOperation.class);
+      verify(privilegedOperationExecutorMock, times(2))
+          .executePrivilegedOperation(writeOpCaptor.capture(), eq(false));
+      //Now verify that the two write operations were correct
+      List<PrivilegedOperation> writeOps = writeOpCaptor.getAllValues();
+      verifyTrafficControlOperation(writeOps.get(0),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(WIPE_STATE_CMD));
+
+      verifyTrafficControlOperation(writeOps.get(1),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+              ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+    } catch (ResourceHandlerException | PrivilegedOperationException |
+        IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: " +
+          e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testInvalidBuilder() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      try {
+        //Invalid op type for TC batch builder
+        TrafficController.BatchBuilder invalidBuilder = trafficController.
+            new BatchBuilder(
+            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP);
+        Assert.fail("Invalid builder check failed!");
+      } catch (ResourceHandlerException e) {
+        //expected
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: " +
+          e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testClassIdFileContentParsing() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    //Verify that classid file contents are parsed correctly
+    //This call strips the QDISC prefix and returns the classid associated with
+    //the container
+    int parsedClassId = trafficController.getClassIdFromFileContents
+        (TEST_CLASS_ID_DECIMAL_STR);
+
+    Assert.assertEquals(TEST_CLASS_ID, parsedClassId);
+  }
+
+  @Test
+  public void testContainerOperations() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      int classId = trafficController.getNextClassId();
+
+      Assert.assertTrue(classId >= MIN_CONTAINER_CLASS_ID);
+      Assert.assertEquals(String.format(FORMAT_CONTAINER_CLASS_STR, classId),
+          trafficController.getStringForNetClsClassId(classId));
+
+      //Verify that the operation is set up correctly with strictMode = false
+      TrafficController.BatchBuilder builder = trafficController.
+          new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, false);
+      PrivilegedOperation addClassOp = builder.commitBatchToTempFile();
+
+      String expectedAddClassCmd = String.format
+          (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId, YARN_BANDWIDTH_MBIT);
+      verifyTrafficControlOperation(addClassOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedAddClassCmd));
+
+      //Verify that the operation is set up correctly with strictMode = true
+      TrafficController.BatchBuilder strictModeBuilder = trafficController.
+          new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, true);
+      PrivilegedOperation addClassStrictModeOp = strictModeBuilder
+          .commitBatchToTempFile();
+
+      String expectedAddClassStrictModeCmd = String.format
+          (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId,
+              CONTAINER_BANDWIDTH_MBIT);
+      verifyTrafficControlOperation(addClassStrictModeOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedAddClassStrictModeCmd));
+
+      TrafficController.BatchBuilder deleteBuilder = trafficController.new
+          BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .deleteContainerClass(classId);
+      PrivilegedOperation deleteClassOp = deleteBuilder.commitBatchToTempFile();
+
+      String expectedDeleteClassCmd = String.format
+          (FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE, classId);
+      verifyTrafficControlOperation(deleteClassOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedDeleteClassCmd));
+    } catch (ResourceHandlerException | IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: " +
+          e.getClass().getSimpleName());
+    }
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}
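
The TestTrafficController constants above encode container traffic classes by writing the decimal class id into the low four hex digits under the 0x0042 root handle (FORMAT_CONTAINER_CLASS_STR), while the kernel reports net_cls.classid back as a plain decimal number (TEST_CLASS_ID_DECIMAL_STR). The standalone Java sketch below, which is not part of the patch, illustrates that arithmetic for TEST_CLASS_ID = 97; the class name and the round-trip logic are illustrative assumptions and need not match TrafficController's internal getClassIdFromFileContents implementation.

public class NetClsClassIdSketch {
  public static void main(String[] args) {
    int classId = 97; // TEST_CLASS_ID
    //FORMAT_CONTAINER_CLASS_STR = "0x0042%04d": the decimal class id becomes
    //the low four hex digits under the 0x0042 handle.
    String classIdHexStr = String.format("0x0042%04d", classId); // "0x00420097"
    long classIdFileValue = Long.parseLong(classIdHexStr.substring(2), 16);
    System.out.println(classIdFileValue); // 4325527, i.e. TEST_CLASS_ID_DECIMAL_STR
    //Reading the classid file yields that decimal value; rendering it as hex
    //and parsing the low four digits back as decimal recovers the class id.
    String hex = Long.toHexString(classIdFileValue); // "420097"
    int recovered = Integer.parseInt(hex.substring(hex.length() - 4)); // 97
    System.out.println(recovered);
  }
}

This is also why 0x00420097 equals 4325527: 0x42 shifted into the upper 16 bits contributes 4325376 and 0x97 contributes 151.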