MAPREDUCE-4693. Historyserver should provide counters for failed tasks. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1450956 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-02-27 21:03:19 +00:00
parent 0b9ed2364a
commit 979fb054f8
9 changed files with 225 additions and 75 deletions

View File

@ -195,6 +195,9 @@ Release 2.0.4-beta - UNRELEASED
MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
(Sandy Ryza via tomwhite)
MAPREDUCE-4693. History server should include counters for failed tasks.
(Xuan Gong via sseth)
Release 2.0.3-alpha - 2013-02-06
INCOMPATIBLE CHANGES

View File

@ -1183,7 +1183,8 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
taskAttempt.nodeRackName == null ? "UNKNOWN"
: taskAttempt.nodeRackName,
StringUtils.join(
LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getCounters(), taskAttempt
.getProgressSplitBlock().burst());
return tauce;
}

View File

@ -730,7 +730,8 @@ private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String>
TypeConverter.fromYarn(task.getType()),
errorSb.toString(),
taskState.toString(),
taId == null ? null : TypeConverter.fromYarn(taId));
taId == null ? null : TypeConverter.fromYarn(taId),
task.getCounters());
return taskFailedEvent;
}

View File

@ -212,6 +212,7 @@
{"name": "rackname", "type": "string"},
{"name": "status", "type": "string"},
{"name": "error", "type": "string"},
{"name": "counters", "type": "JhCounters"},
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},
{"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
{"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
@ -226,7 +227,8 @@
{"name": "finishTime", "type": "long"},
{"name": "error", "type": "string"},
{"name": "failedDueToAttempt", "type": ["null", "string"] },
{"name": "status", "type": "string"}
{"name": "status", "type": "string"},
{"name": "counters", "type": "JhCounters"}
]
},

View File

