From 0939c49368f1bc9dc08cec9913b21ddea30d4e45 Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe
Date: Tue, 23 Jul 2013 16:52:24 +0000
Subject: [PATCH] MAPREDUCE-5317. Stale files left behind for failed jobs.
 Contributed by Ravi Prakash

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1506154 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt           |   6 +
 .../v2/app/job/JobStateInternal.java           |   1 +
 .../v2/app/job/event/JobEventType.java         |   1 +
 .../mapreduce/v2/app/job/impl/JobImpl.java     | 132 ++++++++++++++++--
 .../v2/app/job/impl/TestJobImpl.java           |  79 +++++++++++
 5 files changed, 211 insertions(+), 8 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5d0533ee8a..27f5ea9e90 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -167,6 +167,9 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5404. HSAdminServer does not use ephemeral ports in minicluster
     mode (Ted Yu via jlowe)
 
+    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
+    jlowe)
+
 Release 2.1.1-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -1222,6 +1225,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5380. Invalid mapred command should return non-zero exit code
     (Stephen Chu via jlowe)
 
+    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
+    jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
index bdb627b235..aec91a3828 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
@@ -25,6 +25,7 @@ public enum JobStateInternal {
   RUNNING,
   COMMITTING,
   SUCCEEDED,
+  FAIL_WAIT,
   FAIL_ABORT,
   FAILED,
   KILL_WAIT,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
index f6c38d30bc..ab4d7f5d11 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
@@ -44,6 +44,7 @@ public enum JobEventType {
 
   //Producer:Job
   JOB_COMPLETED,
+  JOB_FAIL_WAIT_TIMEDOUT,
 
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 2c33650de4..be2e49bf91 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -30,6 +30,9 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -313,7 +316,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .addTransition
               (JobStateInternal.RUNNING,
               EnumSet.of(JobStateInternal.RUNNING,
-                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
+                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
+                  JobStateInternal.FAIL_ABORT),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
@@ -424,7 +428,37 @@ JobEventType.JOB_KILL, new KillTasksTransition())
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
               JobEventType.JOB_MAP_TASK_RESCHEDULED))
 
-          // Transitions from FAIL_ABORT state
+          // Transitions from FAIL_WAIT state
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_WAIT,
+              JobEventType.JOB_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_WAIT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
+              JobEventType.JOB_TASK_COMPLETED,
+              new JobFailWaitTransition())
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT,
+              new JobFailWaitTimedOutTransition())
+          .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
+              JobEventType.JOB_KILL,
+              new KilledDuringAbortTransition())
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(JobStateInternal.FAIL_WAIT,
+              JobStateInternal.FAIL_WAIT,
+              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_AM_REBOOT))
+
+          //Transitions from FAIL_ABORT state
           .addTransition(JobStateInternal.FAIL_ABORT,
               JobStateInternal.FAIL_ABORT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -451,7 +485,8 @@ JobEventType.JOB_KILL, new KillTasksTransition())
               JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
               JobEventType.JOB_COMMIT_COMPLETED,
               JobEventType.JOB_COMMIT_FAILED,
-              JobEventType.JOB_AM_REBOOT))
+              JobEventType.JOB_AM_REBOOT,
+              JobEventType.JOB_FAIL_WAIT_TIMEDOUT))
 
           // Transitions from KILL_ABORT state
           .addTransition(JobStateInternal.KILL_ABORT,
@@ -602,6 +637,10 @@ JobEventType.JOB_KILL, new KillTasksTransition())
 
   private JobStateInternal forcedState = null;
 
+  //Executor used for running future tasks. Setting thread pool size to 1
+  private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+  private ScheduledFuture failWaitTriggerScheduledFuture;
+
   public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
       Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
@@ -962,6 +1001,7 @@ private static JobState getExternalState(JobStateInternal smState) {
     case SETUP:
     case COMMITTING:
       return JobState.RUNNING;
+    case FAIL_WAIT:
     case FAIL_ABORT:
       return JobState.FAILED;
     case REBOOT:
@@ -1565,7 +1605,43 @@ public void transition(JobImpl job, JobEvent event) {
         job.unsuccessfulFinish(finalState);
       }
     }
