diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 00c7b8405a..90e0d2181a 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; +import static org.apache.commons.lang.StringUtils.isEmpty; + import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -123,6 +125,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -136,6 +139,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -667,12 +672,8 @@ public TaskAttemptImpl(TaskId taskId, int i, this.jobFile = jobFile; this.partition = partition; - //TODO:create the resource reqt for this Task attempt this.resourceCapability = recordFactory.newRecordInstance(Resource.class); - this.resourceCapability.setMemorySize( - getMemoryRequired(conf, taskId.getTaskType())); - this.resourceCapability.setVirtualCores( - getCpuRequired(conf, taskId.getTaskType())); + populateResourceCapability(taskId.getTaskType()); this.dataLocalHosts = resolveHosts(dataLocalHosts); RackResolver.init(conf); @@ -689,25 +690,137 @@ public TaskAttemptImpl(TaskId taskId, int i, stateMachine = stateMachineFactory.make(this); } + private void populateResourceCapability(TaskType taskType) { + String resourceTypePrefix = + getResourceTypePrefix(taskType); + boolean memorySet = false; + boolean cpuVcoresSet = false; + if (resourceTypePrefix != null) { + List resourceRequests = + ResourceUtils.getRequestedResourcesFromConfig(conf, + resourceTypePrefix); + for (ResourceInformation resourceRequest : resourceRequests) { + String resourceName = resourceRequest.getName(); + if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) || + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals( + resourceName)) { + if (memorySet) { + throw new IllegalArgumentException( + "Only one of the following keys " + + "can be specified for a single job: " + + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " + + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY); + } + String units = isEmpty(resourceRequest.getUnits()) ? + ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) : + resourceRequest.getUnits(); + this.resourceCapability.setMemorySize( + UnitsConversionUtil.convert(units, "Mi", + resourceRequest.getValue())); + memorySet = true; + String memoryKey = getMemoryKey(taskType); + if (memoryKey != null && conf.get(memoryKey) != null) { + LOG.warn("Configuration " + resourceTypePrefix + resourceName + + "=" + resourceRequest.getValue() + resourceRequest.getUnits() + + " is overriding the " + memoryKey + "=" + conf.get(memoryKey) + + " configuration"); + } + } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals( + resourceName)) { + this.resourceCapability.setVirtualCores( + (int) UnitsConversionUtil.convert(resourceRequest.getUnits(), "", + resourceRequest.getValue())); + cpuVcoresSet = true; + String cpuKey = getCpuVcoresKey(taskType); + if (cpuKey != null && conf.get(cpuKey) != null) { + LOG.warn("Configuration " + resourceTypePrefix + + MRJobConfig.RESOURCE_TYPE_NAME_VCORE + "=" + + resourceRequest.getValue() + resourceRequest.getUnits() + + " is overriding the " + cpuKey + "=" + + conf.get(cpuKey) + " configuration"); + } + } else { + ResourceInformation resourceInformation = + this.resourceCapability.getResourceInformation(resourceName); + resourceInformation.setUnits(resourceRequest.getUnits()); + resourceInformation.setValue(resourceRequest.getValue()); + this.resourceCapability.setResourceInformation(resourceName, + resourceInformation); + } + } + } + if (!memorySet) { + this.resourceCapability.setMemorySize(getMemoryRequired(conf, taskType)); + } + if (!cpuVcoresSet) { + this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType)); + } + } + + private String getCpuVcoresKey(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.MAP_CPU_VCORES; + case REDUCE: + return MRJobConfig.REDUCE_CPU_VCORES; + default: + return null; + } + } + + private String getMemoryKey(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.MAP_MEMORY_MB; + case REDUCE: + return MRJobConfig.REDUCE_MEMORY_MB; + default: + return null; + } + } + + private Integer getCpuVcoreDefault(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.DEFAULT_MAP_CPU_VCORES; + case REDUCE: + return MRJobConfig.DEFAULT_REDUCE_CPU_VCORES; + default: + return null; + } + } + private int getMemoryRequired(JobConf conf, TaskType taskType) { return conf.getMemoryRequired(TypeConverter.fromYarn(taskType)); } private int getCpuRequired(Configuration conf, TaskType taskType) { int vcores = 1; - if (taskType == TaskType.MAP) { - vcores = - conf.getInt(MRJobConfig.MAP_CPU_VCORES, - MRJobConfig.DEFAULT_MAP_CPU_VCORES); - } else if (taskType == TaskType.REDUCE) { - vcores = - conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, - MRJobConfig.DEFAULT_REDUCE_CPU_VCORES); + String cpuVcoreKey = getCpuVcoresKey(taskType); + if (cpuVcoreKey != null) { + Integer defaultCpuVcores = getCpuVcoreDefault(taskType); + if (null == defaultCpuVcores) { + defaultCpuVcores = vcores; + } + vcores = conf.getInt(cpuVcoreKey, defaultCpuVcores); } - return vcores; } + private String getResourceTypePrefix(TaskType taskType) { + switch (taskType) { + case MAP: + return MRJobConfig.MAP_RESOURCE_TYPE_PREFIX; + case REDUCE: + return MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX; + default: + LOG.info("TaskType " + taskType + + " does not support custom resource types - this support can be " + + "added in " + getClass().getSimpleName()); + return null; + } + } + /** * Create a {@link LocalResource} record with all the given parameters. * The NM that hosts AM container will upload resources to shared cache. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java index 7f187147c0..5d42fbfd0a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java @@ -71,6 +71,17 @@ public void initializeMemberVariables() { .add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY); configurationPropsToSkipCompare .add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY); + + // Resource type related properties are only prefixes, + // they need to be postfixed with the resource name + // in order to take effect. + // There is nothing to be added to mapred-default.xml + configurationPropsToSkipCompare.add( + MRJobConfig.MR_AM_RESOURCE_PREFIX); + configurationPropsToSkipCompare.add( + MRJobConfig.MAP_RESOURCE_TYPE_PREFIX); + configurationPropsToSkipCompare.add( + MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 61b780ef31..fe5d95dc25 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -28,13 +28,20 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import org.junit.After; import org.junit.Assert; +import org.junit.BeforeClass; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -42,6 +49,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; +import org.apache.hadoop.mapred.ReduceTaskAttemptImpl; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -82,24 +90,36 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; import org.mockito.ArgumentCaptor; +import com.google.common.collect.ImmutableList; + @SuppressWarnings({"unchecked", "rawtypes"}) public class TestTaskAttempt{ - + + private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource"; + static public class StubbedFS extends RawLocalFileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { @@ -107,6 +127,63 @@ public FileStatus getFileStatus(Path f) throws IOException { } } + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("\n" + + " \n" + + " yarn.resource-types\n" + + " a-custom-resource\n" + + " \n" + + " \n" + + " yarn.resource-types.a-custom-resource.units\n" + + " G\n" + + " \n" + + "\n").getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } + + private static class TestAppender extends AppenderSkeleton { + + private final List logEvents = new CopyOnWriteArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + public void close() { + } + + @Override + protected void append(LoggingEvent arg0) { + logEvents.add(arg0); + } + + private List getLogEvents() { + return logEvents; + } + } + + @BeforeClass + public static void setupBeforeClass() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + + @After + public void tearDown() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + @Test public void testMRAppHistoryForMap() throws Exception { MRApp app = new FailingAttemptsMRApp(1, 0); @@ -328,17 +405,18 @@ public void verifyMillisCounters(Resource containerResource, private TaskAttemptImpl createMapTaskAttemptImplForTest( EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) { Clock clock = SystemClock.getInstance(); - return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock); + return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, + clock, new JobConf()); } private TaskAttemptImpl createMapTaskAttemptImplForTest( - EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) { + EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, + Clock clock, JobConf jobConf) { ApplicationId appId = ApplicationId.newInstance(1, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptListener taListener = mock(TaskAttemptListener.class); Path jobFile = mock(Path.class); - JobConf jobConf = new JobConf(); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, taskSplitMetaInfo, jobConf, taListener, null, @@ -346,6 +424,20 @@ private TaskAttemptImpl createMapTaskAttemptImplForTest( return taImpl; } + private TaskAttemptImpl createReduceTaskAttemptImplForTest( + EventHandler eventHandler, Clock clock, JobConf jobConf) { + ApplicationId appId = ApplicationId.newInstance(1, 1); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + Path jobFile = mock(Path.class); + TaskAttemptImpl taImpl = + new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + 1, jobConf, taListener, null, + null, clock, null); + return taImpl; + } + private void testMRAppHistory(MRApp app) throws Exception { Configuration conf = new Configuration(); Job job = app.submit(conf); @@ -1412,6 +1504,259 @@ public void testTimeoutWhileFailFinishing() throws Exception { assertFalse("InternalError occurred", eventHandler.internalError); } + @Test + public void testMapperCustomResourceTypes() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo(); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, 7L); + TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler, + taskSplitMetaInfo, clock, jobConf); + ResourceInformation resourceInfo = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getResourceInformation(CUSTOM_RESOURCE_NAME); + assertEquals("Expecting the default unit (G)", + "G", resourceInfo.getUnits()); + assertEquals(7L, resourceInfo.getValue()); + } + + @Test + public void testReducerCustomResourceTypes() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, "3m"); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + ResourceInformation resourceInfo = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getResourceInformation(CUSTOM_RESOURCE_NAME); + assertEquals("Expecting the specified unit (m)", + "m", resourceInfo.getUnits()); + assertEquals(3L, resourceInfo.getValue()); + } + + @Test + public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(2048, memorySize); + } + + @Test + public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi"); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(2048, memorySize); + } + + @Test + public void testReducerMemoryRequestDefaultMemory() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf()); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize); + } + + @Test + public void testReducerMemoryRequestWithoutUnits() { + Clock clock = SystemClock.getInstance(); + for (String memoryResourceName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + EventHandler eventHandler = mock(EventHandler.class); + JobConf jobConf = new JobConf(); + jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + memoryResourceName, 2048); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(2048, memorySize); + } + } + + @Test + public void testReducerMemoryRequestOverriding() { + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + TestAppender testAppender = new TestAppender(); + final Logger logger = Logger.getLogger(TaskAttemptImpl.class); + try { + logger.addAppender(testAppender); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName, + "3Gi"); + jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long memorySize = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getMemorySize(); + assertEquals(3072, memorySize); + assertTrue(testAppender.getLogEvents().stream() + .anyMatch(e -> e.getLevel() == Level.WARN && ("Configuration " + + "mapreduce.reduce.resource." + memoryName + "=3Gi is " + + "overriding the mapreduce.reduce.memory.mb=2048 configuration") + .equals(e.getMessage()))); + } finally { + logger.removeAppender(testAppender); + } + } + } + + @Test(expected=IllegalArgumentException.class) + public void testReducerMemoryRequestMultipleName() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName, + "3Gi"); + } + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + } + + @Test + public void testReducerCpuRequestViaMapreduceReduceCpuVcores() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + int vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(3, vCores); + } + + @Test + public void testReducerCpuRequestViaMapreduceReduceResourceVcores() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5"); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + int vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(5, vCores); + } + + @Test + public void testReducerCpuRequestDefaultMemory() { + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf()); + int vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores); + } + + @Test + public void testReducerCpuRequestOverriding() { + TestAppender testAppender = new TestAppender(); + final Logger logger = Logger.getLogger(TaskAttemptImpl.class); + try { + logger.addAppender(testAppender); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7"); + jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + long vCores = + getResourceInfoFromContainerRequest(taImpl, eventHandler). + getVirtualCores(); + assertEquals(7, vCores); + assertTrue(testAppender.getLogEvents().stream().anyMatch( + e -> e.getLevel() == Level.WARN && ("Configuration " + + "mapreduce.reduce.resource.vcores=7 is overriding the " + + "mapreduce.reduce.cpu.vcores=9 configuration").equals( + e.getMessage()))); + } finally { + logger.removeAppender(testAppender); + } + } + + private Resource getResourceInfoFromContainerRequest( + TaskAttemptImpl taImpl, EventHandler eventHandler) { + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_SCHEDULE)); + + assertEquals("Task attempt is not in STARTING state", taImpl.getState(), + TaskAttemptState.STARTING); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(captor.capture()); + + List containerRequestEvents = new ArrayList<>(); + for (Event e : captor.getAllValues()) { + if (e instanceof ContainerRequestEvent) { + containerRequestEvents.add((ContainerRequestEvent) e); + } + } + assertEquals("Expected one ContainerRequestEvent after scheduling " + + "task attempt", 1, containerRequestEvents.size()); + + return containerRequestEvents.get(0).getCapability(); + } + + @Test(expected=IllegalArgumentException.class) + public void testReducerCustomResourceTypeWithInvalidUnit() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, "3z"); + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + } + + private void initResourceTypes() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(conf); + } + private void setupTaskAttemptFinishingMonitor( EventHandler eventHandler, JobConf jobConf, AppContext appCtx) { TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 91541eb29b..6acf1bc8dd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -363,12 +363,47 @@ public interface MRJobConfig { public static final String MAP_INPUT_START = "mapreduce.map.input.start"; + /** + * Configuration key for specifying memory requirement for the mapper. + * Kept for backward-compatibility, mapreduce.map.resource.memory + * is the new preferred way to specify this. + */ public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb"; public static final int DEFAULT_MAP_MEMORY_MB = 1024; + /** + * Configuration key for specifying CPU requirement for the mapper. + * Kept for backward-compatibility, mapreduce.map.resource.vcores + * is the new preferred way to specify this. + */ public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores"; public static final int DEFAULT_MAP_CPU_VCORES = 1; + /** + * Custom resource names required by the mapper should be + * appended to this prefix, the value's format is {amount}[ ][{unit}]. + * If no unit is defined, the default unit will be used. + * Standard resource names: memory (default unit: Mi), vcores + */ + public static final String MAP_RESOURCE_TYPE_PREFIX = + "mapreduce.map.resource."; + + /** + * Resource type name for CPU vcores. + */ + public static final String RESOURCE_TYPE_NAME_VCORE = "vcores"; + + /** + * Resource type name for memory. + */ + public static final String RESOURCE_TYPE_NAME_MEMORY = "memory"; + + /** + * Alternative resource type name for memory. + */ + public static final String RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY = + "memory-mb"; + public static final String MAP_ENV = "mapreduce.map.env"; public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @@ -417,12 +452,31 @@ public interface MRJobConfig { public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size"; + /** + * Configuration key for specifying memory requirement for the reducer. + * Kept for backward-compatibility, mapreduce.reduce.resource.memory + * is the new preferred way to specify this. + */ public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb"; public static final int DEFAULT_REDUCE_MEMORY_MB = 1024; + /** + * Configuration key for specifying CPU requirement for the reducer. + * Kept for backward-compatibility, mapreduce.reduce.resource.vcores + * is the new preferred way to specify this. + */ public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores"; public static final int DEFAULT_REDUCE_CPU_VCORES = 1; + /** + * Resource names required by the reducer should be + * appended to this prefix, the value's format is {amount}[ ][{unit}]. + * If no unit is defined, the default unit will be used. + * Standard resource names: memory (default unit: Mi), vcores + */ + public static final String REDUCE_RESOURCE_TYPE_PREFIX = + "mapreduce.reduce.resource."; + public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; @@ -608,7 +662,10 @@ public interface MRJobConfig { public static final String DEFAULT_MR_AM_STAGING_DIR = "/tmp/hadoop-yarn/staging"; - /** The amount of memory the MR app master needs.*/ + /** The amount of memory the MR app master needs. + * Kept for backward-compatibility, yarn.app.mapreduce.am.resource.memory is + * the new preferred way to specify this + */ public static final String MR_AM_VMEM_MB = MR_AM_PREFIX+"resource.mb"; public static final int DEFAULT_MR_AM_VMEM_MB = 1536; @@ -618,6 +675,15 @@ public interface MRJobConfig { MR_AM_PREFIX+"resource.cpu-vcores"; public static final int DEFAULT_MR_AM_CPU_VCORES = 1; + /** + * Resource names required by the MR AM should be + * appended to this prefix, the value's format is {amount}[ ][{unit}]. + * If no unit is defined, the default unit will be used + * Standard resource names: memory (default unit: Mi), vcores + */ + public static final String MR_AM_RESOURCE_PREFIX = + MR_AM_PREFIX + "resource."; + /** Command line arguments passed to the MR app master.*/ public static final String MR_AM_COMMAND_OPTS = MR_AM_PREFIX+"command-opts"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index a23ff34b57..12a307930f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -18,6 +18,9 @@ package org.apache.hadoop.mapred; +import static org.apache.commons.lang.StringUtils.isEmpty; +import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_RESOURCE_PREFIX; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -84,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -93,6 +97,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import com.google.common.annotations.VisibleForTesting; @@ -659,16 +665,76 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( private List generateResourceRequests() throws IOException { Resource capability = recordFactory.newRecordInstance(Resource.class); - capability.setMemorySize( - conf.getInt( - MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB - ) - ); - capability.setVirtualCores( - conf.getInt( - MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES - ) - ); + boolean memorySet = false; + boolean cpuVcoresSet = false; + List resourceRequests = ResourceUtils + .getRequestedResourcesFromConfig(conf, MR_AM_RESOURCE_PREFIX); + for (ResourceInformation resourceReq : resourceRequests) { + String resourceName = resourceReq.getName(); + if (MRJobConfig.RESOURCE_TYPE_NAME_MEMORY.equals(resourceName) || + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY.equals( + resourceName)) { + if (memorySet) { + throw new IllegalArgumentException( + "Only one of the following keys " + + "can be specified for a single job: " + + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY + ", " + + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY); + } + String units = isEmpty(resourceReq.getUnits()) ? + ResourceUtils.getDefaultUnit(ResourceInformation.MEMORY_URI) : + resourceReq.getUnits(); + capability.setMemorySize( + UnitsConversionUtil.convert(units, "Mi", resourceReq.getValue())); + memorySet = true; + if (conf.get(MRJobConfig.MR_AM_VMEM_MB) != null) { + LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX + + resourceName + "=" + resourceReq.getValue() + + resourceReq.getUnits() + " is overriding the " + + MRJobConfig.MR_AM_VMEM_MB + "=" + + conf.get(MRJobConfig.MR_AM_VMEM_MB) + " configuration"); + } + } else if (MRJobConfig.RESOURCE_TYPE_NAME_VCORE.equals(resourceName)) { + capability.setVirtualCores( + (int) UnitsConversionUtil.convert(resourceReq.getUnits(), "", + resourceReq.getValue())); + cpuVcoresSet = true; + if (conf.get(MRJobConfig.MR_AM_CPU_VCORES) != null) { + LOG.warn("Configuration " + MR_AM_RESOURCE_PREFIX + + resourceName + "=" + resourceReq.getValue() + + resourceReq.getUnits() + " is overriding the " + + MRJobConfig.MR_AM_CPU_VCORES + "=" + + conf.get(MRJobConfig.MR_AM_CPU_VCORES) + " configuration"); + } + } else if (!MRJobConfig.MR_AM_VMEM_MB.equals( + MR_AM_RESOURCE_PREFIX + resourceName) && + !MRJobConfig.MR_AM_CPU_VCORES.equals( + MR_AM_RESOURCE_PREFIX + resourceName)) { + // the "mb", "cpu-vcores" resource types are not processed here + // since the yarn.app.mapreduce.am.resource.mb, + // yarn.app.mapreduce.am.resource.cpu-vcores keys are used for + // backward-compatibility - which is handled after this loop + ResourceInformation resourceInformation = capability + .getResourceInformation(resourceName); + resourceInformation.setUnits(resourceReq.getUnits()); + resourceInformation.setValue(resourceReq.getValue()); + capability.setResourceInformation(resourceName, resourceInformation); + } + } + if (!memorySet) { + capability.setMemorySize( + conf.getInt( + MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB + ) + ); + } + if (!cpuVcoresSet) { + capability.setVirtualCores( + conf.getInt( + MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES + ) + ); + } if (LOG.isDebugEnabled()) { LOG.debug("AppMaster capability = " + capability); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 55ddea6da3..378363b4b0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -33,10 +33,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -44,6 +46,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -69,6 +72,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -96,28 +100,37 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.log4j.Appender; +import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Layout; +import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.SimpleLayout; import org.apache.log4j.WriterAppender; +import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.collect.ImmutableList; + /** * Test YarnRunner and make sure the client side plugin works * fine @@ -131,6 +144,53 @@ public class TestYARNRunner { MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0, MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%")); + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("\n" + + " \n" + + " yarn.resource-types\n" + + " a-custom-resource\n" + + " \n" + + " \n" + + " yarn.resource-types.a-custom-resource.units\n" + + " G\n" + + " \n" + + "\n").getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } + + private static class TestAppender extends AppenderSkeleton { + + private final List logEvents = new CopyOnWriteArrayList<>(); + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + public void close() { + } + + @Override + protected void append(LoggingEvent arg0) { + logEvents.add(arg0); + } + + private List getLogEvents() { + return logEvents; + } + } + private YARNRunner yarnRunner; private ResourceMgrDelegate resourceMgrDelegate; private YarnConfiguration conf; @@ -143,6 +203,11 @@ public class TestYARNRunner { private ClientServiceDelegate clientDelegate; private static final String failString = "Rejected job"; + @BeforeClass + public static void setupBeforeClass() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + @Before public void setUp() throws Exception { resourceMgrDelegate = mock(ResourceMgrDelegate.class); @@ -175,6 +240,7 @@ public ApplicationSubmissionContext answer(InvocationOnMock invocation) @After public void cleanup() { FileUtil.fullyDelete(testWorkDir); + ResourceUtils.resetResourceTypes(new Configuration()); } @Test(timeout=20000) @@ -881,4 +947,99 @@ public void testSendJobConf() throws IOException { .get("hadoop.tmp.dir").equals("testconfdir")); UserGroupInformation.reset(); } + + @Test + public void testCustomAMRMResourceType() throws Exception { + initResourceTypes(); + String customResourceName = "a-custom-resource"; + + JobConf jobConf = new JobConf(); + + jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX + + customResourceName, 5); + jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3); + + yarnRunner = new YARNRunner(jobConf); + + submissionContext = buildSubmitContext(yarnRunner, jobConf); + + List resourceRequests = + submissionContext.getAMContainerResourceRequests(); + + Assert.assertEquals(1, resourceRequests.size()); + ResourceRequest resourceRequest = resourceRequests.get(0); + + ResourceInformation resourceInformation = resourceRequest.getCapability() + .getResourceInformation(customResourceName); + Assert.assertEquals("Expecting the default unit (G)", + "G", resourceInformation.getUnits()); + Assert.assertEquals(5L, resourceInformation.getValue()); + Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores()); + } + + @Test + public void testAMRMemoryRequest() throws Exception { + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi"); + + yarnRunner = new YARNRunner(jobConf); + + submissionContext = buildSubmitContext(yarnRunner, jobConf); + + List resourceRequests = + submissionContext.getAMContainerResourceRequests(); + + Assert.assertEquals(1, resourceRequests.size()); + ResourceRequest resourceRequest = resourceRequests.get(0); + + long memorySize = resourceRequest.getCapability().getMemorySize(); + Assert.assertEquals(3072, memorySize); + } + } + + @Test + public void testAMRMemoryRequestOverriding() throws Exception { + for (String memoryName : ImmutableList.of( + MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, + MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) { + TestAppender testAppender = new TestAppender(); + Logger logger = Logger.getLogger(YARNRunner.class); + logger.addAppender(testAppender); + try { + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.MR_AM_RESOURCE_PREFIX + memoryName, "3 Gi"); + jobConf.setInt(MRJobConfig.MR_AM_VMEM_MB, 2048); + + yarnRunner = new YARNRunner(jobConf); + + submissionContext = buildSubmitContext(yarnRunner, jobConf); + + List resourceRequests = + submissionContext.getAMContainerResourceRequests(); + + Assert.assertEquals(1, resourceRequests.size()); + ResourceRequest resourceRequest = resourceRequests.get(0); + + long memorySize = resourceRequest.getCapability().getMemorySize(); + Assert.assertEquals(3072, memorySize); + assertTrue(testAppender.getLogEvents().stream().anyMatch( + e -> e.getLevel() == Level.WARN && ("Configuration " + + "yarn.app.mapreduce.am.resource." + memoryName + "=3Gi is " + + "overriding the yarn.app.mapreduce.am.resource.mb=2048 " + + "configuration").equals(e.getMessage()))); + } finally { + logger.removeAppender(testAppender); + } + } + } + + private void initResourceTypes() { + Configuration configuration = new Configuration(); + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(configuration); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 3a09de5d6b..8f75909cc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -42,7 +42,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Helper class to read the resource-types to be supported by the system. @@ -56,6 +59,8 @@ public class ResourceUtils { private static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); private static final String VCORES = ResourceInformation.VCORES.getName(); + private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN = + Pattern.compile("^([0-9]+) ?([a-zA-Z]*)$"); private static volatile boolean initializedResources = false; private static final Map RESOURCE_NAME_TO_INDEX = @@ -600,4 +605,43 @@ public static void reinitializeResources( ResourceUtils .initializeResourcesFromResourceInformationMap(resourceInformationMap); } + + /** + * From a given configuration get all entries representing requested + * resources: entries that match the {prefix}{resourceName}={value}[{units}] + * pattern. + * @param configuration The configuration + * @param prefix Keys with this prefix are considered from the configuration + * @return The list of requested resources as described by the configuration + */ + public static List getRequestedResourcesFromConfig( + Configuration configuration, String prefix) { + List result = new ArrayList<>(); + Map customResourcesMap = configuration + .getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$"); + for (Entry resource : customResourcesMap.entrySet()) { + String resourceName = resource.getKey().substring(prefix.length()); + Matcher matcher = + RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue()); + if (!matcher.matches()) { + String errorMsg = "Invalid resource request specified for property " + + resource.getKey() + ": \"" + resource.getValue() + + "\", expected format is: value[ ][units]"; + LOG.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + long value = Long.parseLong(matcher.group(1)); + String unit = matcher.group(2); + if (unit.isEmpty()) { + unit = ResourceUtils.getDefaultUnit(resourceName); + } + ResourceInformation resourceInformation = new ResourceInformation(); + resourceInformation.setName(resourceName); + resourceInformation.setValue(value); + resourceInformation.setUnits(unit); + result.add(resourceInformation); + } + return result; + } + }