MAPREDUCE-6984. MR AM to clean up temporary files from previous attempt in case of no recovery. (Gergo Repas via Haibo Chen)

Haibo Chen 2018-01-19 12:56:17 -08:00
parent 4aca4ff759
commit cce71dceef
2 changed files with 105 additions and 0 deletions

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapreduce.v2.app;

+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -54,6 +55,7 @@
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -1217,6 +1219,7 @@ protected void serviceStart() throws Exception {
     amInfos = new LinkedList<AMInfo>();
     completedTasksFromPreviousRun = new HashMap<TaskId, TaskInfo>();
     processRecovery();
+    cleanUpPreviousJobOutput();

     // Create an AMInfo for the current AM generation.
     AMInfo amInfo =
@@ -1395,6 +1398,26 @@ private boolean shouldAttemptRecovery() throws IOException {
     return true;
   }

+  private void cleanUpPreviousJobOutput() {
+    // recovered application masters should not remove data from previous job
+    if (!recovered()) {
+      JobContext jobContext = getJobContextFromConf(getConfig());
+      try {
+        LOG.info("Starting to clean up previous job's temporary files");
+        this.committer.abortJob(jobContext, State.FAILED);
+        LOG.info("Finished cleaning up previous job temporary files");
+      } catch (FileNotFoundException e) {
+        LOG.info("Previous job temporary files do not exist, " +
+            "no clean up was necessary.");
+      } catch (Exception e) {
+        // the clean up of a previous attempt is not critical to the success
+        // of this job - only logging the error
+        LOG.error("Error while trying to clean up previous job's temporary " +
+            "files", e);
+      }
+    }
+  }
+
   private static FSDataInputStream getPreviousJobHistoryStream(
       Configuration conf, ApplicationAttemptId appAttemptId)
       throws IOException {
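Note: when the AM does not recover, the cleanup above goes through the configured OutputCommitter. For the stock FileOutputCommitter, aborting a job boils down to deleting the job's temporary subtree under the output directory, which is also why a missing directory surfaces as the FileNotFoundException handled above. A minimal sketch of that effect, assuming a FileOutputCommitter-style layout ("_temporary" is its pending-directory name); this is illustrative, not the actual Hadoop implementation:

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbortCleanupSketch {
  // Roughly what aborting a FileOutputCommitter-managed job amounts to:
  // remove the temporary subtree that task attempts write under.
  static void abortEquivalent(Configuration conf, Path outputDir)
      throws IOException {
    Path pending = new Path(outputDir, "_temporary");
    FileSystem fs = pending.getFileSystem(conf);
    if (!fs.delete(pending, true)) {
      // delete() returns false when the path is already gone; the AM code
      // above treats that case (FileNotFoundException) as "no cleanup
      // was necessary" rather than as an error.
      throw new FileNotFoundException(pending + " does not exist");
    }
  }
}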

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -19,9 +19,11 @@
 package org.apache.hadoop.mapreduce.v2.app;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -43,6 +45,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobID;
@@ -452,6 +455,8 @@ public void testCrashOfMapsOnlyJob() throws Exception {
   public static class TestFileOutputCommitter extends
       org.apache.hadoop.mapred.FileOutputCommitter {

+    private boolean abortJobCalled;
+
     @Override
     public boolean isRecoverySupported(
         org.apache.hadoop.mapred.JobContext jobContext) {
@@ -462,6 +467,16 @@ public boolean isRecoverySupported(
       }
       return isRecoverySupported;
     }

+    @Override
+    public void abortJob(JobContext context, int runState) throws IOException {
+      super.abortJob(context, runState);
+      this.abortJobCalled = true;
+    }
+
+    private boolean isAbortJobCalled() {
+      return this.abortJobCalled;
+    }
   }

   /**
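Design note on TestFileOutputCommitter above: the framework instantiates the committer reflectively from a class name carried in the job configuration, so the test cannot hand it a pre-built Mockito mock; instead the subclass records the abortJob() call itself. A sketch of the wiring, assuming the enclosing test class is TestRecovery (the helper class here is hypothetical):

import org.apache.hadoop.conf.Configuration;

class CommitterWiringSketch {
  // The framework constructs the committer from this class name, which is
  // why call-recording state lives in the committer class itself rather
  // than in a mock instance the test could otherwise inject.
  static Configuration wireRecordingCommitter(Configuration conf) {
    conf.setClass("mapred.output.committer.class",
        TestRecovery.TestFileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class);
    return conf;
  }
}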
@@ -1009,6 +1024,73 @@ public void testOutputRecovery() throws Exception {
     validateOutput();
   }

+  @Test
+  public void testPreviousJobOutputCleanedWhenNoRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    // stop the app before the job completes
+    app.stop();
+    app.close();
+
+    // rerun
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    TestFileOutputCommitter committer =
+        (TestFileOutputCommitter) app.getCommitter();
+    assertTrue("committer.abortJob() has not been called",
+        committer.isAbortJobCalled());
+    app.close();
+  }
+
+  @Test
+  public void testPreviousJobIsNotCleanedWhenRecovery()
+      throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    // TestFileOutputCommitter supports recovery if want.am.recovery=true
+    conf.setBoolean("want.am.recovery", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    // stop the app before the job completes
+    app.stop();
+    app.close();
+
+    // rerun
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    TestFileOutputCommitter committer =
+        (TestFileOutputCommitter) app.getCommitter();
+    assertFalse("committer.abortJob() has been called",
+        committer.isAbortJobCalled());
+    app.close();
+  }
+
   @Test
   public void testOutputRecoveryMapsOnly() throws Exception {
     int runCount = 0;
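The two new tests differ only in the recovery flag and the expected abortJob() outcome, so they could be collapsed into one parameterized helper. A hypothetical sketch (not part of this commit; it assumes the surrounding test class's MRAppWithHistory, TestFileOutputCommitter, outputDir, and existing imports):

  private void runRestartAndCheckAbort(boolean recoveryEnabled,
      boolean expectAbort) throws Exception {
    int runCount = 0;
    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
        true, ++runCount);
    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, recoveryEnabled);
    conf.setClass("mapred.output.committer.class",
        TestFileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class);
    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    if (recoveryEnabled) {
      // TestFileOutputCommitter reports recovery support only when this is set
      conf.setBoolean("want.am.recovery", true);
    }
    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
    Job job = app.submit(conf);
    app.waitForState(job, JobState.RUNNING);
    // stop the app before the job completes, then rerun a second attempt
    app.stop();
    app.close();
    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
        ++runCount);
    job = app.submit(conf);
    app.waitForState(job, JobState.RUNNING);
    TestFileOutputCommitter committer =
        (TestFileOutputCommitter) app.getCommitter();
    assertEquals(expectAbort, committer.isAbortJobCalled());
    app.close();
  }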