diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 5ef12509ad..5489f52f6e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -422,7 +422,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) EnumSet.of(JobEventType.JOB_UPDATED_NODES, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, - JobEventType.JOB_MAP_TASK_RESCHEDULED)) + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_TASK_COMPLETED)) // Transitions from SUCCEEDED state .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED, @@ -441,7 +442,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.JOB_AM_REBOOT, JobEventType.JOB_TASK_ATTEMPT_COMPLETED, - JobEventType.JOB_MAP_TASK_RESCHEDULED)) + JobEventType.JOB_MAP_TASK_RESCHEDULED, + JobEventType.JOB_TASK_COMPLETED)) // Transitions from FAIL_WAIT state .addTransition(JobStateInternal.FAIL_WAIT, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 43e59a7b34..5f378e4f9c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -204,7 +204,7 @@ public void testCommitJobFailsJob() throws Exception { public void testCheckJobCompleteSuccess() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); - AsyncDispatcher dispatcher = new AsyncDispatcher(); + DrainDispatcher dispatcher = new DrainDispatcher(); dispatcher.init(conf); dispatcher.start(); CyclicBarrier syncBarrier = new CyclicBarrier(2); @@ -226,6 +226,11 @@ public void testCheckJobCompleteSuccess() throws Exception { JobEventType.JOB_MAP_TASK_RESCHEDULED)); assertJobState(job, JobStateInternal.COMMITTING); + job.handle(new JobEvent(job.getID(), + JobEventType.JOB_TASK_COMPLETED)); + dispatcher.await(); + assertJobState(job, JobStateInternal.COMMITTING); + // let the committer complete and verify the job succeeds syncBarrier.await(); assertJobState(job, JobStateInternal.SUCCEEDED); @@ -237,6 +242,11 @@ public void testCheckJobCompleteSuccess() throws Exception { job.handle(new JobEvent(job.getID(), JobEventType.JOB_MAP_TASK_RESCHEDULED)); assertJobState(job, JobStateInternal.SUCCEEDED); + + job.handle(new JobEvent(job.getID(), + JobEventType.JOB_TASK_COMPLETED)); + dispatcher.await(); + assertJobState(job, JobStateInternal.SUCCEEDED); dispatcher.stop(); commitHandler.stop();