YARN-6927. Add support for individual resource types requests in MapReduce

(Contributed by Gergo Repas via Daniel Templeton)
This commit is contained in:
Daniel Templeton 2017-10-30 11:04:22 -07:00
parent e4878a59b3
commit 9a7e810838
7 changed files with 835 additions and 29 deletions

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.apache.commons.lang.StringUtils.isEmpty;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -123,6 +125,7 @@
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@ -136,6 +139,8 @@
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -667,12 +672,8 @@ public TaskAttemptImpl(TaskId taskId, int i,
this.jobFile = jobFile;
this.partition = partition;
//TODO:create the resource reqt for this Task attempt
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
this.resourceCapability.setMemorySize(
getMemoryRequired(conf, taskId.getTaskType()));
this.resourceCapability.setVirtualCores(
getCpuRequired(conf, taskId.getTaskType()));
populateResourceCapability(taskId.getTaskType());
this.dataLocalHosts = resolveHosts(dataLocalHosts);
RackResolver.init(conf);
@ -689,25 +690,137 @@ public TaskAttemptImpl(TaskId taskId, int i,
stateMachine = stateMachineFactory.make(this);
}
private void populateResourceCapability(TaskType taskType) {
String resourceTypePrefix =
getResourceTypePrefix(taskType);
boolean memorySet = false;
boolean cpuVcoresSet = false;
if (resourceTypePrefix != null) {
List<ResourceInformation> resourceRequests =
ResourceUtils.getRequestedResourcesFromConfig(conf,
resourceTypePrefix);
for (ResourceInformation resourceRequest : resourceRequests) {
String resourceName = resourceRequest.getName();
if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
resourceName)) {
if (memorySet) {
throw new IllegalArgumentException(
"Only one of the following keys " +
"can be specified for a single job: " +
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
}
String units = isEmpty(resourceRequest.getUnits()) ?
ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
resourceRequest.getUnits();
this.resourceCapability.setMemorySize(
UnitsConversionUtil.convert(units, "Mi",
resourceRequest.getValue()));
memorySet = true;
String memoryKey = getMemoryKey(taskType);
if (memoryKey != null && conf.get(memoryKey) != null) {
LOG.warn("Configuration " + resourceTypePrefix + resourceName +
"=" + resourceRequest.getValue() + resourceRequest.getUnits() +
" is overriding the " + memoryKey + "=" + conf.get(memoryKey) +
" configuration");
}
} else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(
resourceName)) {
this.resourceCapability.setVirtualCores(
(int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "",
resourceRequest.getValue()));
cpuVcoresSet = true;
String cpuKey = getCpuVcoresKey(taskType);
if (cpuKey != null && conf.get(cpuKey) != null) {
LOG.warn("Configuration " + resourceTypePrefix +
MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" +
resourceRequest.getValue() + resourceRequest.getUnits() +
" is overriding the " + cpuKey + "=" +
conf.get(cpuKey) + " configuration");
}
} else {
ResourceInformation resourceInformation =
this.resourceCapability.getResourceInformation(resourceName);
resourceInformation.setUnits(resourceRequest.getUnits());
resourceInformation.setValue(resourceRequest.getValue());
this.resourceCapability.setResourceInformation(resourceName,
resourceInformation);
}
}
}
if (!memorySet) {
this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType));
}
if (!cpuVcoresSet) {
this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType));
}
}
private String getCpuVcoresKey(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.MAP_CPU_VCORES;
case REDUCE:
return MRJobConfig.REDUCE_CPU_VCORES;
default:
return null;
}
}
private String getMemoryKey(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.MAP_MEMORY_MB;
case REDUCE:
return MRJobConfig.REDUCE_MEMORY_MB;
default:
return null;
}
}
private Integer getCpuVcoreDefault(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.DEFAULT_MAP_CPU_VCORES;
case REDUCE:
return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES;
default:
return null;
}
}
private int getMemoryRequired(JobConf conf, TaskType taskType) {
return conf.getMemoryRequired(TypeConverter.fromYarn(taskType));
}
private int getCpuRequired(Configuration conf, TaskType taskType) {
int vcores = 1;
if (taskType == TaskType.MAP) {
vcores =
conf.getInt(MRJobConfig.MAP_CPU_VCORES,
MRJobConfig.DEFAULT_MAP_CPU_VCORES);
} else if (taskType == TaskType.REDUCE) {
vcores =
conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
String cpuVcoreKey = getCpuVcoresKey(taskType);
if (cpuVcoreKey != null) {
Integer defaultCpuVcores = getCpuVcoreDefault(taskType);
if (null == defaultCpuVcores) {
defaultCpuVcores = vcores;
}
vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores);
}
return vcores;
}
private String getResourceTypePrefix(TaskType taskType) {
switch (taskType) {
case MAP:
return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX;
case REDUCE:
return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX;
default:
LOG.info("TaskType " + taskType +
" does not support custom resource types - this support can be " +
"added in " + getClass().getSimpleName());
return null;
}
}
/**
* Create a {@link LocalResource} record with all the given parameters.
* The NM that hosts AM container will upload resources to shared cache.

View File

@ -71,6 +71,17 @@ public void initializeMemberVariables() {
.add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY);
configurationPropsToSkipCompare
.add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
// Resource type related properties are only prefixes,
// they need to be postfixed with the resource name
// in order to take effect.
// There is nothing to be added to mapred-default.xml
configurationPropsToSkipCompare.add(
MRJobConfig.MR_AM_RESOURCE_PREFIX);
configurationPropsToSkipCompare.add(
MRJobConfig.MAP_RESOURCE_TYPE_PREFIX);
configurationPropsToSkipCompare.add(
MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX);
}
}

View File

@ -28,13 +28,20 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -42,6 +49,7 @@
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -82,24 +90,36 @@
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import com.google.common.collect.ImmutableList;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestTaskAttempt{
private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";
static public class StubbedFS extends RawLocalFileSystem {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
@ -107,6 +127,63 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
}
private static class CustomResourceTypesConfigurationProvider
extends LocalConfigurationProvider {
@Override
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
String name) throws YarnException, IOException {
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
return new ByteArrayInputStream(
("<configuration>\n" +
" <property>\n" +
" <name>yarn.resource-types</name>\n" +
" <value>a-custom-resource</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
" <value>G</value>\n" +
" </property>\n" +
"</configuration>\n").getBytes());
} else {
return super.getConfigurationInputStream(bootstrapConf, name);
}
}
}
private static class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
public void close() {
}
@Override
protected void append(LoggingEvent arg0) {
logEvents.add(arg0);
}
private List<LoggingEvent> getLogEvents() {
return logEvents;
}
}
@BeforeClass
public static void setupBeforeClass() {
ResourceUtils.resetResourceTypes(new Configuration());
}
@After
public void tearDown() {
ResourceUtils.resetResourceTypes(new Configuration());
}
@Test
public void testMRAppHistoryForMap() throws Exception {
MRApp app = new FailingAttemptsMRApp(1, 0);
@ -328,17 +405,18 @@ public void verifyMillisCounters(Resource containerResource,
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = SystemClock.getInstance();
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo,
clock, new JobConf());
}
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo,
Clock clock, JobConf jobConf) {
ApplicationId appId = ApplicationId.newInstance(1, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
Path jobFile = mock(Path.class);
JobConf jobConf = new JobConf();
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
taskSplitMetaInfo, jobConf, taListener, null,
@ -346,6 +424,20 @@ private TaskAttemptImpl createMapTaskAttemptImplForTest(
return taImpl;
}
private TaskAttemptImpl createReduceTaskAttemptImplForTest(
EventHandler eventHandler, Clock clock, JobConf jobConf) {
ApplicationId appId = ApplicationId.newInstance(1, 1);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
Path jobFile = mock(Path.class);
TaskAttemptImpl taImpl =
new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
1, jobConf, taListener, null,
null, clock, null);
return taImpl;
}
private void testMRAppHistory(MRApp app) throws Exception {
Configuration conf = new Configuration();
Job job = app.submit(conf);
@ -1412,6 +1504,259 @@ public void testTimeoutWhileFailFinishing() throws Exception {
assertFalse("InternalError occurred", eventHandler.internalError);
}
@Test
public void testMapperCustomResourceTypes() {
initResourceTypes();
EventHandler eventHandler = mock(EventHandler.class);
TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo();
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX
+ CUSTOM_RESOURCE_NAME, 7L);
TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler,
taskSplitMetaInfo, clock, jobConf);
ResourceInformation resourceInfo =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getResourceInformation(CUSTOM_RESOURCE_NAME);
assertEquals("Expecting the default unit (G)",
"G", resourceInfo.getUnits());
assertEquals(7L, resourceInfo.getValue());
}
@Test
public void testReducerCustomResourceTypes() {
initResourceTypes();
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+ CUSTOM_RESOURCE_NAME, "3m");
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
ResourceInformation resourceInfo =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getResourceInformation(CUSTOM_RESOURCE_NAME);
assertEquals("Expecting the specified unit (m)",
"m", resourceInfo.getUnits());
assertEquals(3L, resourceInfo.getValue());
}
@Test
public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() {
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
long memorySize =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getMemorySize();
assertEquals(2048, memorySize);
}
@Test
public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() {
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi");
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
long memorySize =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getMemorySize();
assertEquals(2048, memorySize);
}
@Test
public void testReducerMemoryRequestDefaultMemory() {
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
long memorySize =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getMemorySize();
assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize);
}
@Test
public void testReducerMemoryRequestWithoutUnits() {
Clock clock = SystemClock.getInstance();
for (String memoryResourceName : ImmutableList.of(
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
EventHandler eventHandler = mock(EventHandler.class);
JobConf jobConf = new JobConf();
jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
memoryResourceName, 2048);
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
long memorySize =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getMemorySize();
assertEquals(2048, memorySize);
}
}
@Test
public void testReducerMemoryRequestOverriding() {
for (String memoryName : ImmutableList.of(
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
TestAppender testAppender = new TestAppender();
final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
try {
logger.addAppender(testAppender);
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
"3Gi");
jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
long memorySize =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getMemorySize();
assertEquals(3072, memorySize);
assertTrue(testAppender.getLogEvents().stream()
.anyMatch(e -> e.getLevel() == Level.WARN && ("Configuration " +
"mapreduce.reduce.resource." + memoryName + "=3Gi is " +
"overriding the mapreduce.reduce.memory.mb=2048 configuration")
.equals(e.getMessage())));
} finally {
logger.removeAppender(testAppender);
}
}
}
@Test(expected=IllegalArgumentException.class)
public void testReducerMemoryRequestMultipleName() {
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
for (String memoryName : ImmutableList.of(
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
"3Gi");
}
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
}
@Test
public void testReducerCpuRequestViaMapreduceReduceCpuVcores() {
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3);
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
int vCores =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getVirtualCores();
assertEquals(3, vCores);
}
@Test
public void testReducerCpuRequestViaMapreduceReduceResourceVcores() {
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5");
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
int vCores =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getVirtualCores();
assertEquals(5, vCores);
}
@Test
public void testReducerCpuRequestDefaultMemory() {
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
int vCores =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getVirtualCores();
assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores);
}
@Test
public void testReducerCpuRequestOverriding() {
TestAppender testAppender = new TestAppender();
final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
try {
logger.addAppender(testAppender);
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7");
jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9);
TaskAttemptImpl taImpl =
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
long vCores =
getResourceInfoFromContainerRequest(taImpl, eventHandler).
getVirtualCores();
assertEquals(7, vCores);
assertTrue(testAppender.getLogEvents().stream().anyMatch(
e -> e.getLevel() == Level.WARN && ("Configuration " +
"mapreduce.reduce.resource.vcores=7 is overriding the " +
"mapreduce.reduce.cpu.vcores=9 configuration").equals(
e.getMessage())));
} finally {
logger.removeAppender(testAppender);
}
}
private Resource getResourceInfoFromContainerRequest(
TaskAttemptImpl taImpl, EventHandler eventHandler) {
taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
TaskAttemptEventType.TA_SCHEDULE));
assertEquals("Task attempt is not in STARTING state", taImpl.getState(),
TaskAttemptState.STARTING);
ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(2)).handle(captor.capture());
List<ContainerRequestEvent> containerRequestEvents = new ArrayList<>();
for (Event e : captor.getAllValues()) {
if (e instanceof ContainerRequestEvent) {
containerRequestEvents.add((ContainerRequestEvent) e);
}
}
assertEquals("Expected one ContainerRequestEvent after scheduling "
+ "task attempt", 1, containerRequestEvents.size());
return containerRequestEvents.get(0).getCapability();
}
@Test(expected=IllegalArgumentException.class)
public void testReducerCustomResourceTypeWithInvalidUnit() {
initResourceTypes();
EventHandler eventHandler = mock(EventHandler.class);
Clock clock = SystemClock.getInstance();
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
+ CUSTOM_RESOURCE_NAME, "3z");
createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
}
private void initResourceTypes() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
CustomResourceTypesConfigurationProvider.class.getName());
ResourceUtils.resetResourceTypes(conf);
}
private void setupTaskAttemptFinishingMonitor(
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =

View File

@ -363,12 +363,47 @@ public interface MRJobConfig {
public static final String MAP_INPUT_START = "mapreduce.map.input.start";
/**
* Configuration key for specifying memory requirement for the mapper.
* Kept for backward-compatibility, mapreduce.map.resource.memory
* is the new preferred way to specify this.
*/
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
/**
* Configuration key for specifying CPU requirement for the mapper.
* Kept for backward-compatibility, mapreduce.map.resource.vcores
* is the new preferred way to specify this.
*/
public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
public static final int DEFAULT_MAP_CPU_VCORES = 1;
/**
* Custom resource names required by the mapper should be
* appended to this prefix, the value's format is {amount}[ ][{unit}].
* If no unit is defined, the default unit will be used.
* Standard resource names: memory (default unit: Mi), vcores
*/
public static final String MAP_RESOURCE_TYPE_PREFIX =
"mapreduce.map.resource.";
/**
* Resource type name for CPU vcores.
*/
public static final String RESOURCE_TYPE_NAME_VCORE = "vcores";
/**
* Resource type name for memory.
*/
public static final String RESOURCE_TYPE_NAME_MEMORY = "memory";
/**
* Alternative resource type name for memory.
*/
public static final String RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY =
"memory-mb";
public static final String MAP_ENV = "mapreduce.map.env";
public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
@ -417,12 +452,31 @@ public interface MRJobConfig {
public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
/**
* Configuration key for specifying memory requirement for the reducer.
* Kept for backward-compatibility, mapreduce.reduce.resource.memory
* is the new preferred way to specify this.
*/
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
/**
* Configuration key for specifying CPU requirement for the reducer.
* Kept for backward-compatibility, mapreduce.reduce.resource.vcores
* is the new preferred way to specify this.
*/
public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
/**
* Resource names required by the reducer should be
* appended to this prefix, the value's format is {amount}[ ][{unit}].
* If no unit is defined, the default unit will be used.
* Standard resource names: memory (default unit: Mi), vcores
*/
public static final String REDUCE_RESOURCE_TYPE_PREFIX =
"mapreduce.reduce.resource.";
public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
@ -608,7 +662,10 @@ public interface MRJobConfig {
public static final String DEFAULT_MR_AM_STAGING_DIR =
"/tmp/hadoop-yarn/staging";
/** The amount of memory the MR app master needs.*/
/** The amount of memory the MR app master needs.
* Kept for backward-compatibility, yarn.app.mapreduce.am.resource.memory is
* the new preferred way to specify this
*/
public static final String MR_AM_VMEM_MB =
MR_AM_PREFIX+"resource.mb";
public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
@ -618,6 +675,15 @@ public interface MRJobConfig {
MR_AM_PREFIX+"resource.cpu-vcores";
public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
/**
* Resource names required by the MR AM should be
* appended to this prefix, the value's format is {amount}[ ][{unit}].
* If no unit is defined, the default unit will be used
* Standard resource names: memory (default unit: Mi), vcores
*/
public static final String MR_AM_RESOURCE_PREFIX =
MR_AM_PREFIX + "resource.";
/** Command line arguments passed to the MR app master.*/
public static final String MR_AM_COMMAND_OPTS =
MR_AM_PREFIX+"command-opts";

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapred;
import static org.apache.commons.lang.StringUtils.isEmpty;
import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -84,6 +87,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -93,6 +97,8 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import com.google.common.annotations.VisibleForTesting;
@ -659,16 +665,76 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
private List<ResourceRequest> generateResourceRequests() throws IOException {
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemorySize(
conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
)
);
capability.setVirtualCores(
conf.getInt(
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
)
);
boolean memorySet = false;
boolean cpuVcoresSet = false;
List<ResourceInformation> resourceRequests = ResourceUtils
.getRequestedResourcesFromConfig(conf, MR_AM_RESOURCE_PREFIX);
for (ResourceInformation resourceReq : resourceRequests) {
String resourceName = resourceReq.getName();
if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) ||
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals(
resourceName)) {
if (memorySet) {
throw new IllegalArgumentException(
"Only one of the following keys " +
"can be specified for a single job: " +
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " +
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY);
}
String units = isEmpty(resourceReq.getUnits()) ?
ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) :
resourceReq.getUnits();
capability.setMemorySize(
UnitsConversionUtil.convert(units, "Mi", resourceReq.getValue()));
memorySet = true;
if (conf.get(MRJobConfig.MR_AM_VMEM_MB) != null) {
LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
resourceName + "=" + resourceReq.getValue() +
resourceReq.getUnits() + " is overriding the " +
MRJobConfig.MR_AM_VMEM_MB + "=" +
conf.get(MRJobConfig.MR_AM_VMEM_MB) + " configuration");
}
} else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(resourceName)) {
capability.setVirtualCores(
(int) UnitsConversionUtil.convert(resourceReq.getUnits(), "",
resourceReq.getValue()));
cpuVcoresSet = true;
if (conf.get(MRJobConfig.MR_AM_CPU_VCORES) != null) {
LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX +
resourceName + "=" + resourceReq.getValue() +
resourceReq.getUnits() + " is overriding the " +
MRJobConfig.MR_AM_CPU_VCORES + "=" +
conf.get(MRJobConfig.MR_AM_CPU_VCORES) + " configuration");
}
} else if (!MRJobConfig.MR_AM_VMEM_MB.equals(
MR_AM_RESOURCE_PREFIX + resourceName) &&
!MRJobConfig.MR_AM_CPU_VCORES.equals(
MR_AM_RESOURCE_PREFIX + resourceName)) {
// the "mb", "cpu-vcores" resource types are not processed here
// since the yarn.app.mapreduce.am.resource.mb,
// yarn.app.mapreduce.am.resource.cpu-vcores keys are used for
// backward-compatibility - which is handled after this loop
ResourceInformation resourceInformation = capability
.getResourceInformation(resourceName);
resourceInformation.setUnits(resourceReq.getUnits());
resourceInformation.setValue(resourceReq.getValue());
capability.setResourceInformation(resourceName, resourceInformation);
}
}
if (!memorySet) {
capability.setMemorySize(
conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
)
);
}
if (!cpuVcoresSet) {
capability.setVirtualCores(
conf.getInt(
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
)
);
}
if (LOG.isDebugEnabled()) {
LOG.debug("AppMaster capability = " + capability);
}

View File

@ -33,10 +33,12 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -44,6 +46,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -69,6 +72,7 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@ -96,28 +100,37 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.ImmutableList;
/**
* Test YarnRunner and make sure the client side plugin works
* fine
@ -131,6 +144,53 @@ public class TestYARNRunner {
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
private static class CustomResourceTypesConfigurationProvider
extends LocalConfigurationProvider {
@Override
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
String name) throws YarnException, IOException {
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
return new ByteArrayInputStream(
("<configuration>\n" +
" <property>\n" +
" <name>yarn.resource-types</name>\n" +
" <value>a-custom-resource</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
" <value>G</value>\n" +
" </property>\n" +
"</configuration>\n").getBytes());
} else {
return super.getConfigurationInputStream(bootstrapConf, name);
}
}
}
private static class TestAppender extends AppenderSkeleton {
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
@Override
public boolean requiresLayout() {
return false;
}
@Override
public void close() {
}
@Override
protected void append(LoggingEvent arg0) {
logEvents.add(arg0);
}
private List<LoggingEvent> getLogEvents() {
return logEvents;
}
}
private YARNRunner yarnRunner;
private ResourceMgrDelegate resourceMgrDelegate;
private YarnConfiguration conf;
@ -143,6 +203,11 @@ public class TestYARNRunner {
private ClientServiceDelegate clientDelegate;
private static final String failString = "Rejected job";
@BeforeClass
public static void setupBeforeClass() {
ResourceUtils.resetResourceTypes(new Configuration());
}
@Before
public void setUp() throws Exception {
resourceMgrDelegate = mock(ResourceMgrDelegate.class);
@ -175,6 +240,7 @@ public ApplicationSubmissionContext answer(InvocationOnMock invocation)
@After
public void cleanup() {
FileUtil.fullyDelete(testWorkDir);
ResourceUtils.resetResourceTypes(new Configuration());
}
@Test(timeout=20000)
@ -881,4 +947,99 @@ public void testSendJobConf() throws IOException {
.get("hadoop.tmp.dir").equals("testconfdir"));
UserGroupInformation.reset();
}
@Test
public void testCustomAMRMResourceType() throws Exception {
initResourceTypes();
String customResourceName = "a-custom-resource";
JobConf jobConf = new JobConf();
jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX +
customResourceName, 5);
jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3);
yarnRunner = new YARNRunner(jobConf);
submissionContext = buildSubmitContext(yarnRunner, jobConf);
List<ResourceRequest> resourceRequests =
submissionContext.getAMContainerResourceRequests();
Assert.assertEquals(1, resourceRequests.size());
ResourceRequest resourceRequest = resourceRequests.get(0);
ResourceInformation resourceInformation = resourceRequest.getCapability()
.getResourceInformation(customResourceName);
Assert.assertEquals("Expecting the default unit (G)",
"G", resourceInformation.getUnits());
Assert.assertEquals(5L, resourceInformation.getValue());
Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores());
}
@Test
public void testAMRMemoryRequest() throws Exception {
for (String memoryName : ImmutableList.of(
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
yarnRunner = new YARNRunner(jobConf);
submissionContext = buildSubmitContext(yarnRunner, jobConf);
List<ResourceRequest> resourceRequests =
submissionContext.getAMContainerResourceRequests();
Assert.assertEquals(1, resourceRequests.size());
ResourceRequest resourceRequest = resourceRequests.get(0);
long memorySize = resourceRequest.getCapability().getMemorySize();
Assert.assertEquals(3072, memorySize);
}
}
@Test
public void testAMRMemoryRequestOverriding() throws Exception {
for (String memoryName : ImmutableList.of(
MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
TestAppender testAppender = new TestAppender();
Logger logger = Logger.getLogger(YARNRunner.class);
logger.addAppender(testAppender);
try {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi");
jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048);
yarnRunner = new YARNRunner(jobConf);
submissionContext = buildSubmitContext(yarnRunner, jobConf);
List<ResourceRequest> resourceRequests =
submissionContext.getAMContainerResourceRequests();
Assert.assertEquals(1, resourceRequests.size());
ResourceRequest resourceRequest = resourceRequests.get(0);
long memorySize = resourceRequest.getCapability().getMemorySize();
Assert.assertEquals(3072, memorySize);
assertTrue(testAppender.getLogEvents().stream().anyMatch(
e -> e.getLevel() == Level.WARN && ("Configuration " +
"yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " +
"overriding the yarn.app.mapreduce.am.resource.mb=2048 " +
"configuration").equals(e.getMessage())));
} finally {
logger.removeAppender(testAppender);
}
}
}
private void initResourceTypes() {
Configuration configuration = new Configuration();
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
CustomResourceTypesConfigurationProvider.class.getName());
ResourceUtils.resetResourceTypes(configuration);
}
}

View File

@ -42,7 +42,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Helper class to read the resource-types to be supported by the system.
@ -56,6 +59,8 @@ public class ResourceUtils {
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final String VCORES = ResourceInformation.VCORES.getName();
private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN =
Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$");
private static volatile boolean initializedResources = false;
private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
@ -600,4 +605,43 @@ public static void reinitializeResources(
ResourceUtils
.initializeResourcesFromResourceInformationMap(resourceInformationMap);
}
/**
* From a given configuration get all entries representing requested
* resources: entries that match the {prefix}{resourceName}={value}[{units}]
* pattern.
* @param configuration The configuration
* @param prefix Keys with this prefix are considered from the configuration
* @return The list of requested resources as described by the configuration
*/
public static List<ResourceInformation> getRequestedResourcesFromConfig(
Configuration configuration, String prefix) {
List<ResourceInformation> result = new ArrayList<>();
Map<String, String> customResourcesMap = configuration
.getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$");
for (Entry<String, String> resource : customResourcesMap.entrySet()) {
String resourceName = resource.getKey().substring(prefix.length());
Matcher matcher =
RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue());
if (!matcher.matches()) {
String errorMsg = "Invalid resource request specified for property "
+ resource.getKey() + ": \"" + resource.getValue()
+ "\", expected format is: value[ ][units]";
LOG.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
long value = Long.parseLong(matcher.group(1));
String unit = matcher.group(2);
if (unit.isEmpty()) {
unit = ResourceUtils.getDefaultUnit(resourceName);
}
ResourceInformation resourceInformation = new ResourceInformation();
resourceInformation.setName(resourceName);
resourceInformation.setValue(value);
resourceInformation.setUnits(unit);
result.add(resourceInformation);
}
return result;
}
}