YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing network bandwidth traffic originating from YARN containers Contributed by Sidharta Seethana.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-04-22 17:26:13 -07:00
parent 0ebe84d30a
commit a100be685c
10 changed files with 1864 additions and 2 deletions

View File

@ -96,6 +96,10 @@ Release 2.8.0 - UNRELEASED
YARN-3225. New parameter of CLI for decommissioning node gracefully in YARN-3225. New parameter of CLI for decommissioning node gracefully in
RMAdmin CLI. (Devaraj K via junping_du) RMAdmin CLI. (Devaraj K via junping_du)
YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing
network bandwidth traffic originating from YARN containers (Sidharta Seethana
via vinodkv)
IMPROVEMENTS IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -822,7 +822,43 @@ private static void addDeprecatedKeys() {
NM_PREFIX + "resource.percentage-physical-cpu-limit"; NM_PREFIX + "resource.percentage-physical-cpu-limit";
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100; 100;
public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network.";
/** This setting controls if resource handling for network bandwidth is enabled **/
/* Work in progress: This configuration parameter may be changed/removed in the future */
@Private
public static final String NM_NETWORK_RESOURCE_ENABLED =
NM_NETWORK_RESOURCE_PREFIX + "enabled";
/** Network as a resource is disabled by default **/
@Private
public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false;
/** Specifies the interface to be used for applying network throttling rules **/
/* Work in progress: This configuration parameter may be changed/removed in the future */
@Private
public static final String NM_NETWORK_RESOURCE_INTERFACE =
NM_NETWORK_RESOURCE_PREFIX + "interface";
@Private
public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0";
/** Specifies the total available outbound bandwidth on the node **/
/* Work in progress: This configuration parameter may be changed/removed in the future */
@Private
public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit";
@Private
public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000;
/** Specifies the total outbound bandwidth available to YARN containers. defaults to
* NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
*/
/* Work in progress: This configuration parameter may be changed/removed in the future */
@Private
public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";
/** NM Webapp address.**/ /** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
public static final int DEFAULT_NM_WEBAPP_PORT = 8042; public static final int DEFAULT_NM_WEBAPP_PORT = 8042;

View File

@ -43,7 +43,13 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
@ -60,6 +66,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
private boolean containerSchedPriorityIsSet = false; private boolean containerSchedPriorityIsSet = false;
private int containerSchedPriorityAdjustment = 0; private int containerSchedPriorityAdjustment = 0;
private boolean containerLimitUsers; private boolean containerLimitUsers;
private ResourceHandler resourceHandlerChain;
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
@ -189,7 +196,20 @@ public void init() throws IOException {
throw new IOException("Linux container executor not configured properly" throw new IOException("Linux container executor not configured properly"
+ " (error=" + exitCode + ")", e); + " (error=" + exitCode + ")", e);
} }
try {
Configuration conf = super.getConf();
resourceHandlerChain = ResourceHandlerModule
.getConfiguredResourceHandlerChain(conf);
if (resourceHandlerChain != null) {
resourceHandlerChain.bootstrap(conf);
}
} catch (ResourceHandlerException e) {
LOG.error("Failed to bootstrap configured resource subsystems! ", e);
throw new IOException("Failed to bootstrap configured resource subsystems!");
}
resourcesHandler.init(this); resourcesHandler.init(this);
} }
@ -268,6 +288,51 @@ public int launchContainer(Container container,
container.getResource()); container.getResource());
String resourcesOptions = resourcesHandler.getResourcesOption( String resourcesOptions = resourcesHandler.getResourcesOption(
containerId); containerId);
String tcCommandFile = null;
try {
if (resourceHandlerChain != null) {
List<PrivilegedOperation> ops = resourceHandlerChain
.preStart(container);
if (ops != null) {
List<PrivilegedOperation> resourceOps = new ArrayList<>();
resourceOps.add(new PrivilegedOperation
(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
resourcesOptions));
for (PrivilegedOperation op : ops) {
switch (op.getOperationType()) {
case ADD_PID_TO_CGROUP:
resourceOps.add(op);
break;
case TC_MODIFY_STATE:
tcCommandFile = op.getArguments().get(0);
break;
default:
LOG.warn("PrivilegedOperation type unsupported in launch: "
+ op.getOperationType());
}
}
if (resourceOps.size() > 1) {
//squash resource operations
try {
PrivilegedOperation operation = PrivilegedOperationExecutor
.squashCGroupOperations(resourceOps);
resourcesOptions = operation.getArguments().get(0);
} catch (PrivilegedOperationException e) {
LOG.error("Failed to squash cgroup operations!", e);
throw new ResourceHandlerException("Failed to squash cgroup operations!");
}
}
}
}
} catch (ResourceHandlerException e) {
LOG.error("ResourceHandlerChain.preStart() failed!", e);
throw new IOException("ResourceHandlerChain.preStart() failed!");
}
ShellCommandExecutor shExec = null; ShellCommandExecutor shExec = null;
@ -286,6 +351,11 @@ public int launchContainer(Container container,
StringUtils.join(",", localDirs), StringUtils.join(",", localDirs),
StringUtils.join(",", logDirs), StringUtils.join(",", logDirs),
resourcesOptions)); resourcesOptions));
if (tcCommandFile != null) {
command.add(tcCommandFile);
}
String[] commandArray = command.toArray(new String[command.size()]); String[] commandArray = command.toArray(new String[command.size()]);
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
container.getLaunchContext().getEnvironment()); // sanitized env container.getLaunchContext().getEnvironment()); // sanitized env
@ -334,6 +404,15 @@ public int launchContainer(Container container,
return exitCode; return exitCode;
} finally { } finally {
resourcesHandler.postExecute(containerId); resourcesHandler.postExecute(containerId);
try {
if (resourceHandlerChain != null) {
resourceHandlerChain.postComplete(containerId);
}
} catch (ResourceHandlerException e) {
LOG.warn("ResourceHandlerChain.postComplete failed for " +
"containerId: " + containerId + ". Exception: " + e);
}
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:"); LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
@ -346,9 +425,28 @@ public int launchContainer(Container container,
public int reacquireContainer(String user, ContainerId containerId) public int reacquireContainer(String user, ContainerId containerId)
throws IOException, InterruptedException { throws IOException, InterruptedException {
try { try {
//Resource handler chain needs to reacquire container state
//as well
if (resourceHandlerChain != null) {
try {
resourceHandlerChain.reacquireContainer(containerId);
} catch (ResourceHandlerException e) {
LOG.warn("ResourceHandlerChain.reacquireContainer failed for " +
"containerId: " + containerId + " Exception: " + e);
}
}
return super.reacquireContainer(user, containerId); return super.reacquireContainer(user, containerId);
} finally { } finally {
resourcesHandler.postExecute(containerId); resourcesHandler.postExecute(containerId);
if (resourceHandlerChain != null) {
try {
resourceHandlerChain.postComplete(containerId);
} catch (ResourceHandlerException e) {
LOG.warn("ResourceHandlerChain.postComplete failed for " +
"containerId: " + containerId + " Exception: " + e);
}
}
} }
} }

View File

@ -0,0 +1,29 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface OutboundBandwidthResourceHandler extends ResourceHandler {
}

View File

@ -0,0 +1,128 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import java.util.ArrayList;
import java.util.List;
/**
* Provides mechanisms to get various resource handlers - cpu, memory, network,
* disk etc., - based on configuration
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ResourceHandlerModule {
private volatile static ResourceHandlerChain resourceHandlerChain;
/**
* This specific implementation might provide resource management as well
* as resource metrics functionality. We need to ensure that the same
* instance is used for both.
*/
private volatile static TrafficControlBandwidthHandlerImpl
trafficControlBandwidthHandler;
private volatile static CGroupsHandler cGroupsHandler;
/**
* Returns an initialized, thread-safe CGroupsHandler instance
*/
public static CGroupsHandler getCGroupsHandler(Configuration conf)
throws ResourceHandlerException {
if (cGroupsHandler == null) {
synchronized (CGroupsHandler.class) {
if (cGroupsHandler == null) {
cGroupsHandler = new CGroupsHandlerImpl(conf,
PrivilegedOperationExecutor.getInstance(conf));
}
}
}
return cGroupsHandler;
}
private static TrafficControlBandwidthHandlerImpl
getTrafficControlBandwidthHandler(Configuration conf)
throws ResourceHandlerException {
if (conf.getBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_ENABLED)) {
if (trafficControlBandwidthHandler == null) {
synchronized (OutboundBandwidthResourceHandler.class) {
if (trafficControlBandwidthHandler == null) {
trafficControlBandwidthHandler = new
TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
.getInstance(conf), getCGroupsHandler(conf),
new TrafficController(conf, PrivilegedOperationExecutor
.getInstance(conf)));
}
}
}
return trafficControlBandwidthHandler;
} else {
return null;
}
}
public static OutboundBandwidthResourceHandler
getOutboundBandwidthResourceHandler(Configuration conf)
throws ResourceHandlerException {
return getTrafficControlBandwidthHandler(conf);
}
private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
ResourceHandler handler) {
if (handler != null) {
handlerList.add(handler);
}
}
private static void initializeConfiguredResourceHandlerChain(
Configuration conf) throws ResourceHandlerException {
ArrayList<ResourceHandler> handlerList = new ArrayList<>();
addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}
public static ResourceHandlerChain getConfiguredResourceHandlerChain
(Configuration conf) throws ResourceHandlerException {
if (resourceHandlerChain == null) {
synchronized (ResourceHandlerModule.class) {
if (resourceHandlerChain == null) {
initializeConfiguredResourceHandlerChain(conf);
}
}
}
if (resourceHandlerChain.getResourceHandlerList().size() != 0) {
return resourceHandlerChain;
} else {
return null;
}
}
}

