MAPREDUCE-4937. MR AM handles an oversized split metainfo file poorly. Contributed by Eric Payne

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1588559 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-04-18 20:33:28 +00:00
parent cda8646cfa
commit 8d569c2220
5 changed files with 75 additions and 17 deletions

View File

@ -177,6 +177,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5775. Remove unnecessary job.setNumReduceTasks in SleepJob.createJob
(jhanver chand sharma via devaraj)
MAPREDUCE-4937. MR AM handles an oversized split metainfo file poorly
(Eric Payne via jlowe)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1054,6 +1054,7 @@ protected void serviceStart() throws Exception {
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
boolean initFailed = false;
if (!errorHappenedShutDown) {
// create a job event for job intialization
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
@ -1062,6 +1063,10 @@ protected void serviceStart() throws Exception {
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
// If job is still not initialized, an error happened during
// initialization. Must complete starting all of the services so failure
// events can be processed.
initFailed = (((JobImpl)job).getInternalState() != JobStateInternal.INITED);
// JobImpl's InitTransition is done (call above is synchronous), so the
// "uber-decision" (MR-1220) has been made. Query job and switch to
@ -1090,8 +1095,14 @@ protected void serviceStart() throws Exception {
// set job classloader if configured
MRApps.setJobClassLoader(getConfig());
// All components have started, start the job.
startJobs();
if (initFailed) {
JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
jobEventDispatcher.handle(initFailedEvent);
} else {
// All components have started, start the job.
startJobs();
}
}
@Override

View File

@ -28,6 +28,7 @@ public enum JobEventType {
//Producer:MRAppMaster
JOB_INIT,
JOB_INIT_FAILED,
JOB_START,
//Producer:Task

View File

@ -250,9 +250,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
JobEventType.JOB_INIT,
new InitTransition())
.addTransition(JobStateInternal.NEW, JobStateInternal.FAIL_ABORT,
JobEventType.JOB_INIT_FAILED,
new InitFailedTransition())
.addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KillNewJobTransition())
@ -265,7 +268,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
// Ignore-able events
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_UPDATED_NODES)
// Transitions from INITED state
.addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
@ -1374,6 +1377,15 @@ public static class InitTransition
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
job.conf, job.oldJobId);
}
try {
setup(job);
job.fs = job.getFileSystem(job.conf);
@ -1409,14 +1421,6 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
checkTaskLimits();
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
job.conf, job.oldJobId);
}
long inputLength = 0;
for (int i = 0; i < job.numMapTasks; ++i) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
@ -1443,15 +1447,14 @@ public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.endPreparingJob(job);
return JobStateInternal.INITED;
} catch (IOException e) {
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
return JobStateInternal.FAILED;
// Leave job in the NEW state. The MR AM will detect that the state is
// not INITED and send a JOB_INIT_FAILED event.
return JobStateInternal.NEW;
}
}
@ -1552,6 +1555,16 @@ private void checkTaskLimits() {
}
} // end of InitTransition
private static class InitFailedTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
job.jobContext,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
}
}
private static class SetupCompletedTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override

View File

@ -81,6 +81,7 @@
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -728,6 +729,35 @@ public void testTransitionsAtFailed() throws IOException {
commitHandler.stop();
}
static final String EXCEPTIONMSG = "Splits max exceeded";
@Test
public void testMetaInfoSizeOverMax() throws Exception {
Configuration conf = new Configuration();
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
JobImpl job =
new JobImpl(jobId, ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class),
null, new JobTokenSecretManager(), new Credentials(), null, null,
mrAppMetrics, null, true, null, 0, null, null, null, null);
InitTransition initTransition = new InitTransition() {
@Override
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
throw new YarnRuntimeException(EXCEPTIONMSG);
}
};
JobEvent mockJobEvent = mock(JobEvent.class);
JobStateInternal jobSI = initTransition.transition(job, mockJobEvent);
Assert.assertTrue("When init fails, return value from InitTransition.transition should equal NEW.",
jobSI.equals(JobStateInternal.NEW));
Assert.assertTrue("Job diagnostics should contain YarnRuntimeException",
job.getDiagnostics().toString().contains("YarnRuntimeException"));
Assert.assertTrue("Job diagnostics should contain " + EXCEPTIONMSG,
job.getDiagnostics().toString().contains(EXCEPTIONMSG));
}
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = new SystemClock();