MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the YARN resource model (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1562216 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-28 20:15:18 +00:00
parent 7f2b01a742
commit 2a20fe8370
5 changed files with 65 additions and 47 deletions

View File

@ -206,6 +206,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity
Scheduler (Sandy Ryza)
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
YARN resource model (Sandy Ryza)
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

View File

@ -1265,57 +1265,56 @@ private void computeRackAndLocality() {
}
}
}
private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
private static void updateMillisCounters(JobCounterUpdateEvent jce,
TaskAttemptImpl taskAttempt) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
int slotMemoryReq =
long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
int mbRequired =
taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
int vcoresRequired = taskAttempt.getCpuRequired(taskAttempt.conf, taskType);
int minSlotMemSize = taskAttempt.conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
int simSlotsRequired =
minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq
minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) mbRequired
/ minSlotMemSize);
long slotMillisIncrement =
simSlotsRequired
* (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
return slotMillisIncrement;
if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, simSlotsRequired * duration);
jce.addCounterUpdate(JobCounter.MB_MILLIS_MAPS, duration * mbRequired);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_MAPS, duration * vcoresRequired);
jce.addCounterUpdate(JobCounter.MILLIS_MAPS, duration);
} else {
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, simSlotsRequired * duration);
jce.addCounterUpdate(JobCounter.MB_MILLIS_REDUCES, duration * mbRequired);
jce.addCounterUpdate(JobCounter.VCORES_MILLIS_REDUCES, duration * vcoresRequired);
jce.addCounterUpdate(JobCounter.MILLIS_REDUCES, duration);
}
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTASucceeded(
TaskAttemptImpl taskAttempt) {
long slotMillis = computeSlotMillis(taskAttempt);
TaskId taskId = taskAttempt.attemptId.getTaskId();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
jce.addCounterUpdate(
taskId.getTaskType() == TaskType.MAP ?
JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
slotMillis);
updateMillisCounters(jce, taskAttempt);
return jce;
}
private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
long slotMillisIncrement = computeSlotMillis(taskAttempt);
if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
if(!taskAlreadyCompleted) {
// dont double count the elapsed time
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
}
} else {
jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
if(!taskAlreadyCompleted) {
// dont double count the elapsed time
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
}
}
if (!taskAlreadyCompleted) {
updateMillisCounters(jce, taskAttempt);
}
return jce;
}
@ -1325,20 +1324,13 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
long slotMillisIncrement = computeSlotMillis(taskAttempt);
if (taskType == TaskType.MAP) {
jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
if(!taskAlreadyCompleted) {
// dont double count the elapsed time
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
}
} else {
jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
if(!taskAlreadyCompleted) {
// dont double count the elapsed time
jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
}
}
if (!taskAlreadyCompleted) {
updateMillisCounters(jce, taskAttempt);
}
return jce;
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@ -182,13 +183,13 @@ public void testHostResolveAttempt() throws Exception {
}
@Test
public void testSlotMillisCounterUpdate() throws Exception {
verifySlotMillis(2048, 2048, 1024);
verifySlotMillis(2048, 1024, 1024);
verifySlotMillis(10240, 1024, 2048);
public void testMillisCountersUpdate() throws Exception {
verifyMillisCounters(2048, 2048, 1024);
verifyMillisCounters(2048, 1024, 1024);
verifyMillisCounters(10240, 1024, 2048);
}
public void verifySlotMillis(int mapMemMb, int reduceMemMb,
public void verifyMillisCounters(int mapMemMb, int reduceMemMb,
int minContainerSize) throws Exception {
Clock actualClock = new SystemClock();
ControlledClock clock = new ControlledClock(actualClock);
@ -232,13 +233,23 @@ public void verifySlotMillis(int mapMemMb, int reduceMemMb,
Assert.assertEquals(mta.getLaunchTime(), 10);
Assert.assertEquals(rta.getFinishTime(), 11);
Assert.assertEquals(rta.getLaunchTime(), 10);
Counters counters = job.getAllCounters();
Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
.getValue());
Assert.assertEquals(
(int) Math.ceil((float) reduceMemMb / minContainerSize), job
.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
.getValue());
counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
Assert.assertEquals((int) Math.ceil((float) reduceMemMb / minContainerSize),
counters.findCounter(JobCounter.SLOTS_MILLIS_REDUCES).getValue());
Assert.assertEquals(1,
counters.findCounter(JobCounter.MILLIS_MAPS).getValue());
Assert.assertEquals(1,
counters.findCounter(JobCounter.MILLIS_REDUCES).getValue());
Assert.assertEquals(mapMemMb,
counters.findCounter(JobCounter.MB_MILLIS_MAPS).getValue());
Assert.assertEquals(reduceMemMb,
counters.findCounter(JobCounter.MB_MILLIS_REDUCES).getValue());
Assert.assertEquals(1,
counters.findCounter(JobCounter.VCORES_MILLIS_MAPS).getValue());
Assert.assertEquals(1,
counters.findCounter(JobCounter.VCORES_MILLIS_REDUCES).getValue());
}
private TaskAttemptImpl createMapTaskAttemptImplForTest(

View File

@ -49,5 +49,11 @@ public enum JobCounter {
TASKS_REQ_PREEMPT,
CHECKPOINTS,
CHECKPOINT_BYTES,
CHECKPOINT_TIME
CHECKPOINT_TIME,
MILLIS_MAPS,
MILLIS_REDUCES,
VCORES_MILLIS_MAPS,
VCORES_MILLIS_REDUCES,
MB_MILLIS_MAPS,
MB_MILLIS_REDUCES
}

View File

@ -25,9 +25,15 @@ DATA_LOCAL_MAPS.name= Data-local map tasks
RACK_LOCAL_MAPS.name= Rack-local map tasks
SLOTS_MILLIS_MAPS.name= Total time spent by all maps in occupied slots (ms)
SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms)
MILLIS_MAPS.name= Total time spent by all map tasks (ms)
MILLIS_REDUCES.name= Total time spent by all reduce tasks (ms)
MB_MILLIS_MAPS.name= Total megabyte-seconds taken by all map tasks
MB_MILLIS_REDUCES.name= Total megabyte-seconds taken by all reduce tasks
VCORES_MILLIS_MAPS.name= Total vcore-seconds taken by all map tasks
VCORES_MILLIS_REDUCES.name= Total vcore-seconds taken by all reduce tasks
FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms)
FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms)
TASKS_REQ_PREEMPT.name= Tasks that have been asked to preempt
CHECKPOINTS.name= Number of checkpoints reported
CHECKPOINT_BYTES.name= Total amount of bytes in checkpoints
CHECKPOINT_TIME.name= Total time spent checkpointing (ms)
CHECKPOINT_TIME.name= Total time spent checkpointing (ms)