MAPREDUCE-4520. Added support for MapReduce applications to request for CPU cores along-with memory post YARN-2. Contributed by Arun C. Murthy.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1430688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4539263971
commit
2c5c8fdb80
@ -172,6 +172,9 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
|
MAPREDUCE-4520. Added support for MapReduce applications to request for
|
||||||
|
CPU cores along-with memory post YARN-2. (acmurthy)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
MAPREDUCE-3678. The Map tasks logs should have the value of input
|
MAPREDUCE-3678. The Map tasks logs should have the value of input
|
||||||
|
@ -1050,6 +1050,10 @@ private void makeUberDecision(long dataInputLength) {
|
|||||||
conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
|
conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
|
||||||
MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
|
MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
|
||||||
|
|
||||||
|
long sysCPUSizeForUberSlot =
|
||||||
|
conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
|
||||||
|
MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
|
||||||
|
|
||||||
boolean uberEnabled =
|
boolean uberEnabled =
|
||||||
conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||||
boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
|
boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
|
||||||
@ -1061,6 +1065,13 @@ private void makeUberDecision(long dataInputLength) {
|
|||||||
conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
|
conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
|
||||||
<= sysMemSizeForUberSlot)
|
<= sysMemSizeForUberSlot)
|
||||||
|| (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
|
|| (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
|
||||||
|
boolean smallCpu =
|
||||||
|
(
|
||||||
|
Math.max(
|
||||||
|
conf.getInt(MRJobConfig.MAP_CPU_VCORES, 1),
|
||||||
|
conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, 1)) <
|
||||||
|
sysCPUSizeForUberSlot
|
||||||
|
);
|
||||||
boolean notChainJob = !isChainJob(conf);
|
boolean notChainJob = !isChainJob(conf);
|
||||||
|
|
||||||
// User has overall veto power over uberization, or user can modify
|
// User has overall veto power over uberization, or user can modify
|
||||||
@ -1071,7 +1082,8 @@ private void makeUberDecision(long dataInputLength) {
|
|||||||
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
|
// while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
|
||||||
// and thus requires sequential execution.
|
// and thus requires sequential execution.
|
||||||
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
|
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
|
||||||
&& smallInput && smallMemory && notChainJob && isValidUberMaxReduces;
|
&& smallInput && smallMemory && smallCpu
|
||||||
|
&& notChainJob && isValidUberMaxReduces;
|
||||||
|
|
||||||
if (isUber) {
|
if (isUber) {
|
||||||
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
|
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
|
||||||
|
@ -527,7 +527,10 @@ public TaskAttemptImpl(TaskId taskId, int i,
|
|||||||
|
|
||||||
//TODO:create the resource reqt for this Task attempt
|
//TODO:create the resource reqt for this Task attempt
|
||||||
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
|
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
|
||||||
this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType()));
|
this.resourceCapability.setMemory(
|
||||||
|
getMemoryRequired(conf, taskId.getTaskType()));
|
||||||
|
this.resourceCapability.setVirtualCores(
|
||||||
|
getCpuRequired(conf, taskId.getTaskType()));
|
||||||
this.dataLocalHosts = dataLocalHosts;
|
this.dataLocalHosts = dataLocalHosts;
|
||||||
RackResolver.init(conf);
|
RackResolver.init(conf);
|
||||||
|
|
||||||
@ -551,6 +554,21 @@ private int getMemoryRequired(Configuration conf, TaskType taskType) {
|
|||||||
return memory;
|
return memory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getCpuRequired(Configuration conf, TaskType taskType) {
|
||||||
|
int vcores = 1;
|
||||||
|
if (taskType == TaskType.MAP) {
|
||||||
|
vcores =
|
||||||
|
conf.getInt(MRJobConfig.MAP_CPU_VCORES,
|
||||||
|
MRJobConfig.DEFAULT_MAP_CPU_VCORES);
|
||||||
|
} else if (taskType == TaskType.REDUCE) {
|
||||||
|
vcores =
|
||||||
|
conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
|
||||||
|
MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
|
||||||
|
}
|
||||||
|
|
||||||
|
return vcores;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a {@link LocalResource} record with all the given parameters.
|
* Create a {@link LocalResource} record with all the given parameters.
|
||||||
*/
|
*/
|
||||||
|
@ -184,7 +184,8 @@ public interface MRJobConfig {
|
|||||||
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
|
public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
|
||||||
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
|
public static final int DEFAULT_MAP_MEMORY_MB = 1024;
|
||||||
|
|
||||||
public static final String MAP_MEMORY_PHYSICAL_MB = "mapreduce.map.memory.physical.mb";
|
public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
|
||||||
|
public static final int DEFAULT_MAP_CPU_VCORES = 1;
|
||||||
|
|
||||||
public static final String MAP_ENV = "mapreduce.map.env";
|
public static final String MAP_ENV = "mapreduce.map.env";
|
||||||
|
|
||||||
@ -228,11 +229,12 @@ public interface MRJobConfig {
|
|||||||
|
|
||||||
public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
|
public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
|
||||||
|
|
||||||
public static final String REDUCE_MEMORY_PHYSICAL_MB = "mapreduce.reduce.memory.physical.mb";
|
|
||||||
|
|
||||||
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
|
public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
|
||||||
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
|
public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
|
||||||
|
|
||||||
|
public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
|
||||||
|
public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
|
||||||
|
|
||||||
public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
|
public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
|
||||||
|
|
||||||
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
|
public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
|
||||||
@ -353,6 +355,11 @@ public interface MRJobConfig {
|
|||||||
MR_AM_PREFIX+"resource.mb";
|
MR_AM_PREFIX+"resource.mb";
|
||||||
public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
|
public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
|
||||||
|
|
||||||
|
/** The number of virtual cores the MR app master needs.*/
|
||||||
|
public static final String MR_AM_CPU_VCORES =
|
||||||
|
MR_AM_PREFIX+"resource.cpu-vcores";
|
||||||
|
public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
|
||||||
|
|
||||||
/** Command line arguments passed to the MR app master.*/
|
/** Command line arguments passed to the MR app master.*/
|
||||||
public static final String MR_AM_COMMAND_OPTS =
|
public static final String MR_AM_COMMAND_OPTS =
|
||||||
MR_AM_PREFIX+"command-opts";
|
MR_AM_PREFIX+"command-opts";
|
||||||
|
@ -218,6 +218,22 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.map.cpu.vcores</name>
|
||||||
|
<value>1</value>
|
||||||
|
<description>
|
||||||
|
The number of virtual cores required for each map task.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.reduce.cpu.vcores</name>
|
||||||
|
<value>1</value>
|
||||||
|
<description>
|
||||||
|
The number of virtual cores required for each reduce task.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.reduce.merge.inmem.threshold</name>
|
<name>mapreduce.reduce.merge.inmem.threshold</name>
|
||||||
<value>1000</value>
|
<value>1000</value>
|
||||||
@ -916,6 +932,14 @@
|
|||||||
<description>The amount of memory the MR AppMaster needs.</description>
|
<description>The amount of memory the MR AppMaster needs.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>yarn.app.mapreduce.am.resource.cpu-vcores</name>
|
||||||
|
<value>1</value>
|
||||||
|
<description>
|
||||||
|
The number of virtual CPU cores the MR AppMaster needs.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>CLASSPATH for MR applications. A comma-separated list
|
<description>CLASSPATH for MR applications. A comma-separated list
|
||||||
of CLASSPATH entries</description>
|
of CLASSPATH entries</description>
|
||||||
|
@ -324,8 +324,16 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
|
|||||||
|
|
||||||
// Setup resource requirements
|
// Setup resource requirements
|
||||||
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
Resource capability = recordFactory.newRecordInstance(Resource.class);
|
||||||
capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
|
capability.setMemory(
|
||||||
MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
|
conf.getInt(
|
||||||
|
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
|
||||||
|
)
|
||||||
|
);
|
||||||
|
capability.setVirtualCores(
|
||||||
|
conf.getInt(
|
||||||
|
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
|
||||||
|
)
|
||||||
|
);
|
||||||
LOG.debug("AppMaster capability = " + capability);
|
LOG.debug("AppMaster capability = " + capability);
|
||||||
|
|
||||||
// Setup LocalResources
|
// Setup LocalResources
|
||||||
|
Loading…
Reference in New Issue
Block a user