From 51a667bef8300d6499c9867b50eee352311a4185 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 18 Oct 2011 21:37:31 +0000 Subject: [PATCH] MAPREDUCE-2762. Cleanup MR staging directory on completion. Contributed by Mahadev Konar. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1185880 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 125 +++++++++++++----- .../mapreduce/v2/app/job/impl/JobImpl.java | 38 ++++-- .../mapreduce/v2/app/TestStagingCleanup.java | 102 ++++++++++++++ .../java/org/apache/hadoop/mapred/Task.java | 2 +- .../apache/hadoop/mapreduce/JobSubmitter.java | 2 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 2 + .../TestApplicationCleanup.java | 1 - 8 files changed, 230 insertions(+), 45 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0f82606d6c..fd1804c341 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1666,6 +1666,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3197. TestMRClientService failing on building clean checkout of branch 0.23 (mahadev) + MAPREDUCE-2762. Cleanup MR staging directory on completion. 
(mahadev via + acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index bd1c1e7836..df4f5cd1f5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -31,12 +31,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LocalContainerLauncher; import org.apache.hadoop.mapred.TaskAttemptListenerImpl; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; @@ -228,6 +230,97 @@ public void init(final Configuration conf) { super.init(conf); } // end of init() + + protected boolean keepJobFiles(JobConf conf) { + return (conf.getKeepTaskFilesPattern() != null || conf + .getKeepFailedTaskFiles()); + } + + /** + * Create the default file System for this job. 
+ * @param conf the conf object + * @return the default filesystem for this job + * @throws IOException + */ + protected FileSystem getFileSystem(Configuration conf) throws IOException { + return FileSystem.get(conf); + } + + /** + * clean up staging directories for the job. + * @throws IOException + */ + public void cleanupStagingDir() throws IOException { + /* make sure we clean the staging files */ + String jobTempDir = null; + FileSystem fs = getFileSystem(getConfig()); + try { + if (!keepJobFiles(new JobConf(getConfig()))) { + jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR); + if (jobTempDir == null) { + LOG.warn("Job Staging directory is null"); + return; + } + Path jobTempDirPath = new Path(jobTempDir); + LOG.info("Deleting staging directory " + fs.getDefaultUri(getConfig()) + + " " + jobTempDir); + fs.delete(jobTempDirPath, true); + } + } catch(IOException io) { + LOG.error("Failed to cleanup staging dir " + jobTempDir, io); + } + } + + /** + * Exit call. Just in a function call to enable testing. + */ + protected void sysexit() { + System.exit(0); + } + + private class JobFinishEventHandler implements EventHandler<JobFinishEvent> { + @Override + public void handle(JobFinishEvent event) { + // job has finished + // this is the only job, so shut down the Appmaster + // note in a workflow scenario, this may lead to creation of a new + // job (FIXME?) + + // TODO:currently just wait for some time so clients can know the + // final states. Will be removed once RM come on. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + LOG.info("Calling stop for all the services"); + try { + stop(); + } catch (Throwable t) { + LOG.warn("Graceful stop failed ", t); + } + try { + cleanupStagingDir(); + } catch(IOException io) { + LOG.warn("Failed to delete staging dir"); + } + //TODO: this is required because rpc server does not shut down + // in spite of calling server.stop(). + //Bring the process down by force. 
+ //Not needed after HADOOP-7140 + LOG.info("Exiting MR AppMaster..GoodBye!"); + sysexit(); + } + } + + /** + * create an event handler that handles the job finish event. + * @return the job finish event handler. + */ + protected EventHandler<JobFinishEvent> createJobFinishEventHandler() { + return new JobFinishEventHandler(); + } + /** Create and initialize (but don't start) a single job. */ protected Job createJob(Configuration conf) { @@ -238,36 +331,7 @@ protected Job createJob(Configuration conf) { ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); dispatcher.register(JobFinishEvent.Type.class, - new EventHandler<JobFinishEvent>() { - @Override - public void handle(JobFinishEvent event) { - // job has finished - // this is the only job, so shut down the Appmaster - // note in a workflow scenario, this may lead to creation of a new - // job (FIXME?) - - // TODO:currently just wait for some time so clients can know the - // final states. Will be removed once RM come on. - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - LOG.info("Calling stop for all the services"); - try { - stop(); - } catch (Throwable t) { - LOG.warn("Graceful stop failed ", t); - } - //TODO: this is required because rpc server does not shut down - // in spite of calling server.stop(). - //Bring the process down by force. - //Not needed after HADOOP-7140 - LOG.info("Exiting MR AppMaster..GoodBye!"); - System.exit(0); - } - }); - + createJobFinishEventHandler()); return newJob; } // end createJob() @@ -553,6 +617,7 @@ public void start() { ///////////////////// Create the job itself. job = createJob(getConfig()); + // End of creating the job. // metrics system init is really init & start. 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4a47462a2d..bf5443025f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -339,7 +339,6 @@ JobEventType.JOB_KILL, new KillTasksTransition()) JobEventType.JOB_DIAGNOSTIC_UPDATE, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE, JobEventType.INTERNAL_ERROR)) - // create the topology tables .installTopology(); @@ -724,6 +723,16 @@ void logJobHistoryFinishedEvent() { this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe)); } + /** + * Create the default file System for this job. 
+ * @param conf the conf object + * @return the default filesystem for this job + * @throws IOException + */ + protected FileSystem getFileSystem(Configuration conf) throws IOException { + return FileSystem.get(conf); + } + static JobState checkJobCompleteSuccess(JobImpl job) { // check for Job success if (job.completedTaskCount == job.getTasks().size()) { @@ -733,7 +742,6 @@ static JobState checkJobCompleteSuccess(JobImpl job) { } catch (IOException e) { LOG.warn("Could not do commit for Job", e); } - job.logJobHistoryFinishedEvent(); return job.finished(JobState.SUCCEEDED); } @@ -816,7 +824,7 @@ public JobState transition(JobImpl job, JobEvent event) { job.metrics.preparingJob(job); try { setup(job); - job.fs = FileSystem.get(job.conf); + job.fs = job.getFileSystem(job.conf); //log to job history JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, @@ -848,13 +856,14 @@ public JobState transition(JobImpl job, JobEvent event) { LOG.info("Using mapred newApiCommitter."); } - LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class")); + LOG.info("OutputCommitter set in config " + job.conf.get( + "mapred.output.committer.class")); if (newApiCommitter) { job.jobContext = new JobContextImpl(job.conf, job.oldJobId); - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider - .getRecordFactory(null) + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID + = RecordFactoryProvider.getRecordFactory(null) .newRecordInstance( org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class); attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null) @@ -884,14 +893,17 @@ public JobState transition(JobImpl job, JobEvent event) { inputLength += taskSplitMetaInfo[i].getInputDataLength(); } -//FIXME: need new memory criterion for uber-decision (oops, too late here; until AM-resizing supported, must depend on job client to pass fat-slot needs) + //FIXME: need new memory criterion for 
uber-decision (oops, too late here; + // until AM-resizing supported, must depend on job client to pass fat-slot needs) // these are no longer "system" settings, necessarily; user may override int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9); int sysMaxReduces = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1); long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES, - job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that - //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...] + job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is + // wrong; get FS from [File?]InputFormat and default block size from that + //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); + // FIXME [could use default AM-container memory size...] boolean uberEnabled = job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); @@ -900,8 +912,8 @@ public JobState transition(JobImpl job, JobEvent event) { boolean smallInput = (inputLength <= sysMaxBytes); boolean smallMemory = true; //FIXME (see above) // ignoring overhead due to UberTask and statics as negligible here: -// FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot -// || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT) + // FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot + // || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT) boolean notChainJob = !isChainJob(job.conf); // User has overall veto power over uberization, or user can modify @@ -935,7 +947,9 @@ public JobState transition(JobImpl job, JobEvent event) { job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1); // disable speculation: makes no sense to speculate an entire job -// canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old version, ultimately was from 
conf.getMapSpeculativeExecution(), conf.getReduceSpeculativeExecution()] + //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old + //version, ultimately was from conf.getMapSpeculativeExecution(), + //conf.getReduceSpeculativeExecution()] } else { StringBuilder msg = new StringBuilder(); msg.append("Not uberizing ").append(job.jobId).append(" because:"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java new file mode 100644 index 0000000000..037e8baed5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -0,0 +1,102 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.junit.Test; + + +/** + * Make sure that the job staging directory clean up happens. + */ + public class TestStagingCleanup extends TestCase { + + private Configuration conf = new Configuration(); + private FileSystem fs; + private String stagingJobDir = "tmpJobDir"; + private Path stagingJobPath = new Path(stagingJobDir); + private final static RecordFactory recordFactory = RecordFactoryProvider. 
+ getRecordFactory(null); + private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class); + + @Test + public void testDeletionofStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + ApplicationAttemptId attemptId = recordFactory.newRecordInstance( + ApplicationAttemptId.class); + attemptId.setAttemptId(0); + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(System.currentTimeMillis()); + appId.setId(0); + attemptId.setApplicationId(appId); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + MRAppMaster appMaster = new TestMRApp(attemptId); + EventHandler<JobFinishEvent> handler = + appMaster.createJobFinishEventHandler(); + handler.handle(new JobFinishEvent(jobid)); + verify(fs).delete(stagingJobPath, true); + } + + private class TestMRApp extends MRAppMaster { + + public TestMRApp(ApplicationAttemptId applicationAttemptId) { + super(applicationAttemptId); + } + + @Override + protected FileSystem getFileSystem(Configuration conf) { + return fs; + } + + @Override + protected void sysexit() { + } + + @Override + public Configuration getConfig() { + return conf; + } + } + + } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 60b711be9a..29ce4822b7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -1119,7 +1119,7 @@ protected void runJobCleanupTask(TaskUmbilicalProtocol 
umbilical, // delete the staging area for the job JobConf conf = new JobConf(jobContext.getConfiguration()); if (!keepTaskFiles(conf)) { - String jobTempDir = conf.get("mapreduce.job.dir"); + String jobTempDir = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR); Path jobTempDirPath = new Path(jobTempDir); FileSystem fs = jobTempDirPath.getFileSystem(conf); fs.delete(jobTempDirPath, true); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 0451a0df42..beeb101de4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -341,7 +341,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster) Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; try { - conf.set("mapreduce.job.dir", submitJobDir.toString()); + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 60830536df..41b3897257 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -238,6 +238,8 @@ public interface MRJobConfig { public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts"; public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit"; + + public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir"; public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts"; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 79320b6eb7..86bf29055b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger;