MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED state (jlowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1407679 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3ccc76c29b
commit
7c9778e2c3
@ -647,6 +647,9 @@ Release 0.23.5 - UNRELEASED
|
||||
|
||||
MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
|
||||
(Mark Fuhs via bobby)
|
||||
|
||||
MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
|
||||
state (jlowe via bobby)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
|
@ -348,6 +348,9 @@ JobEventType.JOB_KILL, new KillTasksTransition())
|
||||
.addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
|
||||
EnumSet.of(JobEventType.JOB_KILL,
|
||||
JobEventType.JOB_UPDATED_NODES,
|
||||
JobEventType.JOB_TASK_COMPLETED,
|
||||
JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
|
||||
JobEventType.JOB_MAP_TASK_RESCHEDULED,
|
||||
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
|
||||
|
||||
// Transitions from KILLED state
|
||||
|
@ -27,6 +27,7 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@ -42,6 +43,7 @@
|
||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
@ -51,10 +53,14 @@
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
|
||||
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.SystemClock;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.state.StateMachine;
|
||||
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -340,7 +346,7 @@ null, mock(JobTokenSecretManager.class), null, null, null,
|
||||
return isUber;
|
||||
}
|
||||
|
||||
private InitTransition getInitTransition() {
|
||||
private static InitTransition getInitTransition() {
|
||||
InitTransition initTransition = new InitTransition() {
|
||||
@Override
|
||||
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
|
||||
@ -350,4 +356,63 @@ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
|
||||
};
|
||||
return initTransition;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransitionsAtFailed() throws IOException {
|
||||
Configuration conf = new Configuration();
|
||||
JobID jobID = JobID.forName("job_1234567890000_0001");
|
||||
JobId jobId = TypeConverter.toYarn(jobID);
|
||||
OutputCommitter committer = mock(OutputCommitter.class);
|
||||
doThrow(new IOException("forcefail"))
|
||||
.when(committer).setupJob(any(JobContext.class));
|
||||
InlineDispatcher dispatcher = new InlineDispatcher();
|
||||
JobImpl job = new StubbedJob(jobId, Records
|
||||
.newRecord(ApplicationAttemptId.class), conf,
|
||||
dispatcher.getEventHandler(), committer, true, null);
|
||||
|
||||
dispatcher.register(JobEventType.class, job);
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
|
||||
Assert.assertEquals(JobState.FAILED, job.getState());
|
||||
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
|
||||
Assert.assertEquals(JobState.FAILED, job.getState());
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
|
||||
Assert.assertEquals(JobState.FAILED, job.getState());
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
|
||||
Assert.assertEquals(JobState.FAILED, job.getState());
|
||||
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
|
||||
Assert.assertEquals(JobState.FAILED, job.getState());
|
||||
}
|
||||
|
||||
private static class StubbedJob extends JobImpl {
|
||||
//override the init transition
|
||||
private final InitTransition initTransition = getInitTransition();
|
||||
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
|
||||
= stateMachineFactory.addTransition(JobStateInternal.NEW,
|
||||
EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
|
||||
JobEventType.JOB_INIT,
|
||||
// This is abusive.
|
||||
initTransition);
|
||||
|
||||
private final StateMachine<JobStateInternal, JobEventType, JobEvent>
|
||||
localStateMachine;
|
||||
|
||||
@Override
|
||||
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
|
||||
return localStateMachine;
|
||||
}
|
||||
|
||||
public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
|
||||
Configuration conf, EventHandler eventHandler,
|
||||
OutputCommitter committer, boolean newApiCommitter, String user) {
|
||||
super(jobId, applicationAttemptId, conf, eventHandler,
|
||||
null, new JobTokenSecretManager(), new Credentials(),
|
||||
new SystemClock(), null, MRAppMetrics.create(), committer,
|
||||
newApiCommitter, user, System.currentTimeMillis(), null, null);
|
||||
|
||||
// This "this leak" is okay because the retained pointer is in an
|
||||
// instance variable.
|
||||
localStateMachine = localFactory.make(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user