View File

@ -0,0 +1,281 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.util.SystemClock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TrafficControlBandwidthHandlerImpl
implements OutboundBandwidthResourceHandler {
private static final Log LOG = LogFactory
.getLog(TrafficControlBandwidthHandlerImpl.class);
//In the absence of 'scheduling' support, we'll 'infer' the guaranteed
//outbound bandwidth for each container based on this number. This will
//likely go away once we add support on the RM for this resource type.
private static final int MAX_CONTAINER_COUNT = 50;
private final PrivilegedOperationExecutor privilegedOperationExecutor;
private final CGroupsHandler cGroupsHandler;
private final TrafficController trafficController;
private final ConcurrentHashMap<ContainerId, Integer> containerIdClassIdMap;
private Configuration conf;
private String device;
private boolean strictMode;
private int containerBandwidthMbit;
private int rootBandwidthMbit;
private int yarnBandwidthMbit;
public TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
privilegedOperationExecutor, CGroupsHandler cGroupsHandler,
TrafficController trafficController) {
this.privilegedOperationExecutor = privilegedOperationExecutor;
this.cGroupsHandler = cGroupsHandler;
this.trafficController = trafficController;
this.containerIdClassIdMap = new ConcurrentHashMap<>();
}
/**
* Bootstrapping 'outbound-bandwidth' resource handler - mounts net_cls
* controller and bootstraps a traffic control bandwidth shaping hierarchy
* @param configuration yarn configuration in use
* @return (potentially empty) list of privileged operations to execute.
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException {
conf = configuration;
//We'll do this inline for the time being - since this is a one time
//operation. At some point, LCE code can be refactored to batch mount
//operations across multiple controllers - cpu, net_cls, blkio etc
cGroupsHandler
.mountCGroupController(CGroupsHandler.CGroupController.NET_CLS);
device = conf.get(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE,
YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE);
strictMode = configuration.getBoolean(YarnConfiguration
.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, YarnConfiguration
.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
rootBandwidthMbit = conf.getInt(YarnConfiguration
.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, YarnConfiguration
.DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT);
yarnBandwidthMbit = conf.getInt(YarnConfiguration
.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, rootBandwidthMbit);
containerBandwidthMbit = (int) Math.ceil((double) yarnBandwidthMbit /
MAX_CONTAINER_COUNT);
StringBuffer logLine = new StringBuffer("strict mode is set to :")
.append(strictMode).append(System.lineSeparator());
if (strictMode) {
logLine.append("container bandwidth will be capped to soft limit.")
.append(System.lineSeparator());
} else {
logLine.append(
"containers will be allowed to use spare YARN bandwidth.")
.append(System.lineSeparator());
}
logLine
.append("containerBandwidthMbit soft limit (in mbit/sec) is set to : ")
.append(containerBandwidthMbit);
LOG.info(logLine);
trafficController.bootstrap(device, rootBandwidthMbit, yarnBandwidthMbit);
return null;
}
/**
* Pre-start hook for 'outbound-bandwidth' resource. A cgroup is created
* and a net_cls classid is generated and written to a cgroup file. A
* traffic control shaping rule is created in order to limit outbound
* bandwidth utilization.
* @param container Container being launched
* @return privileged operations for some cgroups/tc operations.
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String containerIdStr = container.getContainerId().toString();
int classId = trafficController.getNextClassId();
String classIdStr = trafficController.getStringForNetClsClassId(classId);
cGroupsHandler.createCGroup(CGroupsHandler.CGroupController
.NET_CLS,
containerIdStr);
cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS,
containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr);
containerIdClassIdMap.put(container.getContainerId(), classId);
//Now create a privileged operation in order to update the tasks file with
//the pid of the running container process (root of process tree). This can
//only be done at the time of launching the container, in a privileged
//executable.
String tasksFile = cGroupsHandler.getPathForCGroupTasks(
CGroupsHandler.CGroupController.NET_CLS, containerIdStr);
String opArg = new StringBuffer(PrivilegedOperation.CGROUP_ARG_PREFIX)
.append(tasksFile).toString();
List<PrivilegedOperation> ops = new ArrayList<>();
ops.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg));
//Create a privileged operation to create a tc rule for this container
//We'll return this to the calling (Linux) Container Executor
//implementation for batching optimizations so that we don't fork/exec
//additional times during container launch.
TrafficController.BatchBuilder builder = trafficController.new
BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE);
builder.addContainerClass(classId, containerBandwidthMbit, strictMode);
ops.add(builder.commitBatchToTempFile());
return ops;
}
/**
* Reacquires state for a container - reads the classid from the cgroup
* being used for the container being reacquired
* @param containerId if of the container being reacquired.
* @return (potentially empty) list of privileged operations
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
String containerIdStr = containerId.toString();
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to reacquire classId for container: " +
containerIdStr);
}
String classIdStrFromFile = cGroupsHandler.getCGroupParam(
CGroupsHandler.CGroupController.NET_CLS, containerIdStr,
CGroupsHandler.CGROUP_PARAM_CLASSID);
int classId = trafficController
.getClassIdFromFileContents(classIdStrFromFile);
LOG.info("Reacquired containerId -> classId mapping: " + containerIdStr
+ " -> " + classId);
containerIdClassIdMap.put(containerId, classId);
return null;
}
/**
* Returns total bytes sent per container to be used for metrics tracking
* purposes.
* @return a map of containerId to bytes sent
* @throws ResourceHandlerException
*/
public Map<ContainerId, Integer> getBytesSentPerContainer()
throws ResourceHandlerException {
Map<Integer, Integer> classIdStats = trafficController.readStats();
Map<ContainerId, Integer> containerIdStats = new HashMap<>();
for (Map.Entry<ContainerId, Integer> entry : containerIdClassIdMap
.entrySet()) {
ContainerId containerId = entry.getKey();
Integer classId = entry.getValue();
Integer bytesSent = classIdStats.get(classId);
if (bytesSent == null) {
LOG.warn("No bytes sent metric found for container: " + containerId +
" with classId: " + classId);
continue;
}
containerIdStats.put(containerId, bytesSent);
}
return containerIdStats;
}
/**
* Cleanup operations once container is completed - deletes cgroup and
* removes traffic shaping rule(s).
* @param containerId of the container that was completed.
* @return
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
LOG.info("postComplete for container: " + containerId.toString());
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS,
containerId.toString());
Integer classId = containerIdClassIdMap.get(containerId);
if (classId != null) {
PrivilegedOperation op = trafficController.new
BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
.deleteContainerClass(classId).commitBatchToTempFile();
try {
privilegedOperationExecutor.executePrivilegedOperation(op, false);
trafficController.releaseClassId(classId);
} catch (PrivilegedOperationException e) {
LOG.warn("Failed to delete tc rule for classId: " + classId);
throw new ResourceHandlerException(
"Failed to delete tc rule for classId:" + classId);
}
} else {
LOG.warn("Not cleaning up tc rules. classId unknown for container: " +
containerId.toString());
}
return null;
}
@Override
public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
if (LOG.isDebugEnabled()) {
LOG.debug("teardown(): Nothing to do");
}
return null;
}
}

View File

@ -0,0 +1,650 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import java.io.*;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Wrapper around the 'tc' tool. Provides access to a very specific subset of
* the functionality provided by the tc tool.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable class TrafficController {
private static final Log LOG = LogFactory.getLog(TrafficController.class);
private static final int ROOT_QDISC_HANDLE = 42;
private static final int ZERO_CLASS_ID = 0;
private static final int ROOT_CLASS_ID = 1;
/** Traffic shaping class used for all unclassified traffic */
private static final int DEFAULT_CLASS_ID = 2;
/** Traffic shaping class used for all YARN traffic */
private static final int YARN_ROOT_CLASS_ID = 3;
/** Classes 0-3 are used already. We need to ensure that container classes
* do not collide with these classids.
*/
private static final int MIN_CONTAINER_CLASS_ID = 4;
/** This is the number of distinct (container) traffic shaping classes
* that are supported */
private static final int MAX_CONTAINER_CLASSES = 1024;
private static final String MBIT_SUFFIX = "mbit";
private static final String TMP_FILE_PREFIX = "tc.";
private static final String TMP_FILE_SUFFIX = ".cmds";
/** Root queuing discipline attached to the root of the interface */
private static final String FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT =
"qdisc add dev %s root handle %d: htb default %s";
/** Specifies a cgroup/classid based filter - based on the classid associated
* with the outbound packet, the corresponding traffic shaping rule is used
* . Please see tc documentation for additional details.
*/
private static final String FORMAT_FILTER_CGROUP_ADD_TO_PARENT =
"filter add dev %s parent %d: protocol ip prio 10 handle 1: cgroup";
/** Standard format for adding a traffic shaping class to a parent, with
* the specified bandwidth limits
*/
private static final String FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES =
"class add dev %s parent %d:%d classid %d:%d htb rate %s ceil %s";
/** Standard format to delete a traffic shaping class */
private static final String FORMAT_DELETE_CLASS =
"class del dev %s classid %d:%d";
/** Format of the classid that is to be used with the net_cls cgroup. Needs
* to be of the form 0xAAAABBBB */
private static final String FORMAT_NET_CLS_CLASS_ID = "0x%04d%04d";
/** Commands to read the qdsic(s)/filter(s)/class(es) associated with an
* interface
*/
private static final String FORMAT_READ_STATE =
"qdisc show dev %1$s%n" +
"filter show dev %1$s%n" +
"class show dev %1$s";
private static final String FORMAT_READ_CLASSES = "class show dev %s";
/** Delete a qdisc and all its children - classes/filters etc */
private static final String FORMAT_WIPE_STATE =
"qdisc del dev %s parent root";
private final Configuration conf;
//Used to store the set of classids in use for container classes
private final BitSet classIdSet;
private final PrivilegedOperationExecutor privilegedOperationExecutor;
private String tmpDirPath;
private String device;
private int rootBandwidthMbit;
private int yarnBandwidthMbit;
private int defaultClassBandwidthMbit;
TrafficController(Configuration conf, PrivilegedOperationExecutor exec) {
this.conf = conf;
this.classIdSet = new BitSet(MAX_CONTAINER_CLASSES);
this.privilegedOperationExecutor = exec;
}
/**
* Bootstrap tc configuration
*/
public void bootstrap(String device, int rootBandwidthMbit, int
yarnBandwidthMbit)
throws ResourceHandlerException {
if (device == null) {
throw new ResourceHandlerException("device cannot be null!");
}
String tmpDirBase = conf.get("hadoop.tmp.dir");
if (tmpDirBase == null) {
throw new ResourceHandlerException("hadoop.tmp.dir not set!");
}
tmpDirPath = tmpDirBase + "/nm-tc-rules";
File tmpDir = new File(tmpDirPath);
if (!(tmpDir.exists() || tmpDir.mkdirs())) {
LOG.warn("Unable to create directory: " + tmpDirPath);
throw new ResourceHandlerException("Unable to create directory: " +
tmpDirPath);
}
this.device = device;
this.rootBandwidthMbit = rootBandwidthMbit;
this.yarnBandwidthMbit = yarnBandwidthMbit;
defaultClassBandwidthMbit = (rootBandwidthMbit - yarnBandwidthMbit) <= 0
? rootBandwidthMbit : (rootBandwidthMbit - yarnBandwidthMbit);
boolean recoveryEnabled = conf.getBoolean(YarnConfiguration
.NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
String state = null;
if (!recoveryEnabled) {
LOG.info("NM recovery is not enabled. We'll wipe tc state before proceeding.");
} else {
//NM recovery enabled - run a state check
state = readState();
if (checkIfAlreadyBootstrapped(state)) {
LOG.info("TC configuration is already in place. Not wiping state.");
//We already have the list of existing container classes, if any
//that were created after bootstrapping
reacquireContainerClasses(state);
return;
} else {
LOG.info("TC configuration is incomplete. Wiping tc state before proceeding");
}
}
wipeState(); //start over in case preview bootstrap was incomplete
initializeState();
}
private void initializeState() throws ResourceHandlerException {
LOG.info("Initializing tc state.");
BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
OperationType.TC_MODIFY_STATE)
.addRootQDisc()
.addCGroupFilter()
.addClassToRootQDisc(rootBandwidthMbit)
.addDefaultClass(defaultClassBandwidthMbit, rootBandwidthMbit)
//yarn bandwidth is capped with rate = ceil
.addYARNRootClass(yarnBandwidthMbit, yarnBandwidthMbit);
PrivilegedOperation op = builder.commitBatchToTempFile();
try {
privilegedOperationExecutor.executePrivilegedOperation(op, false);
} catch (PrivilegedOperationException e) {
LOG.warn("Failed to bootstrap outbound bandwidth configuration");
throw new ResourceHandlerException(
"Failed to bootstrap outbound bandwidth configuration", e);
}
}
/**
* Function to check if the interface in use has already been fully
* bootstrapped with the required tc configuration
*
* @return boolean indicating the result of the check
*/
private boolean checkIfAlreadyBootstrapped(String state)
throws ResourceHandlerException {
List<String> regexes = new ArrayList<>();
//root qdisc
regexes.add(String.format("^qdisc htb %d: root(.)*$",
ROOT_QDISC_HANDLE));
//cgroup filter
regexes.add(String.format("^filter parent %d: protocol ip " +
"(.)*cgroup(.)*$", ROOT_QDISC_HANDLE));
//root, default and yarn classes
regexes.add(String.format("^class htb %d:%d root(.)*$",
ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE,
ROOT_CLASS_ID));
for (String regex : regexes) {
Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
if (pattern.matcher(state).find()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Matched regex: " + regex);
}
} else {
String logLine = new StringBuffer("Failed to match regex: ")
.append(regex).append(" Current state: ").append(state).toString();
LOG.warn(logLine);
return false;
}
}
LOG.info("Bootstrap check succeeded");
return true;
}
private String readState() throws ResourceHandlerException {
//Sample state output:
// qdisc htb 42: root refcnt 2 r2q 10 default 2 direct_packets_stat 0
// filter parent 42: protocol ip pref 10 cgroup handle 0x1
//
// filter parent 42: protocol ip pref 10 cgroup handle 0x1
//
// class htb 42:1 root rate 10000Kbit ceil 10000Kbit burst 1600b cburst 1600b
// class htb 42:2 parent 42:1 prio 0 rate 3000Kbit ceil 10000Kbit burst 1599b cburst 1600b
// class htb 42:3 parent 42:1 prio 0 rate 7000Kbit ceil 7000Kbit burst 1598b cburst 1598b
BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
OperationType.TC_READ_STATE)
.readState();
PrivilegedOperation op = builder.commitBatchToTempFile();
try {
String output =
privilegedOperationExecutor.executePrivilegedOperation(op, true);
if (LOG.isDebugEnabled()) {
LOG.debug("TC state: %n" + output);
}
return output;
} catch (PrivilegedOperationException e) {
LOG.warn("Failed to bootstrap outbound bandwidth rules");
throw new ResourceHandlerException(
"Failed to bootstrap outbound bandwidth rules", e);
}
}
private void wipeState() throws ResourceHandlerException {
BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
OperationType.TC_MODIFY_STATE)
.wipeState();
PrivilegedOperation op = builder.commitBatchToTempFile();
try {
LOG.info("Wiping tc state.");
privilegedOperationExecutor.executePrivilegedOperation(op, false);
} catch (PrivilegedOperationException e) {
LOG.warn("Failed to wipe tc state. This could happen if the interface" +
" is already in its default state. Ignoring.");
//Ignoring this exception. This could happen if the interface is already
//in its default state. For this reason we don't throw a
//ResourceHandlerException here.
}
}
/**
* Parses the current state looks for classids already in use
*/
private void reacquireContainerClasses(String state) {
//At this point we already have already successfully passed
//checkIfAlreadyBootstrapped() - so we know that at least the
//root classes are in place.
String tcClassesStr = state.substring(state.indexOf("class"));
//one class per line - the results of the split will need to trimmed
String[] tcClasses = Pattern.compile("$", Pattern.MULTILINE)
.split(tcClassesStr);
Pattern tcClassPattern = Pattern.compile(String.format(
"class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
synchronized (classIdSet) {
for (String tcClassSplit : tcClasses) {
String tcClass = tcClassSplit.trim();
if (!tcClass.isEmpty()) {
Matcher classMatcher = tcClassPattern.matcher(tcClass);
if (classMatcher.matches()) {
int classId = Integer.parseInt(classMatcher.group(1));
if (classId >= MIN_CONTAINER_CLASS_ID) {
classIdSet.set(classId - MIN_CONTAINER_CLASS_ID);
LOG.info("Reacquired container classid: " + classId);
}
} else {
LOG.warn("Unable to match classid in string:" + tcClass);
}
}
}
}
}
public Map<Integer, Integer> readStats() throws ResourceHandlerException {
BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
OperationType.TC_READ_STATS)
.readClasses();
PrivilegedOperation op = builder.commitBatchToTempFile();
try {
String output =
privilegedOperationExecutor.executePrivilegedOperation(op, true);
if (LOG.isDebugEnabled()) {
LOG.debug("TC stats output:" + output);
}
Map<Integer, Integer> classIdBytesStats = parseStatsString(output);
if (LOG.isDebugEnabled()) {
LOG.debug("classId -> bytes sent %n" + classIdBytesStats);
}
return classIdBytesStats;
} catch (PrivilegedOperationException e) {
LOG.warn("Failed to get tc stats");
throw new ResourceHandlerException("Failed to get tc stats", e);
}
}
private Map<Integer, Integer> parseStatsString(String stats) {
//Example class stats segment (multiple present in tc output)
// class htb 42:4 parent 42:3 prio 0 rate 1000Kbit ceil 7000Kbit burst1600b cburst 1598b
// Sent 77921300 bytes 52617 pkt (dropped 0, overlimits 0 requeues 0)
// rate 6973Kbit 589pps backlog 0b 39p requeues 0
// lended: 3753 borrowed: 22514 giants: 0
// tokens: -122164 ctokens: -52488
String[] lines = Pattern.compile("$", Pattern.MULTILINE)
.split(stats);
Pattern tcClassPattern = Pattern.compile(String.format(
"class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
Pattern bytesPattern = Pattern.compile("Sent (\\d+) bytes.*");
int currentClassId = -1;
Map<Integer, Integer> containerClassIdStats = new HashMap<>();
for (String lineSplit : lines) {
String line = lineSplit.trim();
if (!line.isEmpty()) {
//Check if we encountered a stats segment for a container class
Matcher classMatcher = tcClassPattern.matcher(line);
if (classMatcher.matches()) {
int classId = Integer.parseInt(classMatcher.group(1));
if (classId >= MIN_CONTAINER_CLASS_ID) {
currentClassId = classId;
continue;
}
}
//Check if we encountered a stats line
Matcher bytesMatcher = bytesPattern.matcher(line);
if (bytesMatcher.matches()) {
//we found at least one class segment
if (currentClassId != -1) {
int bytes = Integer.parseInt(bytesMatcher.group(1));
containerClassIdStats.put(currentClassId, bytes);
} else {
LOG.warn("Matched a 'bytes sent' line outside of a class stats " +
"segment : " + line);
}
continue;
}
//skip other kinds of non-empty lines - since we aren't interested in
//them.
}
}
return containerClassIdStats;
}
/**
* Returns a formatted string for attaching a qdisc to the root of the
* device/interface. Additional qdisc
* parameters can be supplied - for example, the default 'class' to use for
* incoming packets
*/
private String getStringForAddRootQDisc() {
return String.format(FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT, device,
ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID);
}
/**
* Returns a formatted string for a filter that matches packets based on the
* presence of net_cls classids
*/
private String getStringForaAddCGroupFilter() {
return String.format(FORMAT_FILTER_CGROUP_ADD_TO_PARENT, device,
ROOT_QDISC_HANDLE);
}
/**
* Get the next available classid. This has to be released post container
* complete
*/
public int getNextClassId() throws ResourceHandlerException {
synchronized (classIdSet) {
int index = classIdSet.nextClearBit(0);
if (index >= MAX_CONTAINER_CLASSES) {
throw new ResourceHandlerException("Reached max container classes: "
+ MAX_CONTAINER_CLASSES);
}
classIdSet.set(index);
return (index + MIN_CONTAINER_CLASS_ID);
}
}
public void releaseClassId(int classId) throws ResourceHandlerException {
synchronized (classIdSet) {
int index = classId - MIN_CONTAINER_CLASS_ID;
if (index < 0 || index >= MAX_CONTAINER_CLASSES) {
throw new ResourceHandlerException("Invalid incoming classId: "
+ classId);
}
classIdSet.clear(index);
}
}
/**
* Returns a formatted string representing the given classId including a
* handle
*/
public String getStringForNetClsClassId(int classId) {
return String.format(FORMAT_NET_CLS_CLASS_ID, ROOT_QDISC_HANDLE, classId);
}
/**
* A value read out of net_cls.classid file is in decimal form. We need to
* convert to 32-bit/8 digit hex, extract the lower 16-bit/four digits
* as an int
*/
public int getClassIdFromFileContents(String input) {
//convert from decimal back to fixed size hex form
//e.g 4325381 -> 00420005
String classIdStr = String.format("%08x", Integer.parseInt(input));
if (LOG.isDebugEnabled()) {
LOG.debug("ClassId hex string : " + classIdStr);
}
//extract and return 4 digits
//e.g 00420005 -> 0005
return Integer.parseInt(classIdStr.substring(4));
}
/**
* Adds a tc class to qdisc at root
*/
private String getStringForAddClassToRootQDisc(int rateMbit) {
String rateMbitStr = rateMbit + MBIT_SUFFIX;
//example : "class add dev eth0 parent 42:0 classid 42:1 htb rate 1000mbit
// ceil 1000mbit"
return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
ROOT_QDISC_HANDLE, ZERO_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID,
rateMbitStr, rateMbitStr);
}
private String getStringForAddDefaultClass(int rateMbit, int ceilMbit) {
String rateMbitStr = rateMbit + MBIT_SUFFIX;
String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
//example : "class add dev eth0 parent 42:1 classid 42:2 htb rate 300mbit
// ceil 1000mbit"
return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID,
rateMbitStr, ceilMbitStr);
}
private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit) {
String rateMbitStr = rateMbit + MBIT_SUFFIX;
String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
//example : "class add dev eth0 parent 42:1 classid 42:3 htb rate 700mbit
// ceil 1000mbit"
return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID,
rateMbitStr, ceilMbitStr);
}
private String getStringForAddContainerClass(int classId, int rateMbit, int
ceilMbit) {
String rateMbitStr = rateMbit + MBIT_SUFFIX;
String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
//example : "class add dev eth0 parent 42:99 classid 42:99 htb rate 50mbit
// ceil 700mbit"
return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE, classId,
rateMbitStr, ceilMbitStr);
}
private String getStringForDeleteContainerClass(int classId) {
//example "class del dev eth0 classid 42:7"
return String.format(FORMAT_DELETE_CLASS, device, ROOT_QDISC_HANDLE,
classId);
}
private String getStringForReadState() {
return String.format(FORMAT_READ_STATE, device);
}
private String getStringForReadClasses() {
return String.format(FORMAT_READ_CLASSES, device);
}
private String getStringForWipeState() {
return String.format(FORMAT_WIPE_STATE, device);
}
public class BatchBuilder {
final PrivilegedOperation operation;
final List<String> commands;
public BatchBuilder(PrivilegedOperation.OperationType opType)
throws ResourceHandlerException {
switch (opType) {
case TC_MODIFY_STATE:
case TC_READ_STATE:
case TC_READ_STATS:
operation = new PrivilegedOperation(opType, (String) null);
commands = new ArrayList<>();
break;
default:
throw new ResourceHandlerException("Not a tc operation type : " +
opType);
}
}
private BatchBuilder addRootQDisc() {
commands.add(getStringForAddRootQDisc());
return this;
}
private BatchBuilder addCGroupFilter() {
commands.add(getStringForaAddCGroupFilter());
return this;
}
private BatchBuilder addClassToRootQDisc(int rateMbit) {
commands.add(getStringForAddClassToRootQDisc(rateMbit));
return this;
}
private BatchBuilder addDefaultClass(int rateMbit, int ceilMbit) {
commands.add(getStringForAddDefaultClass(rateMbit, ceilMbit));
return this;
}
private BatchBuilder addYARNRootClass(int rateMbit, int ceilMbit) {
commands.add(getStringForAddYARNRootClass(rateMbit, ceilMbit));
return this;
}
public BatchBuilder addContainerClass(int classId, int rateMbit, boolean
strictMode) {
int ceilMbit;
if (strictMode) {
ceilMbit = rateMbit;
} else {
ceilMbit = yarnBandwidthMbit;
}
commands.add(getStringForAddContainerClass(classId, rateMbit, ceilMbit));
return this;
}
public BatchBuilder deleteContainerClass(int classId) {
commands.add(getStringForDeleteContainerClass(classId));
return this;
}
private BatchBuilder readState() {
commands.add(getStringForReadState());
return this;
}
//We'll read all classes, but use a different tc operation type
//when reading stats for all these classes. Stats are fetched using a
//different tc cli option (-s).
private BatchBuilder readClasses() {
//We'll read all classes, but use a different tc operation type
//for reading stats for all these classes. Stats are fetched using a
//different tc cli option (-s).
commands.add(getStringForReadClasses());
return this;
}
private BatchBuilder wipeState() {
commands.add(getStringForWipeState());
return this;
}
public PrivilegedOperation commitBatchToTempFile()
throws ResourceHandlerException {
try {
File tcCmds = File.createTempFile(TMP_FILE_PREFIX, TMP_FILE_SUFFIX, new
File(tmpDirPath));
Writer writer = new OutputStreamWriter(new FileOutputStream(tcCmds),
"UTF-8");
PrintWriter printWriter = new PrintWriter(writer);
for (String command : commands) {
printWriter.println(command);
}
printWriter.close();
operation.appendArgs(tcCmds.getAbsolutePath());
return operation;
} catch (IOException e) {
LOG.warn("Failed to create or write to temporary file in dir: " +
tmpDirPath);
throw new ResourceHandlerException(
"Failed to create or write to temporary file in dir: "
+ tmpDirPath);
}
}
} //end BatchBuilder
}

View File

@ -0,0 +1,78 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class TestResourceHandlerModule {
private static final Log LOG = LogFactory.
getLog(TestResourceHandlerModule.class);
Configuration emptyConf;
Configuration networkEnabledConf;
@Before
public void setup() {
emptyConf = new YarnConfiguration();
networkEnabledConf = new YarnConfiguration();
networkEnabledConf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
true);
//We need to bypass mtab parsing for figuring out cgroups mount locations
networkEnabledConf.setBoolean(YarnConfiguration
.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
}
@Test
public void testOutboundBandwidthHandler() {
try {
//This resourceHandler should be non-null only if network as a resource
//is explicitly enabled
OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule
.getOutboundBandwidthResourceHandler(emptyConf);
Assert.assertNull(resourceHandler);
//When network as a resource is enabled this should be non-null
resourceHandler = ResourceHandlerModule
.getOutboundBandwidthResourceHandler(networkEnabledConf);
Assert.assertNotNull(resourceHandler);
//Ensure that outbound bandwidth resource handler is present in the chain
ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule
.getConfiguredResourceHandlerChain(networkEnabledConf);
List<ResourceHandler> resourceHandlers = resourceHandlerChain
.getResourceHandlerList();
//Exactly one resource handler in chain
Assert.assertEquals(resourceHandlers.size(), 1);
//Same instance is expected to be in the chain.
Assert.assertTrue(resourceHandlers.get(0) == resourceHandler);
} catch (ResourceHandlerException e) {
Assert.fail("Unexpected ResourceHandlerException: " + e);
}
}
}