-
+
+  //This transition happens when a job is to be failed. It waits for all the
+  //tasks to finish / be killed.
+  private static class JobFailWaitTransition
+      implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
+    @Override
+    public JobStateInternal transition(JobImpl job, JobEvent event) {
+      if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
+        for(Task task: job.tasks.values()) {
+          if(!task.isFinished()) {
+            return JobStateInternal.FAIL_WAIT;
+          }
+        }
+      }
+      //Finished waiting. All tasks finished / were killed
+      job.failWaitTriggerScheduledFuture.cancel(false);
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+      return JobStateInternal.FAIL_ABORT;
+    }
+  }
+
+  //This transition happens when a job to be failed times out while waiting on
+  //tasks that had been sent the KILL signal. It is triggered by a
+  //ScheduledFuture task queued in the executor.
+  private static class JobFailWaitTimedOutTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
+        + " Going to fail job anyway");
+      job.failWaitTriggerScheduledFuture.cancel(false);
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    }
+  }
+
   // JobFinishedEvent triggers the move of the history file out of the staging
   // area. May need to create a new event type for this if JobFinished should
   // not be generated for KilledJobs, etc.
@@ -1798,6 +1874,23 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
       return checkJobAfterTaskCompletion(job);
     }
 
+    //This class is used to queue a ScheduledFuture to send an event to a job
+    //after some delay. This can be used to wait for maximum amount of time
+    //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for
+    //all tasks to be killed.
+    static class TriggerScheduledFuture implements Runnable {
+      JobEvent toSend;
+      JobImpl job;
+      TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
+        this.toSend = toSend;
+        this.job = job;
+      }
+      public void run() {
+        LOG.info("Sending event " + toSend + " to " + job.getID());
+        job.getEventHandler().handle(toSend);
+      }
+    }
+
     protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
       //check for Job failure
       if (job.failedMapTaskCount*100 >
@@ -1811,10 +1904,33 @@ protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
           " failedReduces:" + job.failedReduceTaskCount;
       LOG.info(diagnosticMsg);
       job.addDiagnostic(diagnosticMsg);
-      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-          job.jobContext,
-          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-      return JobStateInternal.FAIL_ABORT;
+
+      //Send kill signal to all unfinished tasks here.
+      boolean allDone = true;
+      for (Task task : job.tasks.values()) {
+        if(!task.isFinished()) {
+          allDone = false;
+          job.eventHandler.handle(
+            new TaskEvent(task.getID(), TaskEventType.T_KILL));
+        }
+      }
+
+      //If all tasks are already done, we should go directly to FAIL_ABORT
+      if(allDone) {
+        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)
+        );
+        return JobStateInternal.FAIL_ABORT;
+      }
+
+      //Set max timeout to wait for the tasks to get killed
+      job.failWaitTriggerScheduledFuture = job.executor.schedule(
+          new TriggerScheduledFuture(job, new JobEvent(job.getID(),
+            JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
+                MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
+                MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
+                TimeUnit.MILLISECONDS);
+      return JobStateInternal.FAIL_WAIT;
     }
 
     return job.checkReadyForCommit();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 6da75a91bb..ad5745e136 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -57,13 +57,17 @@
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -74,7 +78,9 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -84,6 +90,7 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 
 /**
@@ -332,6 +339,78 @@ public void testKilledDuringCommit() throws Exception {
     commitHandler.stop();
   }
 
+  @Test
+  public void testAbortJobCalledAfterKillingTasks() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
+    InlineDispatcher dispatcher = new InlineDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = Mockito.mock(OutputCommitter.class);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+
+    //Fail one task. This should land the JobImpl in the FAIL_WAIT state
+    job.handle(new JobTaskEvent(
+      MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+      TaskState.FAILED));
+    //Verify abort job hasn't been called
+    Mockito.verify(committer, Mockito.never())
+      .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+    assertJobState(job, JobStateInternal.FAIL_WAIT);
+
+    //Verify abortJob is called once and the job failed
+    Mockito.verify(committer, Mockito.timeout(2000).times(1))
+      .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+    assertJobState(job, JobStateInternal.FAILED);
+
+    dispatcher.stop();
+  }
+
+  @Test (timeout=10000)
+  public void testFailAbortDoesntHang() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = Mockito.mock(OutputCommitter.class);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+    //Job has only 1 mapper task. No reducers
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
+
+    //Fail / finish all the tasks. This should land the JobImpl directly in the
+    //FAIL_ABORT state
+    for(Task t: job.tasks.values()) {
+      TaskImpl task = (TaskImpl) t;
+      task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
+      for(TaskAttempt ta: task.getAttempts().values()) {
+        task.handle(new TaskTAttemptEvent(ta.getID(),
+          TaskEventType.T_ATTEMPT_FAILED));
+      }
+    }
+    assertJobState(job, JobStateInternal.FAIL_ABORT);
+
+    dispatcher.await();
+    //Verify abortJob is called once and the job failed
+    Mockito.verify(committer, Mockito.timeout(2000).times(1))
+      .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+    assertJobState(job, JobStateInternal.FAILED);
+
+    dispatcher.stop();
+  }
+
   @Test(timeout=20000)
   public void testKilledDuringFailAbort() throws Exception {
     Configuration conf = new Configuration();