diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b945e05a47..33a88e6772 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1184,6 +1184,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2917. Fixed corner case in container reservation which led to
     starvation and hung jobs. (acmurthy)
 
+    MAPREDUCE-2756. Better error handling in JobControl for failed jobs.
+    (Robert Evans via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
index a7f5a4c118..b5f9482edf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
@@ -23,6 +23,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -47,6 +49,7 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class ControlledJob {
+  private static final Log LOG = LogFactory.getLog(ControlledJob.class);
 
   // A job will be in one of the following states
   public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
@@ -235,6 +238,17 @@ public void killJob() throws IOException, InterruptedException {
     job.killJob();
   }
 
+  public synchronized void failJob(String message) throws IOException, InterruptedException {
+    try {
+      if(job != null && this.state == State.RUNNING) {
+        job.killJob();
+      }
+    } finally {
+      this.state = State.FAILED;
+      this.message = message;
+    }
+  }
+
   /**
    * Check the state of this running job. The state may
    * remain the same, become SUCCESS or FAILED.
@@ -322,6 +336,7 @@ protected synchronized void submit() {
       job.submit();
       this.state = State.RUNNING;
     } catch (Exception ioe) {
+      LOG.info(getJobName()+" got an error while submitting ",ioe);
       this.state = State.FAILED;
       this.message = StringUtils.stringifyException(ioe);
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
index 494e5e9dce..a2bc70aaf5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
@@ -21,13 +21,16 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * This class encapsulates a set of MapReduce jobs and its dependency.
@@ -49,17 +52,16 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class JobControl implements Runnable {
+  private static final Log LOG = LogFactory.getLog(JobControl.class);
 
   // The thread can be in one of the following state
   public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
 
   private ThreadState runnerState;  // the thread state
 
-  private Map<String, ControlledJob> waitingJobs;
-  private Map<String, ControlledJob> readyJobs;
-  private Map<String, ControlledJob> runningJobs;
-  private Map<String, ControlledJob> successfulJobs;
-  private Map<String, ControlledJob> failedJobs;
+  private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
+  private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
+  private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
 
   private long nextJobID;
   private String groupName;
@@ -69,46 +71,51 @@ public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
    * @param groupName a name identifying this group
    */
   public JobControl(String groupName) {
-    this.waitingJobs = new Hashtable<String, ControlledJob>();
-    this.readyJobs = new Hashtable<String, ControlledJob>();
-    this.runningJobs = new Hashtable<String, ControlledJob>();
-    this.successfulJobs = new Hashtable<String, ControlledJob>();
-    this.failedJobs = new Hashtable<String, ControlledJob>();
     this.nextJobID = -1;
     this.groupName = groupName;
     this.runnerState = ThreadState.READY;
   }
 
   private static List<ControlledJob> toList(
-                   Map<String, ControlledJob> jobs) {
+                   LinkedList<ControlledJob> jobs) {
     ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
     synchronized (jobs) {
-      for (ControlledJob job : jobs.values()) {
+      for (ControlledJob job : jobs) {
        retv.add(job);
       }
     }
     return retv;
   }
 
+  synchronized private List<ControlledJob> getJobsIn(State state) {
+    LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
+    for(ControlledJob j: jobsInProgress) {
+      if(j.getJobState() == state) {
+        l.add(j);
+      }
+    }
+    return l;
+  }
+
   /**
    * @return the jobs in the waiting state
    */
   public List<ControlledJob> getWaitingJobList() {
-    return toList(this.waitingJobs);
+    return getJobsIn(State.WAITING);
   }
 
   /**
    * @return the jobs in the running state
    */
   public List<ControlledJob> getRunningJobList() {
-    return toList(this.runningJobs);
+    return getJobsIn(State.RUNNING);
   }
 
   /**
    * @return the jobs in the ready state
    */
   public List<ControlledJob> getReadyJobsList() {
-    return toList(this.readyJobs);
+    return getJobsIn(State.READY);
   }
 
   /**
@@ -126,34 +133,6 @@ private String getNextJobID() {
     nextJobID += 1;
     return this.groupName + this.nextJobID;
   }
-
-  private static void addToQueue(ControlledJob aJob,
-                                 Map<String, ControlledJob> queue) {
-    synchronized(queue) {
-      queue.put(aJob.getJobID(), aJob);
-    }
-  }
-
-  private void addToQueue(ControlledJob aJob) {
-    Map<String, ControlledJob> queue = getQueue(aJob.getJobState());
-    addToQueue(aJob, queue);
-  }
-
-  private Map<String, ControlledJob> getQueue(State state) {
-    Map<String, ControlledJob> retv = null;
-    if (state == State.WAITING) {
-      retv = this.waitingJobs;
-    } else if (state == State.READY) {
-      retv = this.readyJobs;
-    } else if (state == State.RUNNING) {
-      retv = this.runningJobs;
-    } else if (state == State.SUCCESS) {
-      retv = this.successfulJobs;
-    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
-      retv = this.failedJobs;
-    }
-    return retv;
-  }
 
   /**
    * Add a new job.
@@ -163,7 +142,7 @@ synchronized public String addJob(ControlledJob aJob) {
     String id = this.getNextJobID();
     aJob.setJobID(id);
     aJob.setJobState(State.WAITING);
-    this.addToQueue(aJob);
+    jobsInProgress.add(aJob);
     return id;
   }
 
@@ -211,47 +190,8 @@ public void resume () {
     }
   }
 
-  synchronized private void checkRunningJobs()
-      throws IOException, InterruptedException {
-
-    Map<String, ControlledJob> oldJobs = null;
-    oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable<String, ControlledJob>();
-
-    for (ControlledJob nextJob : oldJobs.values()) {
-      nextJob.checkState();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized private void checkWaitingJobs()
-      throws IOException, InterruptedException {
-    Map<String, ControlledJob> oldJobs = null;
-    oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable<String, ControlledJob>();
-
-    for (ControlledJob nextJob : oldJobs.values()) {
-      nextJob.checkState();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized private void startReadyJobs() {
-    Map<String, ControlledJob> oldJobs = null;
-    oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable<String, ControlledJob>();
-
-    for (ControlledJob nextJob : oldJobs.values()) {
-      //Submitting Job to Hadoop
-      nextJob.submit();
-      this.addToQueue(nextJob);
-    }
-  }
-
   synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 &&
-           this.readyJobs.size() == 0 &&
-           this.runningJobs.size() == 0;
+    return jobsInProgress.isEmpty();
   }
 
   /**
@@ -262,39 +202,83 @@ synchronized public boolean allFinished() {
    *  Submit the jobs in ready state
    */
   public void run() {
-    this.runnerState = ThreadState.RUNNING;
-    while (true) {
-      while (this.runnerState == ThreadState.SUSPENDED) {
+    try {
+      this.runnerState = ThreadState.RUNNING;
+      while (true) {
+        while (this.runnerState == ThreadState.SUSPENDED) {
+          try {
+            Thread.sleep(5000);
+          }
+          catch (Exception e) {
+            //TODO the thread was interrupted, do something!!!
+          }
+        }
+
+        synchronized(this) {
+          Iterator<ControlledJob> it = jobsInProgress.iterator();
+          while(it.hasNext()) {
+            ControlledJob j = it.next();
+            LOG.debug("Checking state of job "+j);
+            switch(j.checkState()) {
+            case SUCCESS:
+              successfulJobs.add(j);
+              it.remove();
+              break;
+            case FAILED:
+            case DEPENDENT_FAILED:
+              failedJobs.add(j);
+              it.remove();
+              break;
+            case READY:
+              j.submit();
+              break;
+            case RUNNING:
+            case WAITING:
+              //Do Nothing
+              break;
+            }
+          }
+        }
+
+        if (this.runnerState != ThreadState.RUNNING &&
+            this.runnerState != ThreadState.SUSPENDED) {
+          break;
+        }
         try {
           Thread.sleep(5000);
         }
         catch (Exception e) {
-
+          //TODO the thread was interrupted, do something!!!
+        }
+        if (this.runnerState != ThreadState.RUNNING &&
+            this.runnerState != ThreadState.SUSPENDED) {
+          break;
         }
       }
-      try {
-        checkRunningJobs();
-        checkWaitingJobs();
-        startReadyJobs();
-      } catch (Exception e) {
-        this.runnerState = ThreadState.STOPPED;
-      }
-      if (this.runnerState != ThreadState.RUNNING &&
-          this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
-      try {
-        Thread.sleep(5000);
-      }
-      catch (Exception e) {
-
-      }
-      if (this.runnerState != ThreadState.RUNNING &&
-          this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
+    }catch(Throwable t) {
+      LOG.error("Error while trying to run jobs.",t);
+      //Mark all jobs as failed because we got something bad.
+      failAllJobs(t);
     }
     this.runnerState = ThreadState.STOPPED;
   }
 
+  synchronized private void failAllJobs(Throwable t) {
+    String message = "Unexpected System Error Occured: "+
+      StringUtils.stringifyException(t);
+    Iterator<ControlledJob> it = jobsInProgress.iterator();
+    while(it.hasNext()) {
+      ControlledJob j = it.next();
+      try {
+        j.failJob(message);
+      } catch (IOException e) {
+        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
+      } catch (InterruptedException e) {
+        LOG.error("Error while tyring to clean up "+j.getJobName(), e);
+      } finally {
+        failedJobs.add(j);
+        it.remove();
+      }
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/src/test/unit/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java b/hadoop-mapreduce-project/src/test/unit/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
index cc72949d39..b8469fa18c 100644
--- a/hadoop-mapreduce-project/src/test/unit/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
+++ b/hadoop-mapreduce-project/src/test/unit/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControlWithMocks.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -80,6 +81,29 @@ public void testFailedJob() throws Exception {
 
     jobControl.stop();
   }
+
+  @Test
+  public void testErrorWhileSubmitting() throws Exception {
+    JobControl jobControl = new JobControl("Test");
+
+    Job mockJob = mock(Job.class);
+
+    ControlledJob job1 = new ControlledJob(mockJob, null);
+    when(mockJob.getConfiguration()).thenReturn(new Configuration());
+    doThrow(new IncompatibleClassChangeError("This is a test")).when(mockJob).submit();
+
+    jobControl.addJob(job1);
+
+    runJobControl(jobControl);
+    try {
+      assertEquals("Success list", 0, jobControl.getSuccessfulJobList().size());
+      assertEquals("Failed list", 1, jobControl.getFailedJobList().size());
+
+      assertTrue(job1.getJobState() == ControlledJob.State.FAILED);
+    } finally {
+      jobControl.stop();
+    }
+  }
 
   @Test
   public void testKillJob() throws Exception {