View File

@ -0,0 +1,231 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.io.File;
import java.util.List;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class TestTrafficControlBandwidthHandlerImpl {
private static final Log LOG =
LogFactory.getLog(TestTrafficControlBandwidthHandlerImpl.class);
private static final int ROOT_BANDWIDTH_MBIT = 100;
private static final int YARN_BANDWIDTH_MBIT = 70;
private static final int TEST_CLASSID = 100;
private static final String TEST_CLASSID_STR = "42:100";
private static final String TEST_CONTAINER_ID_STR = "container_01";
private static final String TEST_TASKS_FILE = "testTasksFile";
private PrivilegedOperationExecutor privilegedOperationExecutorMock;
private CGroupsHandler cGroupsHandlerMock;
private TrafficController trafficControllerMock;
private Configuration conf;
private String tmpPath;
private String device;
ContainerId containerIdMock;
Container containerMock;
@Before
public void setup() {
privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
cGroupsHandlerMock = mock(CGroupsHandler.class);
trafficControllerMock = mock(TrafficController.class);
conf = new YarnConfiguration();
tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
('/').append("hadoop.tmp.dir").toString();
device = YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE;
containerIdMock = mock(ContainerId.class);
containerMock = mock(Container.class);
when(containerIdMock.toString()).thenReturn(TEST_CONTAINER_ID_STR);
//mock returning a mock - an angel died somewhere.
when(containerMock.getContainerId()).thenReturn(containerIdMock);
conf.setInt(YarnConfiguration
.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, ROOT_BANDWIDTH_MBIT);
conf.setInt(YarnConfiguration
.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, YARN_BANDWIDTH_MBIT);
conf.set("hadoop.tmp.dir", tmpPath);
//In these tests, we'll only use TrafficController with recovery disabled
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
}
@Test
public void testBootstrap() {
TrafficControlBandwidthHandlerImpl handlerImpl = new
TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
cGroupsHandlerMock, trafficControllerMock);
try {
handlerImpl.bootstrap(conf);
verify(cGroupsHandlerMock).mountCGroupController(
eq(CGroupsHandler.CGroupController.NET_CLS));
verifyNoMoreInteractions(cGroupsHandlerMock);
verify(trafficControllerMock).bootstrap(eq(device),
eq(ROOT_BANDWIDTH_MBIT),
eq(YARN_BANDWIDTH_MBIT));
verifyNoMoreInteractions(trafficControllerMock);
} catch (ResourceHandlerException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected ResourceHandlerException!");
}
}
@Test
public void testLifeCycle() {
TrafficController trafficControllerSpy = spy(new TrafficController(conf,
privilegedOperationExecutorMock));
TrafficControlBandwidthHandlerImpl handlerImpl = new
TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
cGroupsHandlerMock, trafficControllerSpy);
try {
handlerImpl.bootstrap(conf);
testPreStart(trafficControllerSpy, handlerImpl);
testPostComplete(trafficControllerSpy, handlerImpl);
} catch (ResourceHandlerException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected ResourceHandlerException!");
}
}
private void testPreStart(TrafficController trafficControllerSpy,
TrafficControlBandwidthHandlerImpl handlerImpl) throws
ResourceHandlerException {
//This is not the cleanest of solutions - but since we are testing the
//preStart/postComplete lifecycle, we don't have a different way of
//handling this - we don't keep track of the number of invocations by
//a class we are not testing here (TrafficController)
//So, we'll reset this mock. This is not a problem with other mocks.
reset(privilegedOperationExecutorMock);
doReturn(TEST_CLASSID).when(trafficControllerSpy).getNextClassId();
doReturn(TEST_CLASSID_STR).when(trafficControllerSpy)
.getStringForNetClsClassId(TEST_CLASSID);
when(cGroupsHandlerMock.getPathForCGroupTasks(CGroupsHandler
.CGroupController.NET_CLS, TEST_CONTAINER_ID_STR)).thenReturn(
TEST_TASKS_FILE);
List<PrivilegedOperation> ops = handlerImpl.preStart(containerMock);
//Ensure that cgroups is created and updated as expected
verify(cGroupsHandlerMock).createCGroup(
eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
verify(cGroupsHandlerMock).updateCGroupParam(
eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR),
eq(CGroupsHandler.CGROUP_PARAM_CLASSID), eq(TEST_CLASSID_STR));
//Now check the privileged operations being returned
//We expect two operations - one for adding pid to tasks file and another
//for a tc modify operation
Assert.assertEquals(2, ops.size());
//Verify that the add pid op is correct
PrivilegedOperation addPidOp = ops.get(0);
String expectedAddPidOpArg = PrivilegedOperation.CGROUP_ARG_PREFIX +
TEST_TASKS_FILE;
List<String> addPidOpArgs = addPidOp.getArguments();
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
addPidOp.getOperationType());
Assert.assertEquals(1, addPidOpArgs.size());
Assert.assertEquals(expectedAddPidOpArg, addPidOpArgs.get(0));
//Verify that that tc modify op is correct
PrivilegedOperation tcModifyOp = ops.get(1);
List<String> tcModifyOpArgs = tcModifyOp.getArguments();
Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
tcModifyOp.getOperationType());
Assert.assertEquals(1, tcModifyOpArgs.size());
//verify that the tc command file exists
Assert.assertTrue(new File(tcModifyOpArgs.get(0)).exists());
}
private void testPostComplete(TrafficController trafficControllerSpy,
TrafficControlBandwidthHandlerImpl handlerImpl) throws
ResourceHandlerException {
//This is not the cleanest of solutions - but since we are testing the
//preStart/postComplete lifecycle, we don't have a different way of
//handling this - we don't keep track of the number of invocations by
//a class we are not testing here (TrafficController)
//So, we'll reset this mock. This is not a problem with other mocks.
reset(privilegedOperationExecutorMock);
List<PrivilegedOperation> ops = handlerImpl.postComplete(containerIdMock);
verify(cGroupsHandlerMock).deleteCGroup(
eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
try {
//capture privileged op argument and ensure it is correct
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
(PrivilegedOperation.class);
verify(privilegedOperationExecutorMock)
.executePrivilegedOperation(opCaptor.capture(), eq(false));
List<String> args = opCaptor.getValue().getArguments();
Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
opCaptor.getValue().getOperationType());
Assert.assertEquals(1, args.size());
//ensure that tc command file exists
Assert.assertTrue(new File(args.get(0)).exists());
verify(trafficControllerSpy).releaseClassId(TEST_CLASSID);
} catch (PrivilegedOperationException e) {
LOG.error("Caught exception: " + e);
Assert.fail("Unexpected PrivilegedOperationException from mock!");
}
//We don't expect any operations to be returned here
Assert.assertNull(ops);
}
@After
public void teardown() {
FileUtil.fullyDelete(new File(tmpPath));
}
}

