MAPREDUCE-4128. AM Recovery expects all attempts of a completed task to also be completed. (Bikas Saha via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1325765 13f79535-47bb-0310-9956-ffa450edef68
commit 27ea3ab6ba
parent 02db5b7ef7
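Taken together, the hunks below add the successful attempt id to the task-finished history event and teach the history parser to discard a task's SUCCEEDED status when that attempt later fails. A minimal, hypothetical sketch of the updated event API follows, assuming only the signatures visible in this diff; the literal task/attempt id strings and the "SUCCEEDED" status value are illustrative and not taken from the patch.

// Minimal sketch (illustrative, not part of this commit): emitting and consuming
// the updated TaskFinishedEvent. The id strings and the "SUCCEEDED" status are
// made-up example values.
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;

public class TaskFinishedEventSketch {
  public static void main(String[] args) {
    TaskID taskId = TaskID.forName("task_1334567890123_0001_m_000000");
    TaskAttemptID successfulAttempt =
        TaskAttemptID.forName("attempt_1334567890123_0001_m_000000_0");

    // New signature: the attempt that actually completed the task is recorded
    // alongside the task id; callers with nothing to report pass null.
    TaskFinishedEvent finished = new TaskFinishedEvent(taskId, successfulAttempt,
        System.currentTimeMillis(), TaskType.MAP, "SUCCEEDED", new Counters());

    // History consumers (such as the parser changed in this patch) can now ask
    // which attempt made the task successful; null means it was not recorded.
    TaskAttemptID recovered = finished.getSuccessfulTaskAttemptId();
    System.out.println("successful attempt: " + recovered);
  }
}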
@@ -332,6 +332,9 @@ Release 0.23.3 - UNRELEASED
     text on the UI to N/A instead of a link to null. (Bhallamudi Venkata Siva
     Kamesh via sseth)
 
+    MAPREDUCE-4128. AM Recovery expects all attempts of a completed task to
+    also be completed. (Bikas Saha via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -656,6 +656,7 @@ private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
   private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+        TypeConverter.fromYarn(task.successfulAttempt),
         task.getFinishTime(task.successfulAttempt),
         TypeConverter.fromYarn(task.taskId.getTaskType()),
         taskState.toString(),
@@ -93,7 +93,7 @@ public void testFirstFlushOnCompletionEvent() throws Exception {
 
       // First completion event, but min-queue-size for batching flushes is 10
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-          t.taskID, 0, TaskType.MAP, "", null)));
+          t.taskID, null, 0, TaskType.MAP, "", null)));
       verify(mockWriter).flush();
 
     } finally {
@@ -129,7 +129,7 @@ public void testMaxUnflushedCompletionEvents() throws Exception {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, 0, TaskType.MAP, "", null)));
+            t.taskID, null, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -174,7 +174,7 @@ public void testUnflushedTimer() throws Exception {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, 0, TaskType.MAP, "", null)));
+            t.taskID, null, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -215,7 +215,7 @@ public void testBatchedFlushJobEndMultiplier() throws Exception {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, 0, TaskType.MAP, "", null)));
+            t.taskID, null, 0, TaskType.MAP, "", null)));
       }
       queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
@@ -25,6 +25,8 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -37,6 +39,7 @@
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
 public class TestFetchFailure {
@@ -143,6 +146,107 @@ public void testFetchFailure() throws Exception {
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
   }
 
+  /**
+   * This tests that if a map attempt was failed (say due to fetch failures),
+   * then it gets re-run. When the next map attempt is running, if the AM dies,
+   * then, on AM re-run, the AM does not incorrectly remember the first failed
+   * attempt. Currently recovery does not recover running tasks. Effectively,
+   * the AM re-runs the maps from scratch.
+   */
+  @Test
+  public void testFetchFailureWithRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events =
+        job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt reduceAttempt =
+        reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    //send 3 fetch failures from reduce to trigger map re execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+
+    // Crash the app again.
+    app.stop();
+
+    //rerun
+    app =
+        new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+            ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask = it.next();
+    reduceTask = it.next();
+
+    // the map is not in a SUCCEEDED state after restart of AM
+    app.waitForState(mapTask, TaskState.RUNNING);
+    mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct", 2, events.length);
+  }
+
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
       TaskAttempt mapAttempt) {
     app.getContext().getEventHandler().handle(
@@ -150,4 +254,20 @@ private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
             reduceAttempt.getID(),
             Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
   }
+
+  static class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
+          getStartCount());
+      return eventHandler;
+    }
+  }
+
 }
@@ -230,7 +230,8 @@
           {"name": "taskType", "type": "string"},
           {"name": "finishTime", "type": "long"},
           {"name": "status", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "successfulAttemptId", "type": "string"}
          ]
      },
 
@@ -276,6 +276,17 @@ private void handleTaskAttemptFailedEvent(
     attemptInfo.shuffleFinishTime = event.getFinishTime();
     attemptInfo.sortFinishTime = event.getFinishTime();
     attemptInfo.mapFinishTime = event.getFinishTime();
+    if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status))
+    {
+      //this is a successful task
+      if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId()))
+      {
+        // the failed attempt is the one that made this task successful
+        // so its no longer successful
+        taskInfo.status = null;
+        // not resetting the other fields set in handleTaskFinishedEvent()
+      }
+    }
   }
 
   private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -299,6 +310,7 @@ private void handleTaskFinishedEvent(TaskFinishedEvent event) {
     taskInfo.counters = event.getCounters();
     taskInfo.finishTime = event.getFinishTime();
     taskInfo.status = TaskStatus.State.SUCCEEDED.toString();
+    taskInfo.successfulAttemptId = event.getSuccessfulTaskAttemptId();
   }
 
   private void handleTaskUpdatedEvent(TaskUpdatedEvent event) {
@@ -514,6 +526,7 @@ public static class TaskInfo {
     String status;
     String error;
     TaskAttemptID failedDueToAttemptId;
+    TaskAttemptID successfulAttemptId;
     Map<TaskAttemptID, TaskAttemptInfo> attemptsMap;
 
     public TaskInfo() {
@@ -554,6 +567,10 @@ public void printAll() {
     public TaskAttemptID getFailedDueToAttemptId() {
       return failedDueToAttemptId;
     }
+    /** @return the attempt Id that caused this task to succeed */
+    public TaskAttemptID getSuccessfulAttemptId() {
+      return successfulAttemptId;
+    }
     /** @return the error */
     public String getError() { return error; }
     /** @return the map of all attempts for this task */
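The parser changes above are the core of the fix: handleTaskFinishedEvent() now remembers which attempt completed the task, and handleTaskAttemptFailedEvent() clears the task's SUCCEEDED status when that same attempt later fails, so a restarted AM no longer recovers a "completed" task whose successful attempt did not complete. A hypothetical consumer-side check is sketched below; it is not code from this patch, and the getTaskStatus() and getAllTaskAttempts() accessor names on TaskInfo are assumptions.

// Hypothetical recovery-side check (not from this patch): only treat a parsed
// task as complete when it is still marked SUCCEEDED and the attempt that
// completed it was itself recovered. getTaskStatus() and getAllTaskAttempts()
// are assumed accessor names on JobHistoryParser.TaskInfo.
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;

public class RecoveryCheckSketch {
  static boolean completedWithKnownAttempt(TaskInfo taskInfo) {
    if (!TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
      // status is null when the successful attempt later failed (see above),
      // so the task is re-run rather than recovered as complete
      return false;
    }
    TaskAttemptID successful = taskInfo.getSuccessfulAttemptId();
    return successful != null
        && taskInfo.getAllTaskAttempts().containsKey(successful);
  }
}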
@@ -22,6 +22,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
@@ -36,6 +37,7 @@ public class TaskFinishedEvent implements HistoryEvent {
   private TaskFinished datum = null;
 
   private TaskID taskid;
+  private TaskAttemptID successfulAttemptId;
   private long finishTime;
   private TaskType taskType;
   private String status;
@@ -44,15 +46,17 @@ public class TaskFinishedEvent implements HistoryEvent {
   /**
    * Create an event to record the successful completion of a task
    * @param id Task ID
+   * @param attemptId Task Attempt ID of the successful attempt for this task
    * @param finishTime Finish time of the task
    * @param taskType Type of the task
    * @param status Status string
    * @param counters Counters for the task
    */
-  public TaskFinishedEvent(TaskID id, long finishTime,
+  public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
                            TaskType taskType,
                            String status, Counters counters) {
     this.taskid = id;
+    this.successfulAttemptId = attemptId;
     this.finishTime = finishTime;
     this.taskType = taskType;
     this.status = status;
@@ -65,6 +69,10 @@ public Object getDatum() {
     if (datum == null) {
       datum = new TaskFinished();
       datum.taskid = new Utf8(taskid.toString());
+      if(successfulAttemptId != null)
+      {
+        datum.successfulAttemptId = new Utf8(successfulAttemptId.toString());
+      }
       datum.finishTime = finishTime;
       datum.counters = EventWriter.toAvro(counters);
       datum.taskType = new Utf8(taskType.name());
@@ -76,6 +84,10 @@ public Object getDatum() {
   public void setDatum(Object oDatum) {
     this.datum = (TaskFinished)oDatum;
     this.taskid = TaskID.forName(datum.taskid.toString());
+    if (datum.successfulAttemptId != null) {
+      this.successfulAttemptId = TaskAttemptID
+          .forName(datum.successfulAttemptId.toString());
+    }
     this.finishTime = datum.finishTime;
     this.taskType = TaskType.valueOf(datum.taskType.toString());
     this.status = datum.status.toString();
@@ -84,6 +96,14 @@ public void setDatum(Object oDatum) {
 
   /** Get task id */
   public TaskID getTaskId() { return TaskID.forName(taskid.toString()); }
+  /** Get successful task attempt id */
+  public TaskAttemptID getSuccessfulTaskAttemptId() {
+    if(successfulAttemptId != null)
+    {
+      return TaskAttemptID.forName(successfulAttemptId.toString());
+    }
+    return null;
+  }
   /** Get the task finish time */
   public long getFinishTime() { return finishTime; }
   /** Get task counters */
@@ -128,7 +128,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
         return null;
       }
 
-      return new TaskFinishedEvent(taskID, Long.parseLong(finishTime),
+      return new TaskFinishedEvent(taskID, null, Long.parseLong(finishTime),
           that.originalTaskType, status, eventCounters);
     }
 