@ -295,6 +295,7 @@ private void handleTaskAttemptFailedEvent(
attemptInfo.shuffleFinishTime = event.getFinishTime();
attemptInfo.sortFinishTime = event.getFinishTime();
attemptInfo.mapFinishTime = event.getFinishTime();
attemptInfo.counters = event.getCounters();
if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status))
{
//this is a successful task
@ -347,6 +348,7 @@ private void handleTaskFailedEvent(TaskFailedEvent event) {
taskInfo.finishTime = event.getFinishTime();
taskInfo.error = StringInterner.weakIntern(event.getError());
taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
taskInfo.counters = event.getCounters();
info.errorInfo = "Task " + taskInfo.taskId +" failed " +
taskInfo.attemptsMap.size() + " times ";
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
@ -36,8 +37,24 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
private TaskAttemptUnsuccessfulCompletion datum =
new TaskAttemptUnsuccessfulCompletion();
private TaskAttemptUnsuccessfulCompletion datum = null;
private TaskAttemptID attemptId;
private TaskType taskType;
private String status;
private long finishTime;
private String hostname;
private int port;
private String rackName;
private String error;
private Counters counters;
int[][] allSplits;
int[] clockSplits;
int[] cpuUsages;
int[] vMemKbytes;
int[] physMemKbytes;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
* Create an event to record the unsuccessful completion of attempts
@ -49,6 +66,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
* @param port rpc port for the tracker
* @param rackName Name of the rack where the attempt executed
* @param error Error string
* @param counters Counters for the attempt
* @param allSplits the "splits", or a pixelated graph of various
* measurable worker node state variables against progress.
* Currently there are four; wallclock time, CPU time,
@ -58,31 +76,25 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, int port, String rackName,
String error, int[][] allSplits) {
datum.taskid = new Utf8(id.getTaskID().toString());
datum.taskType = new Utf8(taskType.name());
datum.attemptId = new Utf8(id.toString());
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.port = port;
datum.error = new Utf8(error);
datum.status = new Utf8(status);
datum.clockSplits
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
datum.cpuUsages
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetCPUTime(allSplits));
datum.vMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes
= AvroArrayUtils.toAvro
(ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
String error, Counters counters, int[][] allSplits) {
this.attemptId = id;
this.taskType = taskType;
this.status = status;
this.finishTime = finishTime;
this.hostname = hostname;
this.port = port;
this.rackName = rackName;
this.error = error;
this.counters = counters;
this.allSplits = allSplits;
this.clockSplits =
ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
this.cpuUsages =
ProgressSplitsBlock.arrayGetCPUTime(allSplits);
this.vMemKbytes =
ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
this.physMemKbytes =
ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
}
/**
@ -103,42 +115,109 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, String error) {
this(id, taskType, status, finishTime, hostname, -1, "", error, null);
this(id, taskType, status, finishTime, hostname, -1, "",
error, EMPTY_COUNTERS, null);
}
public TaskAttemptUnsuccessfulCompletionEvent
(TaskAttemptID id, TaskType taskType,
String status, long finishTime,
String hostname, int port, String rackName,
String error, int[][] allSplits) {
this(id, taskType, status, finishTime, hostname, port,
rackName, error, EMPTY_COUNTERS, null);
}
TaskAttemptUnsuccessfulCompletionEvent() {}
public Object getDatum() { return datum; }
public void setDatum(Object datum) {
this.datum = (TaskAttemptUnsuccessfulCompletion)datum;
public Object getDatum() {
if(datum == null) {
datum = new TaskAttemptUnsuccessfulCompletion();
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.taskType = new Utf8(taskType.name());
datum.attemptId = new Utf8(attemptId.toString());
datum.finishTime = finishTime;
datum.hostname = new Utf8(hostname);
if (rackName != null) {
datum.rackname = new Utf8(rackName);
}
datum.port = port;
datum.error = new Utf8(error);
datum.status = new Utf8(status);
datum.counters = EventWriter.toAvro(counters);
datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetWallclockTime(allSplits));
datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetCPUTime(allSplits));
datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetVMemKbytes(allSplits));
datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
.arrayGetPhysMemKbytes(allSplits));
}
return datum;
}
public void setDatum(Object odatum) {
this.datum =
(TaskAttemptUnsuccessfulCompletion)odatum;
this.attemptId =
TaskAttemptID.forName(datum.attemptId.toString());
this.taskType =
TaskType.valueOf(datum.taskType.toString());
this.finishTime = datum.finishTime;
this.hostname = datum.hostname.toString();
this.rackName = datum.rackname.toString();
this.port = datum.port;
this.status = datum.status.toString();
this.error = datum.error.toString();
this.counters =
EventReader.fromAvro(datum.counters);
this.clockSplits =
AvroArrayUtils.fromAvro(datum.clockSplits);
this.cpuUsages =
AvroArrayUtils.fromAvro(datum.cpuUsages);
this.vMemKbytes =
AvroArrayUtils.fromAvro(datum.vMemKbytes);
this.physMemKbytes =
AvroArrayUtils.fromAvro(datum.physMemKbytes);
}
/** Get the task id */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
public TaskID getTaskId() {
return attemptId.getTaskID();
}
/** Get the task type */
public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString());
return TaskType.valueOf(taskType.toString());
}
/** Get the attempt id */
public TaskAttemptID getTaskAttemptId() {
return TaskAttemptID.forName(datum.attemptId.toString());
return attemptId;
}
/** Get the finish time */
public long getFinishTime() { return datum.finishTime; }
public long getFinishTime() { return finishTime; }
/** Get the name of the host where the attempt executed */
public String getHostname() { return datum.hostname.toString(); }
public String getHostname() { return hostname; }
/** Get the rpc port for the host where the attempt executed */
public int getPort() { return datum.port; }
public int getPort() { return port; }
/** Get the rack name of the node where the attempt ran */
public String getRackName() {
return datum.rackname == null ? null : datum.rackname.toString();
return rackName == null ? null : rackName.toString();
}
/** Get the error string */
public String getError() { return datum.error.toString(); }
public String getError() { return error.toString(); }
/** Get the task status */
public String getTaskStatus() { return datum.status.toString(); }
public String getTaskStatus() {
return status.toString();
}
/** Get the counters */
Counters getCounters() { return counters; }
/** Get the event type */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
@ -157,16 +236,16 @@ public EventType getEventType() {
public int[] getClockSplits() {
return AvroArrayUtils.fromAvro(datum.clockSplits);
return clockSplits;
}
public int[] getCpuUsages() {
return AvroArrayUtils.fromAvro(datum.cpuUsages);
return cpuUsages;
}
public int[] getVMemKbytes() {
return AvroArrayUtils.fromAvro(datum.vMemKbytes);
return vMemKbytes;
}
public int[] getPhysMemKbytes() {
return AvroArrayUtils.fromAvro(datum.physMemKbytes);
return physMemKbytes;
}
}

View File