View File

@ -0,0 +1,327 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestTrafficController {
private static final Log LOG = LogFactory.getLog(TestTrafficController.class);
private static final int ROOT_BANDWIDTH_MBIT = 100;
private static final int YARN_BANDWIDTH_MBIT = 70;
private static final int CONTAINER_BANDWIDTH_MBIT = 10;
//These constants are closely tied to the implementation of TrafficController
//and will have to be modified in tandem with any related TrafficController
//changes.
private static final String DEVICE = "eth0";
private static final String WIPE_STATE_CMD = "qdisc del dev eth0 parent root";
private static final String ADD_ROOT_QDISC_CMD =
"qdisc add dev eth0 root handle 42: htb default 2";
private static final String ADD_CGROUP_FILTER_CMD =
"filter add dev eth0 parent 42: protocol ip prio 10 handle 1: cgroup";
private static final String ADD_ROOT_CLASS_CMD =
"class add dev eth0 parent 42:0 classid 42:1 htb rate 100mbit ceil 100mbit";
private static final String ADD_DEFAULT_CLASS_CMD =
"class add dev eth0 parent 42:1 classid 42:2 htb rate 30mbit ceil 100mbit";
private static final String ADD_YARN_CLASS_CMD =
"class add dev eth0 parent 42:1 classid 42:3 htb rate 70mbit ceil 70mbit";
private static final String DEFAULT_TC_STATE_EXAMPLE =
"qdisc pfifo_fast 0: root refcnt 2 bands 3 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1";
private static final String READ_QDISC_CMD = "qdisc show dev eth0";
private static final String READ_FILTER_CMD = "filter show dev eth0";
private static final String READ_CLASS_CMD = "class show dev eth0";
private static final int MIN_CONTAINER_CLASS_ID = 4;
private static final String FORMAT_CONTAINER_CLASS_STR = "0x0042%04d";
private static final String FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE =
"class add dev eth0 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit";
private static final String FORAMT_DELETE_CONTAINER_CLASS_FROM_DEVICE =
"class del dev eth0 classid 42:%d";
private static final int TEST_CLASS_ID = 97;
//decimal form of 0x00420097 - when reading a classid file, it is read out
//as decimal
private static final String TEST_CLASS_ID_DECIMAL_STR = "4325527";
private Configuration conf;
private String tmpPath;
private PrivilegedOperationExecutor privilegedOperationExecutorMock;
@Before
public void setup() {
privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
conf = new YarnConfiguration();
tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
('/').append("hadoop.tmp.dir").toString();
conf.set("hadoop.tmp.dir", tmpPath);
}
private void verifyTrafficControlOperation(PrivilegedOperation op,
PrivilegedOperation.OperationType expectedOpType,
List<String> expectedTcCmds)
throws IOException {
//Verify that the optype matches
Assert.assertEquals(expectedOpType, op.getOperationType());
List<String> args = op.getArguments();
//Verify that arg count is always 1 (tc command file) for a tc operation
Assert.assertEquals(1, args.size());
File tcCmdsFile = new File(args.get(0));
//Verify that command file exists
Assert.assertTrue(tcCmdsFile.exists());
List<String> tcCmds = Files.readAllLines(tcCmdsFile.toPath(),
Charset.forName("UTF-8"));
//Verify that the number of commands is the same as expected and verify
//that each command is the same, in sequence
Assert.assertEquals(expectedTcCmds.size(), tcCmds.size());
for (int i = 0; i < tcCmds.size(); ++i) {
Assert.assertEquals(expectedTcCmds.get(i), tcCmds.get(i));
}
}
@Test
public void testBootstrapRecoveryDisabled() {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
TrafficController trafficController = new TrafficController(conf,
privilegedOperationExecutorMock);
try {
trafficController
.bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
(PrivilegedOperation.class);
//NM_RECOVERY_DISABLED - so we expect two privileged operation executions
//one for wiping tc state - a second for initializing state
verify(privilegedOperationExecutorMock, times(2))
.executePrivilegedOperation(opCaptor.capture(), eq(false));
//Now verify that the two operations were correct
List<PrivilegedOperation> ops = opCaptor.getAllValues();
verifyTrafficControlOperation(ops.get(0),
PrivilegedOperation.OperationType.TC_MODIFY_STATE,
Arrays.asList(WIPE_STATE_CMD));
verifyTrafficControlOperation(ops.get(1),
PrivilegedOperation.OperationType.TC_MODIFY_STATE,
Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
} catch (ResourceHandlerException | PrivilegedOperationException |
IOException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected exception: "
+ e.getClass().getSimpleName());
}
}
@Test
public void testBootstrapRecoveryEnabled() {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
TrafficController trafficController = new TrafficController(conf,
privilegedOperationExecutorMock);
try {
//Return a default tc state when attempting to read state
when(privilegedOperationExecutorMock.executePrivilegedOperation(
any(PrivilegedOperation.class), eq(true)))
.thenReturn(DEFAULT_TC_STATE_EXAMPLE);
trafficController
.bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
ArgumentCaptor<PrivilegedOperation> readOpCaptor = ArgumentCaptor.forClass
(PrivilegedOperation.class);
//NM_RECOVERY_ENABLED - so we expect three privileged operation executions
//1) read tc state 2) wipe tc state 3) init tc state
//one for wiping tc state - a second for initializing state
//First, verify read op
verify(privilegedOperationExecutorMock, times(1))
.executePrivilegedOperation(readOpCaptor.capture(), eq(true));
List<PrivilegedOperation> readOps = readOpCaptor.getAllValues();
verifyTrafficControlOperation(readOps.get(0),
PrivilegedOperation.OperationType.TC_READ_STATE,
Arrays.asList(READ_QDISC_CMD, READ_FILTER_CMD, READ_CLASS_CMD));
ArgumentCaptor<PrivilegedOperation> writeOpCaptor = ArgumentCaptor
.forClass(PrivilegedOperation.class);
verify(privilegedOperationExecutorMock, times(2))
.executePrivilegedOperation(writeOpCaptor.capture(), eq(false));
//Now verify that the two write operations were correct
List<PrivilegedOperation> writeOps = writeOpCaptor.getAllValues();
verifyTrafficControlOperation(writeOps.get(0),
PrivilegedOperation.OperationType.TC_MODIFY_STATE,
Arrays.asList(WIPE_STATE_CMD));
verifyTrafficControlOperation(writeOps.get(1),
PrivilegedOperation.OperationType.TC_MODIFY_STATE,
Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
} catch (ResourceHandlerException | PrivilegedOperationException |
IOException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected exception: "
+ e.getClass().getSimpleName());
}
}
@Test
public void testInvalidBuilder() {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
TrafficController trafficController = new TrafficController(conf,
privilegedOperationExecutorMock);
try {
trafficController
.bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
try {
//Invalid op type for TC batch builder
TrafficController.BatchBuilder invalidBuilder = trafficController.
new BatchBuilder(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP);
Assert.fail("Invalid builder check failed!");
} catch (ResourceHandlerException e) {
//expected
}
} catch (ResourceHandlerException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected exception: "
+ e.getClass().getSimpleName());
}
}
@Test
public void testClassIdFileContentParsing() {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
TrafficController trafficController = new TrafficController(conf,
privilegedOperationExecutorMock);
//Verify that classid file contents are parsed correctly
//This call strips the QDISC prefix and returns the classid asociated with
//the container
int parsedClassId = trafficController.getClassIdFromFileContents
(TEST_CLASS_ID_DECIMAL_STR);
Assert.assertEquals(TEST_CLASS_ID, parsedClassId);
}
@Test
public void testContainerOperations() {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
TrafficController trafficController = new TrafficController(conf,
privilegedOperationExecutorMock);
try {
trafficController
.bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
int classId = trafficController.getNextClassId();
Assert.assertTrue(classId >= MIN_CONTAINER_CLASS_ID);
Assert.assertEquals(String.format(FORMAT_CONTAINER_CLASS_STR, classId),
trafficController.getStringForNetClsClassId(classId));
//Verify that the operation is setup correctly with strictMode = false
TrafficController.BatchBuilder builder = trafficController.
new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
.addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, false);
PrivilegedOperation addClassOp = builder.commitBatchToTempFile();
String expectedAddClassCmd = String.format
(FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId, YARN_BANDWIDTH_MBIT);
verifyTrafficControlOperation(addClassOp,
PrivilegedOperation.OperationType.TC_MODIFY_STATE,
Arrays.asList(expectedAddClassCmd));
//Verify that the operation is setup correctly with strictMode = true
TrafficController.BatchBuilder strictModeBuilder = trafficController.
new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
.addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, true);
PrivilegedOperation addClassStrictModeOp = strictModeBuilder
.commitBatchToTempFile();
String expectedAddClassStrictModeCmd = String.format
(FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId,
CONTAINER_BANDWIDTH_MBIT);
verifyTrafficControlOperation(addClassStrictModeOp,
PrivilegedOperation.OperationType.TC_MODIFY_STATE,
Arrays.asList(expectedAddClassStrictModeCmd));
TrafficController.BatchBuilder deleteBuilder = trafficController.new
BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
.deleteContainerClass(classId);
PrivilegedOperation deleteClassOp = deleteBuilder.commitBatchToTempFile();
String expectedDeleteClassCmd = String.format
(FORAMT_DELETE_CONTAINER_CLASS_FROM_DEVICE, classId);
verifyTrafficControlOperation(deleteClassOp,
PrivilegedOperation.OperationType.TC_MODIFY_STATE,
Arrays.asList(expectedDeleteClassCmd));
} catch (ResourceHandlerException | IOException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected exception: "
+ e.getClass().getSimpleName());
}
}
@After
public void teardown() {
FileUtil.fullyDelete(new File(tmpPath));
}
}