From 2c5c8fdb80546467274607b26a1295b352c58fc8 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Wed, 9 Jan 2013 05:28:40 +0000 Subject: [PATCH] MAPREDUCE-4520. Added support for MapReduce applications to request for CPU cores along-with memory post YARN-2. Contributed by Arun C. Murthy. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1430688 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../mapreduce/v2/app/job/impl/JobImpl.java | 14 ++++++++++- .../v2/app/job/impl/TaskAttemptImpl.java | 20 +++++++++++++++- .../apache/hadoop/mapreduce/MRJobConfig.java | 13 +++++++--- .../src/main/resources/mapred-default.xml | 24 +++++++++++++++++++ .../org/apache/hadoop/mapred/YARNRunner.java | 12 ++++++++-- 6 files changed, 79 insertions(+), 7 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5d342374be..8f42562ce8 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -172,6 +172,9 @@ Release 2.0.3-alpha - Unreleased NEW FEATURES + MAPREDUCE-4520. Added support for MapReduce applications to request for + CPU cores along-with memory post YARN-2. (acmurthy) + IMPROVEMENTS MAPREDUCE-3678. The Map tasks logs should have the value of input diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index b20457f862..7306cda792 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1050,6 +1050,10 @@ private void makeUberDecision(long dataInputLength) { conf.getInt(MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB); + long sysCPUSizeForUberSlot = + conf.getInt(MRJobConfig.MR_AM_CPU_VCORES, + MRJobConfig.DEFAULT_MR_AM_CPU_VCORES); + boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps); @@ -1061,6 +1065,13 @@ private void makeUberDecision(long dataInputLength) { conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0)) <= sysMemSizeForUberSlot) || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)); + boolean smallCpu = + ( + Math.max( + conf.getInt(MRJobConfig.MAP_CPU_VCORES, 1), + conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, 1)) < + sysCPUSizeForUberSlot + ); boolean notChainJob = !isChainJob(conf); // User has overall veto power over uberization, or user can modify @@ -1071,7 +1082,8 @@ private void makeUberDecision(long dataInputLength) { // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks // and thus requires sequential execution. isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks - && smallInput && smallMemory && notChainJob && isValidUberMaxReduces; + && smallInput && smallMemory && smallCpu + && notChainJob && isValidUberMaxReduces; if (isUber) { LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+" diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index f7aedbbed9..31ea2f2ac1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -527,7 +527,10 @@ public TaskAttemptImpl(TaskId taskId, int i, //TODO:create the resource reqt for this Task attempt this.resourceCapability = recordFactory.newRecordInstance(Resource.class); - this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType())); + this.resourceCapability.setMemory( + getMemoryRequired(conf, taskId.getTaskType())); + this.resourceCapability.setVirtualCores( + getCpuRequired(conf, taskId.getTaskType())); this.dataLocalHosts = dataLocalHosts; RackResolver.init(conf); @@ -551,6 +554,21 @@ private int getMemoryRequired(Configuration conf, TaskType taskType) { return memory; } + private int getCpuRequired(Configuration conf, TaskType taskType) { + int vcores = 1; + if (taskType == TaskType.MAP) { + vcores = + conf.getInt(MRJobConfig.MAP_CPU_VCORES, + MRJobConfig.DEFAULT_MAP_CPU_VCORES); + } else if (taskType == TaskType.REDUCE) { + vcores = + conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, + MRJobConfig.DEFAULT_REDUCE_CPU_VCORES); + } + + return vcores; + } + /** * Create a {@link LocalResource} record with all the given parameters. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 71ab51b29f..defd06dc2a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -184,7 +184,8 @@ public interface MRJobConfig { public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb"; public static final int DEFAULT_MAP_MEMORY_MB = 1024; - public static final String MAP_MEMORY_PHYSICAL_MB = "mapreduce.map.memory.physical.mb"; + public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores"; + public static final int DEFAULT_MAP_CPU_VCORES = 1; public static final String MAP_ENV = "mapreduce.map.env"; @@ -228,11 +229,12 @@ public interface MRJobConfig { public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size"; - public static final String REDUCE_MEMORY_PHYSICAL_MB = "mapreduce.reduce.memory.physical.mb"; - public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb"; public static final int DEFAULT_REDUCE_MEMORY_MB = 1024; + public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores"; + public static final int DEFAULT_REDUCE_CPU_VCORES = 1; + public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; @@ -353,6 +355,11 @@ public interface MRJobConfig { MR_AM_PREFIX+"resource.mb"; public static final int DEFAULT_MR_AM_VMEM_MB = 1536; + /** The number of virtual cores the MR app master needs.*/ + public static final String MR_AM_CPU_VCORES = + MR_AM_PREFIX+"resource.cpu-vcores"; + public static final int DEFAULT_MR_AM_CPU_VCORES = 1; + /** Command line arguments passed to the MR app master.*/ public static final String MR_AM_COMMAND_OPTS = MR_AM_PREFIX+"command-opts"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index fa17d87506..8b86f5706a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -218,6 +218,22 @@ + + mapreduce.map.cpu.vcores + 1 + + The number of virtual cores required for each map task. + + + + + mapreduce.reduce.cpu.vcores + 1 + + The number of virtual cores required for each reduce task. + + + mapreduce.reduce.merge.inmem.threshold 1000 @@ -916,6 +932,14 @@ The amount of memory the MR AppMaster needs. + + yarn.app.mapreduce.am.resource.cpu-vcores + 1 + + The number of virtual CPU cores the MR AppMaster needs. + + + CLASSPATH for MR applications. A comma-separated list of CLASSPATH entries diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 1b684364f5..c3f6b75b75 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -324,8 +324,16 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( // Setup resource requirements Resource capability = recordFactory.newRecordInstance(Resource.class); - capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB, - MRJobConfig.DEFAULT_MR_AM_VMEM_MB)); + capability.setMemory( + conf.getInt( + MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB + ) + ); + capability.setVirtualCores( + conf.getInt( + MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES + ) + ); LOG.debug("AppMaster capability = " + capability); // Setup LocalResources