diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java index f681cf8165..176ac01059 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java @@ -21,11 +21,15 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.service.Service; import org.junit.Assert; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -100,12 +104,34 @@ public void testKillJob() throws Exception { @Test public void testKillTask() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - MRApp app = new BlockingMRApp(2, 0, latch); + final CountDownLatch jobRunning = new CountDownLatch(1); + + final Dispatcher dispatcher = new AsyncDispatcher() { + protected void dispatch(Event event) { + // We have to wait until the internal state is RUNNING and not SETUP + // because it can cause a transition failure. If the dispatchable + // event is TASK_STARTED, we can continue because the job must be in + // RUNNING at that point. + if (event.getType() == EventType.TASK_STARTED) { + jobRunning.countDown(); + } + super.dispatch(event); + } + }; + + MRApp app = new BlockingMRApp(2, 0, latch) { + @Override + public Dispatcher createDispatcher() { + return dispatcher; + } + }; + //this will start the job but job won't complete as Task is blocked Job job = app.submit(new Configuration()); - - //wait and vailidate for Job to become RUNNING - app.waitForInternalState((JobImpl) job, JobStateInternal.RUNNING); + + // wait until the job transitions to JobInternalState.RUNNING + jobRunning.await(10, TimeUnit.SECONDS); + Map tasks = job.getTasks(); Assert.assertEquals("No of tasks is not correct", 2, tasks.size());