MAPREDUCE-2762. Cleanup MR staging directory on completion. Contributed by Mahadev Konar.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1185880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-10-18 21:37:31 +00:00
parent 073a13aeb1
commit 51a667bef8
8 changed files with 230 additions and 45 deletions

View File

@ -1666,6 +1666,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3197. TestMRClientService failing on building clean checkout of
branch 0.23 (mahadev)
MAPREDUCE-2762. Cleanup MR staging directory on completion. (mahadev via
acmurthy)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -31,12 +31,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@ -228,17 +230,55 @@ public void init(final Configuration conf) {
super.init(conf);
} // end of init()
/** Create and initialize (but don't start) a single job. */
protected Job createJob(Configuration conf) {
// create single job
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, currentUser.getUserName());
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
protected boolean keepJobFiles(JobConf conf) {
return (conf.getKeepTaskFilesPattern() != null || conf
.getKeepFailedTaskFiles());
}
dispatcher.register(JobFinishEvent.Type.class,
new EventHandler<JobFinishEvent>() {
/**
* Create the default file System for this job.
* @param conf the conf object
* @return the default filesystem for this job
* @throws IOException
*/
protected FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(conf);
}
/**
* clean up staging directories for the job.
* @throws IOException
*/
public void cleanupStagingDir() throws IOException {
/* make sure we clean the staging files */
String jobTempDir = null;
FileSystem fs = getFileSystem(getConfig());
try {
if (!keepJobFiles(new JobConf(getConfig()))) {
jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
if (jobTempDir == null) {
LOG.warn("Job Staging directory is null");
return;
}
Path jobTempDirPath = new Path(jobTempDir);
LOG.info("Deleting staging directory " + fs.getDefaultUri(getConfig()) +
" " + jobTempDir);
fs.delete(jobTempDirPath, true);
}
} catch(IOException io) {
LOG.error("Failed to cleanup staging dir " + jobTempDir, io);
}
}
/**
* Exit call. Just in a function call to enable testing.
*/
protected void sysexit() {
System.exit(0);
}
private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
@Override
public void handle(JobFinishEvent event) {
// job has finished
@ -259,15 +299,39 @@ public void handle(JobFinishEvent event) {
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
try {
cleanupStagingDir();
} catch(IOException io) {
LOG.warn("Failed to delete staging dir");
}
//TODO: this is required because rpc server does not shut down
// in spite of calling server.stop().
//Bring the process down by force.
//Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!");
System.exit(0);
sysexit();
}
}
});
/**
* create an event handler that handles the job finish event.
* @return the job finish event handler.
*/
protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
return new JobFinishEventHandler();
}
/** Create and initialize (but don't start) a single job. */
protected Job createJob(Configuration conf) {
// create single job
Job newJob = new JobImpl(appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, currentUser.getUserName());
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
} // end createJob()
@ -553,6 +617,7 @@ public void start() {
///////////////////// Create the job itself.
job = createJob(getConfig());
// End of creating the job.
// metrics system init is really init & start.

View File

@ -339,7 +339,6 @@ JobEventType.JOB_KILL, new KillTasksTransition())
JobEventType.JOB_DIAGNOSTIC_UPDATE,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.INTERNAL_ERROR))
// create the topology tables
.installTopology();
@ -724,6 +723,16 @@ void logJobHistoryFinishedEvent() {
this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe));
}
/**
* Create the default file System for this job.
* @param conf the conf object
* @return the default filesystem for this job
* @throws IOException
*/
protected FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(conf);
}
static JobState checkJobCompleteSuccess(JobImpl job) {
// check for Job success
if (job.completedTaskCount == job.getTasks().size()) {
@ -733,7 +742,6 @@ static JobState checkJobCompleteSuccess(JobImpl job) {
} catch (IOException e) {
LOG.warn("Could not do commit for Job", e);
}
job.logJobHistoryFinishedEvent();
return job.finished(JobState.SUCCEEDED);
}
@ -816,7 +824,7 @@ public JobState transition(JobImpl job, JobEvent event) {
job.metrics.preparingJob(job);
try {
setup(job);
job.fs = FileSystem.get(job.conf);
job.fs = job.getFileSystem(job.conf);
//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
@ -848,13 +856,14 @@ public JobState transition(JobImpl job, JobEvent event) {
LOG.info("Using mapred newApiCommitter.");
}
LOG.info("OutputCommitter set in config " + job.conf.get("mapred.output.committer.class"));
LOG.info("OutputCommitter set in config " + job.conf.get(
"mapred.output.committer.class"));
if (newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = RecordFactoryProvider
.getRecordFactory(null)
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null)
@ -884,14 +893,17 @@ public JobState transition(JobImpl job, JobEvent event) {
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
//FIXME: need new memory criterion for uber-decision (oops, too late here; until AM-resizing supported, must depend on job client to pass fat-slot needs)
//FIXME: need new memory criterion for uber-decision (oops, too late here;
// until AM-resizing supported, must depend on job client to pass fat-slot needs)
// these are no longer "system" settings, necessarily; user may override
int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
int sysMaxReduces =
job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that
//long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...]
job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
// wrong; get FS from [File?]InputFormat and default block size from that
//long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot();
// FIXME [could use default AM-container memory size...]
boolean uberEnabled =
job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
@ -935,7 +947,9 @@ public JobState transition(JobImpl job, JobEvent event) {
job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation: makes no sense to speculate an entire job
// canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old version, ultimately was from conf.getMapSpeculativeExecution(), conf.getReduceSpeculativeExecution()]
//canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old
//version, ultimately was from conf.getMapSpeculativeExecution(),
//conf.getReduceSpeculativeExecution()]
} else {
StringBuilder msg = new StringBuilder();
msg.append("Not uberizing ").append(job.jobId).append(" because:");

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
/**
* Make sure that the job staging directory clean up happens.
*/
public class TestStagingCleanup extends TestCase {
private Configuration conf = new Configuration();
private FileSystem fs;
private String stagingJobDir = "tmpJobDir";
private Path stagingJobPath = new Path(stagingJobDir);
private final static RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
@Test
public void testDeletionofStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
ApplicationAttemptId.class);
attemptId.setAttemptId(0);
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(System.currentTimeMillis());
appId.setId(0);
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
MRAppMaster appMaster = new TestMRApp(attemptId);
EventHandler<JobFinishEvent> handler =
appMaster.createJobFinishEventHandler();
handler.handle(new JobFinishEvent(jobid));
verify(fs).delete(stagingJobPath, true);
}
private class TestMRApp extends MRAppMaster {
public TestMRApp(ApplicationAttemptId applicationAttemptId) {
super(applicationAttemptId);
}
@Override
protected FileSystem getFileSystem(Configuration conf) {
return fs;
}
@Override
protected void sysexit() {
}
@Override
public Configuration getConfig() {
return conf;
}
}
}

View File

@ -1119,7 +1119,7 @@ protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
// delete the staging area for the job
JobConf conf = new JobConf(jobContext.getConfiguration());
if (!keepTaskFiles(conf)) {
String jobTempDir = conf.get("mapreduce.job.dir");
String jobTempDir = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
Path jobTempDirPath = new Path(jobTempDir);
FileSystem fs = jobTempDirPath.getFileSystem(conf);
fs.delete(jobTempDirPath, true);

View File

@ -341,7 +341,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set("mapreduce.job.dir", submitJobDir.toString());
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir

View File

@ -239,6 +239,8 @@ public interface MRJobConfig {
public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit";
public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";
public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
public static final String SHUFFLE_PARALLEL_COPIES = "mapreduce.reduce.shuffle.parallelcopies";

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;