@ -18,10 +18,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
@ -35,7 +34,17 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TaskFailedEvent implements HistoryEvent {
private TaskFailed datum = new TaskFailed();
private TaskFailed datum = null;
private TaskAttemptID failedDueToAttempt;
private TaskID id;
private TaskType taskType;
private long finishTime;
private String status;
private String error;
private Counters counters;
private static final Counters EMPTY_COUNTERS = new Counters();
/**
* Create an event to record task failure
@ -45,45 +54,87 @@ public class TaskFailedEvent implements HistoryEvent {
* @param error Error String
* @param status Status
* @param failedDueToAttempt The attempt id due to which the task failed
* @param counters Counters for the task
*/
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt, Counters counters) {
this.id = id;
this.finishTime = finishTime;
this.taskType = taskType;
this.error = error;
this.status = status;
this.failedDueToAttempt = failedDueToAttempt;
this.counters = counters;
}
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
TaskAttemptID failedDueToAttempt) {
datum.taskid = new Utf8(id.toString());
datum.error = new Utf8(error);
datum.finishTime = finishTime;
datum.taskType = new Utf8(taskType.name());
datum.failedDueToAttempt = failedDueToAttempt == null
? null
: new Utf8(failedDueToAttempt.toString());
datum.status = new Utf8(status);
this(id, finishTime, taskType, error, status,
failedDueToAttempt, EMPTY_COUNTERS);
}
TaskFailedEvent() {}
public Object getDatum() { return datum; }
public void setDatum(Object datum) { this.datum = (TaskFailed)datum; }
public Object getDatum() {
if(datum == null) {
datum = new TaskFailed();
datum.taskid = new Utf8(id.toString());
datum.error = new Utf8(error);
datum.finishTime = finishTime;
datum.taskType = new Utf8(taskType.name());
datum.failedDueToAttempt =
failedDueToAttempt == null
? null
: new Utf8(failedDueToAttempt.toString());
datum.status = new Utf8(status);
datum.counters = EventWriter.toAvro(counters);
}
return datum;
}
public void setDatum(Object odatum) {
this.datum = (TaskFailed)odatum;
this.id =
TaskID.forName(datum.taskid.toString());
this.taskType =
TaskType.valueOf(datum.taskType.toString());
this.finishTime = datum.finishTime;
this.error = datum.error.toString();
this.failedDueToAttempt =
datum.failedDueToAttempt == null
? null
: TaskAttemptID.forName(
datum.failedDueToAttempt.toString());
this.status = datum.status.toString();
this.counters =
EventReader.fromAvro(datum.counters);
}
/** Get the task id */
public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
public TaskID getTaskId() { return id; }
/** Get the error string */
public String getError() { return datum.error.toString(); }
public String getError() { return error; }
/** Get the finish time of the attempt */
public long getFinishTime() { return datum.finishTime; }
public long getFinishTime() {
return finishTime;
}
/** Get the task type */
public TaskType getTaskType() {
return TaskType.valueOf(datum.taskType.toString());
return taskType;
}
/** Get the attempt id due to which the task failed */
public TaskAttemptID getFailedAttemptID() {
return datum.failedDueToAttempt == null
? null
: TaskAttemptID.forName(datum.failedDueToAttempt.toString());
return failedDueToAttempt;
}
/** Get the task status */
public String getTaskStatus() { return datum.status.toString(); }
public String getTaskStatus() { return status; }
/** Get task counters */
public Counters getCounters() { return counters; }
/** Get the event type */
public EventType getEventType() { return EventType.TASK_FAILED; }
public EventType getEventType() {
return EventType.TASK_FAILED;
}
}

View File

@ -404,7 +404,7 @@ public void testHistoryParsingForFailedAttempts() throws Exception {
}
}
@Test
@Test (timeout=5000)
public void testCountersForFailedTask() throws Exception {
LOG.info("STARTING testCountersForFailedTask");
try {
@ -455,6 +455,9 @@ public void testCountersForFailedTask() throws Exception {
CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
Assert.assertNotNull("completed task report has null counters",
ct.getReport().getCounters());
// Make sure every completed task has counters, and that the counters are not empty
Assert.assertTrue(ct.getReport().getCounters()
.getAllCounterGroups().size() > 0);
}
} finally {
LOG.info("FINISHED testCountersForFailedTask");

View File

@ -83,6 +83,9 @@ public class JobBuilder {
private Map<ParsedHost, ParsedHost> allHosts =
new HashMap<ParsedHost, ParsedHost>();
private org.apache.hadoop.mapreduce.jobhistory.JhCounters EMPTY_COUNTERS =
new org.apache.hadoop.mapreduce.jobhistory.JhCounters();
/**
* The number of splits a task can have, before we ignore them all.
*/
@ -459,7 +462,10 @@ private void processTaskFailedEvent(TaskFailedEvent event) {
TaskFailed t = (TaskFailed)(event.getDatum());
task.putDiagnosticInfo(t.error.toString());
task.putFailedDueToAttemptId(t.failedDueToAttempt.toString());
// No counters in TaskFailedEvent
org.apache.hadoop.mapreduce.jobhistory.JhCounters counters =
((TaskFailed) event.getDatum()).counters;
task.incorporateCounters(
counters == null ? EMPTY_COUNTERS : counters);
}
private void processTaskAttemptUnsuccessfulCompletionEvent(
@ -481,7 +487,10 @@ private void processTaskAttemptUnsuccessfulCompletionEvent(
}
attempt.setFinishTime(event.getFinishTime());
org.apache.hadoop.mapreduce.jobhistory.JhCounters counters =
((TaskAttemptUnsuccessfulCompletion) event.getDatum()).counters;
attempt.incorporateCounters(
counters == null ? EMPTY_COUNTERS : counters);
attempt.arraySetClockSplits(event.getClockSplits());
attempt.arraySetCpuUsages(event.getCpuUsages());
attempt.arraySetVMemKbytes(event.getVMemKbytes());
@ -489,7 +498,6 @@ private void processTaskAttemptUnsuccessfulCompletionEvent(
TaskAttemptUnsuccessfulCompletion t =
(TaskAttemptUnsuccessfulCompletion) (event.getDatum());
attempt.putDiagnosticInfo(t.error.toString());
// No counters in TaskAttemptUnsuccessfulCompletionEvent
}
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {