From 8d569c2220236551d5c95e2ebdaaea52eebe37e6 Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe
Date: Fri, 18 Apr 2014 20:33:28 +0000
Subject: [PATCH] MAPREDUCE-4937. MR AM handles an oversized split metainfo file poorly. Contributed by Eric Payne

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1588559 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  3 ++
 .../hadoop/mapreduce/v2/app/MRAppMaster.java  | 15 ++++++-
 .../v2/app/job/event/JobEventType.java        |  1 +
 .../mapreduce/v2/app/job/impl/JobImpl.java    | 43 ++++++++++++-------
 .../v2/app/job/impl/TestJobImpl.java          | 30 +++++++++++++
 5 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2e190faa05..0bf5c9bf25 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -177,6 +177,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5775. Remove unnecessary job.setNumReduceTasks in
     SleepJob.createJob (jhanver chand sharma via devaraj)
 
+    MAPREDUCE-4937. MR AM handles an oversized split metainfo file poorly
+    (Eric Payne via jlowe)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 1656d3c0b1..405e30edd3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1054,6 +1054,7 @@ protected void serviceStart() throws Exception {
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
+    boolean initFailed = false;
     if (!errorHappenedShutDown) {
       // create a job event for job intialization
       JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
@@ -1062,6 +1063,10 @@ protected void serviceStart() throws Exception {
       // job-init to be done completely here.
       jobEventDispatcher.handle(initJobEvent);
 
+      // If job is still not initialized, an error happened during
+      // initialization. Must complete starting all of the services so failure
+      // events can be processed.
+      initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
       // JobImpl's InitTransition is done (call above is synchronous), so the
       // "uber-decision" (MR-1220) has been made. Query job and switch to
@@ -1090,8 +1095,14 @@ protected void serviceStart() throws Exception {
       // set job classloader if configured
       MRApps.setJobClassLoader(getConfig());
-      // All components have started, start the job.
-      startJobs();
+
+      if (initFailed) {
+        JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
+        jobEventDispatcher.handle(initFailedEvent);
+      } else {
+        // All components have started, start the job.
+        startJobs();
+      }
     }
 
   @Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
index ab4d7f5d11..ed95161654 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
@@ -28,6 +28,7 @@ public enum JobEventType {
 
   //Producer:MRAppMaster
   JOB_INIT,
+  JOB_INIT_FAILED,
   JOB_START,
 
   //Producer:Task
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 36bfca7183..b101f0eb96 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -250,9 +250,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           .addTransition
               (JobStateInternal.NEW,
-              EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
               JobEventType.JOB_INIT,
               new InitTransition())
+          .addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
+              JobEventType.JOB_INIT_FAILED,
+              new InitFailedTransition())
           .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
               JobEventType.JOB_KILL,
               new KillNewJobTransition())
@@ -265,7 +268,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           // Ignore-able events
           .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_UPDATED_NODES)
-          
+
           // Transitions from INITED state
           .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -1374,6 +1377,15 @@ public static class InitTransition
     public JobStateInternal transition(JobImpl job, JobEvent event) {
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
+
+      if (job.newApiCommitter) {
+        job.jobContext = new JobContextImpl(job.conf,
+            job.oldJobId);
+      } else {
+        job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+            job.conf, job.oldJobId);
+      }
+
       try {
         setup(job);
         job.fs = job.getFileSystem(job.conf);
@@ -1409,14 +1421,6 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
 
         checkTaskLimits();
 
-        if (job.newApiCommitter) {
-          job.jobContext = new JobContextImpl(job.conf,
-              job.oldJobId);
-        } else {
-          job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
-              job.conf, job.oldJobId);
-        }
-
         long inputLength = 0;
         for (int i = 0; i < job.numMapTasks; ++i) {
           inputLength += taskSplitMetaInfo[i].getInputDataLength();
@@ -1443,15 +1447,14 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
 
         job.metrics.endPreparingJob(job);
         return JobStateInternal.INITED;
-      } catch (IOException e) {
+      } catch (Exception e) {
         LOG.warn("Job init failed", e);
         job.metrics.endPreparingJob(job);
         job.addDiagnostic("Job init failed : "
             + StringUtils.stringifyException(e));
-        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-            job.jobContext,
-            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-        return JobStateInternal.FAILED;
+        // Leave job in the NEW state. The MR AM will detect that the state is
+        // not INITED and send a JOB_INIT_FAILED event.
+        return JobStateInternal.NEW;
       }
     }
@@ -1552,6 +1555,16 @@ private void checkTaskLimits() {
     }
   } // end of InitTransition
 
+  private static class InitFailedTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+          job.jobContext,
+          org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    }
+  }
+
   private static class SetupCompletedTransition
       implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 7b0bc27627..6cfc83ea00 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -81,6 +81,7 @@
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -728,6 +729,35 @@ public void testTransitionsAtFailed() throws IOException {
     commitHandler.stop();
   }
 
+  static final String EXCEPTIONMSG = "Splits max exceeded";
+  @Test
+  public void testMetaInfoSizeOverMax() throws Exception {
+    Configuration conf = new Configuration();
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job =
+        new JobImpl(jobId, ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class),
+            null, new JobTokenSecretManager(), new Credentials(), null, null,
+            mrAppMetrics, null, true, null, 0, null, null, null, null);
+    InitTransition initTransition = new InitTransition() {
+      @Override
+      protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+        throw new YarnRuntimeException(EXCEPTIONMSG);
+      }
+    };
+    JobEvent mockJobEvent = mock(JobEvent.class);
+
+    JobStateInternal jobSI = initTransition.transition(job, mockJobEvent);
+    Assert.assertTrue("When init fails, return value from InitTransition.transition should equal NEW.",
+        jobSI.equals(JobStateInternal.NEW));
+    Assert.assertTrue("Job diagnostics should contain YarnRuntimeException",
+        job.getDiagnostics().toString().contains("YarnRuntimeException"));
+    Assert.assertTrue("Job diagnostics should contain " + EXCEPTIONMSG,
+        job.getDiagnostics().toString().contains(EXCEPTIONMSG));
+  }
+
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
     final SystemClock clock = new SystemClock();