YARN-2619. Added NodeManager support for disk io isolation through cgroups. Contributed by Varun Vasudev and Wei Yan.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-04-30 21:41:07 -07:00
parent 98a6176628
commit 1b3b9e5c31
12 changed files with 600 additions and 134 deletions

View File

@ -105,6 +105,9 @@ Release 2.8.0 - UNRELEASED
YARN-2498. Respect labels in preemption policy of capacity scheduler for
inter-queue preemption. (Wangda Tan via jianhe)
YARN-2619. Added NodeManager support for disk io isolation through cgroups.
(Varun Vasudev and Wei Yan via vinodkv)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -823,38 +823,68 @@ private static void addDeprecatedKeys() {
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;
/**
* Prefix for disk configurations. Work in progress: This configuration
* parameter may be changed/removed in the future.
*/
@Private
public static final String NM_DISK_RESOURCE_PREFIX = NM_PREFIX
+ "resource.disk.";
/**
* This setting controls if resource handling for disk operations is enabled.
* Work in progress: This configuration parameter may be changed/removed in
* the future
*/
@Private
public static final String NM_DISK_RESOURCE_ENABLED = NM_DISK_RESOURCE_PREFIX
+ "enabled";
/** Disk as a resource is disabled by default. **/
@Private
public static final boolean DEFAULT_NM_DISK_RESOURCE_ENABLED = false;
public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network.";
public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX
+ "resource.network.";
/** This setting controls if resource handling for network bandwidth is enabled **/
/* Work in progress: This configuration parameter may be changed/removed in the future */
/**
* This setting controls if resource handling for network bandwidth is
* enabled. Work in progress: This configuration parameter may be
* changed/removed in the future
*/
@Private
public static final String NM_NETWORK_RESOURCE_ENABLED =
NM_NETWORK_RESOURCE_PREFIX + "enabled";
/** Network as a resource is disabled by default **/
/** Network as a resource is disabled by default. **/
@Private
public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false;
/** Specifies the interface to be used for applying network throttling rules **/
/* Work in progress: This configuration parameter may be changed/removed in the future */
/**
* Specifies the interface to be used for applying network throttling rules.
* Work in progress: This configuration parameter may be changed/removed in
* the future
*/
@Private
public static final String NM_NETWORK_RESOURCE_INTERFACE =
NM_NETWORK_RESOURCE_PREFIX + "interface";
@Private
public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0";
/** Specifies the total available outbound bandwidth on the node **/
/* Work in progress: This configuration parameter may be changed/removed in the future */
/**
* Specifies the total available outbound bandwidth on the node. Work in
* progress: This configuration parameter may be changed/removed in the future
*/
@Private
public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit";
@Private
public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000;
public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
1000;
/** Specifies the total outbound bandwidth available to YARN containers. defaults to
* NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
/**
* Specifies the total outbound bandwidth available to YARN containers.
* defaults to NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
* Work in progress: This configuration parameter may be changed/removed in
* the future
*/
/* Work in progress: This configuration parameter may be changed/removed in the future */
@Private
public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
/**
* Handler class to handle the blkio controller. Currently it splits resources
* evenly across all containers. Once we have scheduling sorted out, we can
* modify the function to represent the disk resources allocated.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CGroupsBlkioResourceHandlerImpl implements DiskResourceHandler {
static final Log LOG = LogFactory
.getLog(CGroupsBlkioResourceHandlerImpl.class);
private CGroupsHandler cGroupsHandler;
// Arbitrarily choose a weight - all that matters is that all containers
// get the same weight assigned to them. Once we have scheduling support
// this number will be determined dynamically for each container.
@VisibleForTesting
static final String DEFAULT_WEIGHT = "500";
private static final String PARTITIONS_FILE = "/proc/partitions";
CGroupsBlkioResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
this.cGroupsHandler = cGroupsHandler;
// check for linux so that we don't print messages for tests running on
// other platforms
if(Shell.LINUX) {
checkDiskScheduler();
}
}
private void checkDiskScheduler() {
String data;
// read /proc/partitions and check to make sure that sd* and hd*
// are using the CFQ scheduler. If they aren't print a warning
try {
byte[] contents = Files.readAllBytes(Paths.get(PARTITIONS_FILE));
data = new String(contents, "UTF-8").trim();
} catch (IOException e) {
String msg = "Couldn't read " + PARTITIONS_FILE +
"; can't determine disk scheduler type";
LOG.warn(msg, e);
return;
}
String[] lines = data.split(System.lineSeparator());
if (lines.length > 0) {
for (String line : lines) {
String[] columns = line.split("\\s+");
if (columns.length > 4) {
String partition = columns[4];
// check some known partitions to make sure the disk scheduler
// is cfq - not meant to be comprehensive, more a sanity check
if (partition.startsWith("sd") || partition.startsWith("hd")
|| partition.startsWith("vd") || partition.startsWith("xvd")) {
String schedulerPath =
"/sys/block/" + partition + "/queue/scheduler";
File schedulerFile = new File(schedulerPath);
if (schedulerFile.exists()) {
try {
byte[] contents = Files.readAllBytes(Paths.get(schedulerPath));
String schedulerString = new String(contents, "UTF-8").trim();
if (!schedulerString.contains("[cfq]")) {
LOG.warn("Device " + partition + " does not use the CFQ"
+ " scheduler; disk isolation using "
+ "CGroups will not work on this partition.");
}
} catch (IOException ie) {
LOG.warn(
"Unable to determine disk scheduler type for partition "
+ partition, ie);
}
}
}
}
}
}
}
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException {
// if bootstrap is called on this class, disk is already enabled
// so no need to check again
this.cGroupsHandler
.mountCGroupController(CGroupsHandler.CGroupController.BLKIO);
return null;
}
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String cgroupId = container.getContainerId().toString();
cGroupsHandler
.createCGroup(CGroupsHandler.CGroupController.BLKIO, cgroupId);
try {
cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.BLKIO,
cgroupId, CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT, DEFAULT_WEIGHT);
} catch (ResourceHandlerException re) {
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
cgroupId);
LOG.warn("Could not update cgroup for container", re);
throw re;
}
List<PrivilegedOperation> ret = new ArrayList<>();
ret.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
PrivilegedOperation.CGROUP_ARG_PREFIX
+ cGroupsHandler.getPathForCGroupTasks(
CGroupsHandler.CGroupController.BLKIO, cgroupId)));
return ret;
}
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
return null;
}
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.BLKIO,
containerId.toString());
return null;
}
@Override
public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
return null;
}
}

View File

@ -33,7 +33,8 @@
public interface CGroupsHandler {
public enum CGroupController {
CPU("cpu"),
NET_CLS("net_cls");
NET_CLS("net_cls"),
BLKIO("blkio");
private final String name;
@ -48,6 +49,7 @@ String getName() {
public static final String CGROUP_FILE_TASKS = "tasks";
public static final String CGROUP_PARAM_CLASSID = "classid";
public static final String CGROUP_PARAM_BLKIO_WEIGHT = "weight";
/**
* Mounts a cgroup controller

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -38,6 +39,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -63,7 +65,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
private final String cGroupMountPath;
private final long deleteCGroupTimeout;
private final long deleteCGroupDelay;
private final Map<CGroupController, String> controllerPaths;
private Map<CGroupController, String> controllerPaths;
private final ReadWriteLock rwLock;
private final PrivilegedOperationExecutor privilegedOperationExecutor;
private final Clock clock;
@ -106,55 +108,61 @@ private String getControllerPath(CGroupController controller) {
private void initializeControllerPaths() throws ResourceHandlerException {
if (enableCGroupMount) {
//nothing to do here - we support 'deferred' mounting of specific
//controllers - we'll populate the path for a given controller when an
//explicit mountCGroupController request is issued.
// nothing to do here - we support 'deferred' mounting of specific
// controllers - we'll populate the path for a given controller when an
// explicit mountCGroupController request is issued.
LOG.info("CGroup controller mounting enabled.");
} else {
//cluster admins are expected to have mounted controllers in specific
//locations - we'll attempt to figure out mount points
initializeControllerPathsFromMtab();
// cluster admins are expected to have mounted controllers in specific
// locations - we'll attempt to figure out mount points
Map<CGroupController, String> cPaths =
initializeControllerPathsFromMtab(MTAB_FILE, this.cGroupPrefix);
// we want to do a bulk update without the paths changing concurrently
try {
rwLock.writeLock().lock();
controllerPaths = cPaths;
} finally {
rwLock.writeLock().unlock();
}
}
}
private void initializeControllerPathsFromMtab()
throws ResourceHandlerException {
@VisibleForTesting
static Map<CGroupController, String> initializeControllerPathsFromMtab(
String mtab, String cGroupPrefix) throws ResourceHandlerException {
try {
Map<String, List<String>> parsedMtab = parseMtab();
//we want to do a bulk update without the paths changing concurrently
rwLock.writeLock().lock();
Map<String, List<String>> parsedMtab = parseMtab(mtab);
Map<CGroupController, String> ret = new HashMap<>();
for (CGroupController controller : CGroupController.values()) {
String name = controller.getName();
String controllerPath = findControllerInMtab(name, parsedMtab);
if (controllerPath != null) {
File f = new File(controllerPath + "/" + this.cGroupPrefix);
File f = new File(controllerPath + "/" + cGroupPrefix);
if (FileUtil.canWrite(f)) {
controllerPaths.put(controller, controllerPath);
ret.put(controller, controllerPath);
} else {
String error =
new StringBuffer("Mount point Based on mtab file: ")
.append(MTAB_FILE).append(
". Controller mount point not writable for: ")
.append(name).toString();
.append(mtab)
.append(". Controller mount point not writable for: ")
.append(name).toString();
LOG.error(error);
throw new ResourceHandlerException(error);
}
} else {
LOG.warn("Controller not mounted but automount disabled: " + name);
LOG.warn("Controller not mounted but automount disabled: " + name);
}
}
return ret;
} catch (IOException e) {
LOG.warn("Failed to initialize controller paths! Exception: " + e);
throw new ResourceHandlerException(
"Failed to initialize controller paths!");
} finally {
rwLock.writeLock().unlock();
"Failed to initialize controller paths!");
}
}
@ -173,12 +181,13 @@ private void initializeControllerPathsFromMtab()
* for mounts with type "cgroup". Cgroup controllers will
* appear in the list of options for a path.
*/
private Map<String, List<String>> parseMtab() throws IOException {
private static Map<String, List<String>> parseMtab(String mtab)
throws IOException {
Map<String, List<String>> ret = new HashMap<String, List<String>>();
BufferedReader in = null;
try {
FileInputStream fis = new FileInputStream(new File(getMtabFileName()));
FileInputStream fis = new FileInputStream(new File(mtab));
in = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
for (String str = in.readLine(); str != null;
@ -197,7 +206,7 @@ private Map<String, List<String>> parseMtab() throws IOException {
}
}
} catch (IOException e) {
throw new IOException("Error while reading " + getMtabFileName(), e);
throw new IOException("Error while reading " + mtab, e);
} finally {
IOUtils.cleanup(LOG, in);
}
@ -205,7 +214,7 @@ private Map<String, List<String>> parseMtab() throws IOException {
return ret;
}
private String findControllerInMtab(String controller,
private static String findControllerInMtab(String controller,
Map<String, List<String>> entries) {
for (Map.Entry<String, List<String>> e : entries.entrySet()) {
if (e.getValue().contains(controller))
@ -215,10 +224,6 @@ private String findControllerInMtab(String controller,
return null;
}
String getMtabFileName() {
return MTAB_FILE;
}
@Override
public void mountCGroupController(CGroupController controller)
throws ResourceHandlerException {

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Resource handler for disk resources.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface DiskResourceHandler extends ResourceHandler {
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -31,25 +32,27 @@
/**
* Provides mechanisms to get various resource handlers - cpu, memory, network,
* disk etc., - based on configuration
* disk etc., - based on configuration.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ResourceHandlerModule {
private volatile static ResourceHandlerChain resourceHandlerChain;
private static volatile ResourceHandlerChain resourceHandlerChain;
/**
* This specific implementation might provide resource management as well
* as resource metrics functionality. We need to ensure that the same
* instance is used for both.
*/
private volatile static TrafficControlBandwidthHandlerImpl
private static volatile TrafficControlBandwidthHandlerImpl
trafficControlBandwidthHandler;
private volatile static CGroupsHandler cGroupsHandler;
private static volatile CGroupsHandler cGroupsHandler;
private static volatile CGroupsBlkioResourceHandlerImpl
cGroupsBlkioResourceHandler;
/**
* Returns an initialized, thread-safe CGroupsHandler instance
* Returns an initialized, thread-safe CGroupsHandler instance.
*/
public static CGroupsHandler getCGroupsHandler(Configuration conf)
throws ResourceHandlerException {
@ -94,6 +97,28 @@ public static CGroupsHandler getCGroupsHandler(Configuration conf)
return getTrafficControlBandwidthHandler(conf);
}
public static DiskResourceHandler getDiskResourceHandler(Configuration conf)
throws ResourceHandlerException {
if (conf.getBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED,
YarnConfiguration.DEFAULT_NM_DISK_RESOURCE_ENABLED)) {
return getCgroupsBlkioResourceHandler(conf);
}
return null;
}
private static CGroupsBlkioResourceHandlerImpl getCgroupsBlkioResourceHandler(
Configuration conf) throws ResourceHandlerException {
if (cGroupsBlkioResourceHandler == null) {
synchronized (DiskResourceHandler.class) {
if (cGroupsBlkioResourceHandler == null) {
cGroupsBlkioResourceHandler =
new CGroupsBlkioResourceHandlerImpl(getCGroupsHandler(conf));
}
}
}
return cGroupsBlkioResourceHandler;
}
private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
ResourceHandler handler) {
if (handler != null) {
@ -106,11 +131,12 @@ private static void initializeConfiguredResourceHandlerChain(
ArrayList<ResourceHandler> handlerList = new ArrayList<>();
addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}
public static ResourceHandlerChain getConfiguredResourceHandlerChain
(Configuration conf) throws ResourceHandlerException {
public static ResourceHandlerChain getConfiguredResourceHandlerChain(
Configuration conf) throws ResourceHandlerException {
if (resourceHandlerChain == null) {
synchronized (ResourceHandlerModule.class) {
if (resourceHandlerChain == null) {
@ -125,4 +151,9 @@ private static void initializeConfiguredResourceHandlerChain(
return null;
}
}
@VisibleForTesting
static void nullifyResourceHandlerChain() throws ResourceHandlerException {
resourceHandlerChain = null;
}
}

View File

@ -30,6 +30,7 @@
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -503,4 +504,9 @@ private void initializeControllerPaths() throws IOException {
String getMtabFileName() {
return MTAB_FILE;
}
@VisibleForTesting
Map<String, String> getControllerPaths() {
return Collections.unmodifiableMap(controllerPaths);
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
/**
* Tests for the cgroups disk handler implementation.
*/
public class TestCGroupsBlkioResourceHandlerImpl {
private CGroupsHandler mockCGroupsHandler;
private CGroupsBlkioResourceHandlerImpl cGroupsBlkioResourceHandlerImpl;
@Before
public void setup() {
mockCGroupsHandler = mock(CGroupsHandler.class);
cGroupsBlkioResourceHandlerImpl =
new CGroupsBlkioResourceHandlerImpl(mockCGroupsHandler);
}
@Test
public void testBootstrap() throws Exception {
Configuration conf = new YarnConfiguration();
List<PrivilegedOperation> ret =
cGroupsBlkioResourceHandlerImpl.bootstrap(conf);
verify(mockCGroupsHandler, times(1)).mountCGroupController(
CGroupsHandler.CGroupController.BLKIO);
Assert.assertNull(ret);
}
@Test
public void testPreStart() throws Exception {
String id = "container_01_01";
String path = "test-path/" + id;
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Container mockContainer = mock(Container.class);
when(mockContainer.getContainerId()).thenReturn(mockContainerId);
when(
mockCGroupsHandler.getPathForCGroupTasks(
CGroupsHandler.CGroupController.BLKIO, id)).thenReturn(path);
List<PrivilegedOperation> ret =
cGroupsBlkioResourceHandlerImpl.preStart(mockContainer);
verify(mockCGroupsHandler, times(1)).createCGroup(
CGroupsHandler.CGroupController.BLKIO, id);
verify(mockCGroupsHandler, times(1)).updateCGroupParam(
CGroupsHandler.CGroupController.BLKIO, id,
CGroupsHandler.CGROUP_PARAM_BLKIO_WEIGHT,
CGroupsBlkioResourceHandlerImpl.DEFAULT_WEIGHT);
Assert.assertNotNull(ret);
Assert.assertEquals(1, ret.size());
PrivilegedOperation op = ret.get(0);
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
op.getOperationType());
List<String> args = op.getArguments();
Assert.assertEquals(1, args.size());
Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
args.get(0));
}
@Test
public void testReacquireContainer() throws Exception {
ContainerId containerIdMock = mock(ContainerId.class);
Assert.assertNull(cGroupsBlkioResourceHandlerImpl
.reacquireContainer(containerIdMock));
}
@Test
public void testPostComplete() throws Exception {
String id = "container_01_01";
ContainerId mockContainerId = mock(ContainerId.class);
when(mockContainerId.toString()).thenReturn(id);
Assert.assertNull(cGroupsBlkioResourceHandlerImpl
.postComplete(mockContainerId));
verify(mockCGroupsHandler, times(1)).deleteCGroup(
CGroupsHandler.CGroupController.BLKIO, id);
}
@Test
public void testTeardown() throws Exception {
Assert.assertNull(cGroupsBlkioResourceHandlerImpl.teardown());
}
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -35,18 +36,21 @@
import org.mockito.ArgumentCaptor;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
/**
* Tests for the CGroups handler implementation.
*/
public class TestCGroupsHandlerImpl {
private static final Log LOG =
LogFactory.getLog(TestCGroupsHandlerImpl.class);
@ -84,8 +88,8 @@ public void testMountController() {
try {
cGroupsHandler = new CGroupsHandlerImpl(conf,
privilegedOperationExecutorMock);
PrivilegedOperation expectedOp = new PrivilegedOperation
(PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
PrivilegedOperation expectedOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
//This is expected to be of the form :
//net_cls=<mount_path>/net_cls
StringBuffer controllerKV = new StringBuffer(controller.getName())
@ -94,8 +98,8 @@ public void testMountController() {
cGroupsHandler.mountCGroupController(controller);
try {
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
(PrivilegedOperation.class);
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
PrivilegedOperation.class);
verify(privilegedOperationExecutorMock)
.executePrivilegedOperation(opCaptor.capture(), eq(false));
@ -200,17 +204,15 @@ public void testCGroupOperations() {
Assert.assertTrue(paramFile.exists());
try {
Assert.assertEquals(paramValue, new String(Files.readAllBytes
(paramFile
.toPath())));
Assert.assertEquals(paramValue, new String(Files.readAllBytes(
paramFile.toPath())));
} catch (IOException e) {
LOG.error("Caught exception: " + e);
Assert.assertTrue("Unexpected IOException trying to read cgroup param!",
false);
Assert.fail("Unexpected IOException trying to read cgroup param!");
}
Assert.assertEquals(paramValue, cGroupsHandler.getCGroupParam
(controller, testCGroup, param));
Assert.assertEquals(paramValue,
cGroupsHandler.getCGroupParam(controller, testCGroup, param));
//We can't really do a delete test here. Linux cgroups
//implementation provides additional semantics - the cgroup cannot be
@ -222,12 +224,79 @@ public void testCGroupOperations() {
//delete is not possible with a regular non-empty directory.
} catch (ResourceHandlerException e) {
LOG.error("Caught exception: " + e);
Assert.assertTrue(
"Unexpected ResourceHandlerException during cgroup operations!",
false);
Assert
.fail("Unexpected ResourceHandlerException during cgroup operations!");
}
}
public static File createMockCgroupMount(File parentDir, String type)
throws IOException {
return createMockCgroupMount(parentDir, type, "hadoop-yarn");
}
public static File createMockCgroupMount(File parentDir, String type,
String hierarchy) throws IOException {
File cgroupMountDir =
new File(parentDir.getAbsolutePath(), type + "/" + hierarchy);
FileUtils.deleteQuietly(cgroupMountDir);
if (!cgroupMountDir.mkdirs()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupMountDir;
}
public static File createMockMTab(File parentDir) throws IOException {
String cpuMtabContent =
"none " + parentDir.getAbsolutePath()
+ "/cpu cgroup rw,relatime,cpu 0 0\n";
String blkioMtabContent =
"none " + parentDir.getAbsolutePath()
+ "/blkio cgroup rw,relatime,blkio 0 0\n";
File mockMtab = new File(parentDir, UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(cpuMtabContent);
mtabWriter.write(blkioMtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
return mockMtab;
}
@Test
public void testMtabParsing() throws Exception {
File parentDir = new File(tmpPath);
// create mock cgroup
File cpuCgroupMountDir = createMockCgroupMount(parentDir, "cpu",
hierarchy);
Assert.assertTrue(cpuCgroupMountDir.exists());
File blkioCgroupMountDir = createMockCgroupMount(parentDir,
"blkio", hierarchy);
Assert.assertTrue(blkioCgroupMountDir.exists());
File mockMtabFile = createMockMTab(parentDir);
Map<CGroupsHandler.CGroupController, String> controllerPaths =
CGroupsHandlerImpl.initializeControllerPathsFromMtab(
mockMtabFile.getAbsolutePath(), hierarchy);
Assert.assertEquals(2, controllerPaths.size());
Assert.assertTrue(controllerPaths
.containsKey(CGroupsHandler.CGroupController.CPU));
Assert.assertTrue(controllerPaths
.containsKey(CGroupsHandler.CGroupController.BLKIO));
String cpuDir = controllerPaths.get(CGroupsHandler.CGroupController.CPU);
String blkioDir =
controllerPaths.get(CGroupsHandler.CGroupController.BLKIO);
Assert.assertEquals(parentDir.getAbsolutePath() + "/cpu", cpuDir);
Assert.assertEquals(parentDir.getAbsolutePath() + "/blkio", blkioDir);
}
@After
public void teardown() {
FileUtil.fullyDelete(new File(tmpPath));

View File

@ -37,7 +37,7 @@ public class TestResourceHandlerModule {
Configuration networkEnabledConf;
@Before
public void setup() {
public void setup() throws Exception {
emptyConf = new YarnConfiguration();
networkEnabledConf = new YarnConfiguration();
@ -46,6 +46,7 @@ public void setup() {
//We need to bypass mtab parsing for figuring out cgroups mount locations
networkEnabledConf.setBoolean(YarnConfiguration
.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
ResourceHandlerModule.nullifyResourceHandlerChain();
}
@Test
@ -75,4 +76,27 @@ public void testOutboundBandwidthHandler() {
Assert.fail("Unexpected ResourceHandlerException: " + e);
}
}
@Test
public void testDiskResourceHandler() throws Exception {
DiskResourceHandler handler =
ResourceHandlerModule.getDiskResourceHandler(emptyConf);
Assert.assertNull(handler);
Configuration diskConf = new YarnConfiguration();
diskConf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true);
handler = ResourceHandlerModule.getDiskResourceHandler(diskConf);
Assert.assertNotNull(handler);
ResourceHandlerChain resourceHandlerChain =
ResourceHandlerModule.getConfiguredResourceHandlerChain(diskConf);
List<ResourceHandler> resourceHandlers =
resourceHandlerChain.getResourceHandlerList();
// Exactly one resource handler in chain
Assert.assertEquals(resourceHandlers.size(), 1);
// Same instance is expected to be in the chain.
Assert.assertTrue(resourceHandlers.get(0) == handler);
}
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.TestCGroupsHandlerImpl;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -33,7 +34,6 @@
import java.io.*;
import java.util.List;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
public class TestCgroupsLCEResourcesHandler {
@ -142,7 +142,7 @@ static class CustomCgroupsLCEResourceHandler extends
@Override
int[] getOverallLimits(float x) {
if (generateLimitsMode == true) {
if (generateLimitsMode) {
return super.getOverallLimits(x);
}
return limits;
@ -172,10 +172,11 @@ public void testInit() throws IOException {
handler.initConfig();
// create mock cgroup
File cgroupMountDir = createMockCgroupMount(cgroupDir);
File cpuCgroupMountDir = TestCGroupsHandlerImpl.createMockCgroupMount(
cgroupDir, "cpu");
// create mock mtab
File mockMtab = createMockMTab(cgroupDir);
File mockMtab = TestCGroupsHandlerImpl.createMockMTab(cgroupDir);
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
@ -184,8 +185,8 @@ public void testInit() throws IOException {
// in this case, we're using all cpu so the files
// shouldn't exist(because init won't create them
handler.init(mockLCE, plugin);
File periodFile = new File(cgroupMountDir, "cpu.cfs_period_us");
File quotaFile = new File(cgroupMountDir, "cpu.cfs_quota_us");
File periodFile = new File(cpuCgroupMountDir, "cpu.cfs_period_us");
File quotaFile = new File(cpuCgroupMountDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
@ -202,7 +203,7 @@ public void testInit() throws IOException {
// set cpu back to 100, quota should be -1
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100);
100);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
@ -235,7 +236,7 @@ public void testGetOverallLimits() {
Assert.assertEquals(expectedQuota, ret[0]);
Assert.assertEquals(-1, ret[1]);
int[] params = { 0, -1 };
int[] params = {0, -1};
for (int cores : params) {
try {
handler.getOverallLimits(cores);
@ -251,34 +252,6 @@ public void testGetOverallLimits() {
Assert.assertEquals(-1, ret[1]);
}
private File createMockCgroupMount(File cgroupDir) throws IOException {
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
FileUtils.deleteQuietly(cgroupDir);
if (!cgroupMountDir.mkdirs()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupMountDir;
}
private File createMockMTab(File cgroupDir) throws IOException {
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
return mockMtab;
}
@Test
public void testContainerLimits() throws IOException {
LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
@ -286,6 +259,7 @@ public void testContainerLimits() throws IOException {
new CustomCgroupsLCEResourceHandler();
handler.generateLimitsMode = true;
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_DISK_RESOURCE_ENABLED, true);
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
@ -294,71 +268,77 @@ public void testContainerLimits() throws IOException {
handler.initConfig();
// create mock cgroup
File cgroupMountDir = createMockCgroupMount(cgroupDir);
File cpuCgroupMountDir = TestCGroupsHandlerImpl.createMockCgroupMount(
cgroupDir, "cpu");
// create mock mtab
File mockMtab = createMockMTab(cgroupDir);
File mockMtab = TestCGroupsHandlerImpl.createMockMTab(cgroupDir);
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
handler.init(mockLCE, plugin);
// check values
// default case - files shouldn't exist, strict mode off by default
// check the controller paths map isn't empty
ContainerId id = ContainerId.fromString("container_1_1_1_1");
handler.preExecute(id, Resource.newInstance(1024, 1));
File containerDir = new File(cgroupMountDir, id.toString());
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
File periodFile = new File(containerDir, "cpu.cfs_period_us");
File quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertNotNull(handler.getControllerPaths());
// check values
// default case - files shouldn't exist, strict mode off by default
File containerCpuDir = new File(cpuCgroupMountDir, id.toString());
Assert.assertTrue(containerCpuDir.exists());
Assert.assertTrue(containerCpuDir.isDirectory());
File periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
File quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// no files created because we're using all cpu
FileUtils.deleteQuietly(containerDir);
FileUtils.deleteQuietly(containerCpuDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
Assert.assertTrue(containerCpuDir.exists());
Assert.assertTrue(containerCpuDir.isDirectory());
periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// 50% of CPU
FileUtils.deleteQuietly(containerDir);
FileUtils.deleteQuietly(containerCpuDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerCpuDir.exists());
Assert.assertTrue(containerCpuDir.isDirectory());
periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(500 * 1000, readIntFromFile(periodFile));
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
// CGroups set to 50% of CPU, container set to 50% of YARN CPU
FileUtils.deleteQuietly(containerDir);
FileUtils.deleteQuietly(containerCpuDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
true);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
handler.initConfig();
handler.init(mockLCE, plugin);
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerCpuDir.exists());
Assert.assertTrue(containerCpuDir.isDirectory());
periodFile = new File(containerCpuDir, "cpu.cfs_period_us");
quotaFile = new File(containerCpuDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(1000 * 1000, readIntFromFile(periodFile));