From ad7cf36d5fd99ecaf29e33d8de437e21f81a32d3 Mon Sep 17 00:00:00 2001
From: Eli Collins
Date: Tue, 12 Jul 2011 00:54:48 +0000
Subject: [PATCH] MAPREDUCE-2606. Remove IsolationRunner. Contributed by
Alejandro Abdelnur
git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
mapreduce/CHANGES.txt | 2 +
.../content/xdocs/mapred_tutorial.xml | 28 +--
.../src/documentation/content/xdocs/site.xml | 1 -
.../apache/hadoop/mapred/IsolationRunner.java | 235 ------------------
.../org/apache/hadoop/mapred/MapTask.java | 17 --
.../java/org/apache/hadoop/mapred/Task.java | 4 +-
.../hadoop/mapred/TestIsolationRunner.java | 194 ---------------
.../apache/hadoop/mapred/TestMapProgress.java | 84 ++++++-
8 files changed, 86 insertions(+), 479 deletions(-)
delete mode 100644 mapreduce/src/java/org/apache/hadoop/mapred/IsolationRunner.java
delete mode 100644 mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt
index 59818aac84..28d4619129 100644
--- a/mapreduce/CHANGES.txt
+++ b/mapreduce/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (unreleased changes)
MAPREDUCE-2430. Remove mrunit contrib. (nigel via eli)
+ MAPREDUCE-2606. Remove IsolationRunner. (Alejandro Abdelnur via eli)
MAPREDUCE-2107. [Gridmix] Total heap usage emulation in Gridmix.
diff --git a/mapreduce/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml b/mapreduce/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
index a8c806ae16..813acb1eda 100644
--- a/mapreduce/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
+++ b/mapreduce/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
@@ -552,8 +552,7 @@
and others.
Finally, we will wrap up by discussing some useful features of the
- framework such as the DistributedCache
- IsolationRunner
+ framework such as the
Profiling is a utility to get a representative (2 or 3) sample
diff --git a/mapreduce/src/docs/src/documentation/content/xdocs/site.xml b/mapreduce/src/docs/src/documentation/content/xdocs/site.xml
index c1f5e7792d..a88e883e7a 100644
--- a/mapreduce/src/docs/src/documentation/content/xdocs/site.xml
+++ b/mapreduce/src/docs/src/documentation/content/xdocs/site.xml
@@ -187,7 +187,6 @@ See for more info.
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/IsolationRunner.java b/mapreduce/src/java/org/apache/hadoop/mapred/IsolationRunner.java
deleted file mode 100644
index 3de96ed888..0000000000
--- a/mapreduce/src/java/org/apache/hadoop/mapred/IsolationRunner.java
+++ /dev/null
@@ -1,235 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
- * IsolationRunner is intended to facilitate debugging by re-running a specific
- * task, given left-over task files for a (typically failed) past job.
- * Currently, it is limited to re-running map tasks.
- *
- * Users may coerce MapReduce to keep task files around by setting
- * mapreduce.task.files.preserve.failedtasks. See mapred_tutorial.xml for more documentation.
- */
-public class IsolationRunner {
- private static final Log LOG =
- LogFactory.getLog(IsolationRunner.class.getName());
- static class FakeUmbilical implements TaskUmbilicalProtocol {
- public long getProtocolVersion(String protocol, long clientVersion) {
- return TaskUmbilicalProtocol.versionID;
- }
- @Override
- public ProtocolSignature getProtocolSignature(String protocol,
- long clientVersion, int clientMethodsHash) throws IOException {
- return ProtocolSignature.getProtocolSignature(
- this, protocol, clientVersion, clientMethodsHash);
- }
- public void done(TaskAttemptID taskid) throws IOException {
-"Task " + taskid + " reporting done.");
- }
- public void fsError(TaskAttemptID taskId, String message) throws IOException {
-"Task " + taskId + " reporting file system error: " + message);
- }
- public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
-"Task " + taskId + " reporting shuffle error: " + message);
- }
- public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
-"Task " + taskId + " reporting fatal error: " + msg);
- }
- public JvmTask getTask(JvmContext context) throws IOException {
- return null;
- }
- public boolean ping(TaskAttemptID taskid) throws IOException {
- return true;
- }
- public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException {
- statusUpdate(taskId, taskStatus);
- }
- public boolean canCommit(TaskAttemptID taskid) throws IOException {
- return true;
- }
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException {
- StringBuffer buf = new StringBuffer("Task ");
- buf.append(taskId);
- buf.append(" making progress to ");
- buf.append(taskStatus.getProgress());
- String state = taskStatus.getStateString();
- if (state != null) {
- buf.append(" and state of ");
- buf.append(state);
- }
- // ignore phase
- // ignore counters
- return true;
- }
- public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
-"Task " + taskid + " has problem " + trace);
- }
- public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
- int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
- return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
- false);
- }
- public void reportNextRecordRange(TaskAttemptID taskid,
- SortedRanges.Range range) throws IOException {
-"Task " + taskid + " reportedNextRecordRange " + range);
- }
- }
- private ClassLoader makeClassLoader(JobConf conf,
- File workDir) throws IOException {
- List classPaths = new ArrayList();
- // Add jar clas files (includes lib/* and classes/*)
- String jar = conf.getJar();
- if (jar != null) {
- TaskRunner.appendJobJarClasspaths(conf.getJar(), classPaths);
- }
- // Add the workdir, too.
- classPaths.add(workDir.toString());
- // Note: does more, including DistributedCache files.
- // Convert to URLs
- URL[] urls = new URL[classPaths.size()];
- for (int i = 0; i < classPaths.size(); ++i) {
- urls[i] = new File(classPaths.get(i)).toURL();
- }
- return new URLClassLoader(urls);
- }
- /**
- * Main method.
- */
- boolean run(String[] args)
- throws ClassNotFoundException, IOException, InterruptedException {
- if (args.length < 1) {
- System.out.println("Usage: IsolationRunner /job.xml " +
- "");
- return false;
- }
- File jobFilename = new File(args[0]);
- if (!jobFilename.exists() || !jobFilename.isFile()) {
- System.out.println(jobFilename + " is not a valid job file.");
- return false;
- }
- String user;
- if (args.length > 1) {
- user = args[1];
- } else {
- user = UserGroupInformation.getCurrentUser().getShortUserName();
- }
- JobConf conf = new JobConf(new Path(jobFilename.toString()));
- conf.setUser(user);
- TaskAttemptID taskId = TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID));
- if (taskId == null) {
- System.out.println(" not found in configuration;" +
- " job.xml is not a task config");
- }
- boolean isMap = conf.getBoolean(JobContext.TASK_ISMAP, true);
- if (!isMap) {
- System.out.println("Only map tasks are supported.");
- return false;
- }
- int partition = conf.getInt(JobContext.TASK_PARTITION, 0);
- // setup the local and user working directories
- FileSystem local = FileSystem.getLocal(conf);
- LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
- File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
- local.setWorkingDirectory(new Path(workDirName.toString()));
- FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
- // set up a classloader with the right classpath
- ClassLoader classLoader =
- makeClassLoader(conf, new File(workDirName.toString()));
- Thread.currentThread().setContextClassLoader(classLoader);
- conf.setClassLoader(classLoader);
- // split.dta/split.meta files are used only by IsolationRunner.
- // The file can now be in any of the configured local disks,
- // so use LocalDirAllocator to find out where it is.
- Path localMetaSplit =
- new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
- TaskTracker.getLocalSplitMetaFile(conf.getUser(),
- taskId.getJobID().toString(), taskId
- .toString()), conf);
- DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
- TaskSplitIndex splitIndex = new TaskSplitIndex();
- splitIndex.readFields(splitFile);
- splitFile.close();
- Task task =
- new MapTask(jobFilename.toString(), taskId, partition, splitIndex, 1);
- task.setConf(conf);
-, new FakeUmbilical());
- return true;
- }
- /**
- * Run a single task.
- *
- * @param args the first argument is the task directory
- */
- public static void main(String[] args)
- throws ClassNotFoundException, IOException, InterruptedException {
- if (!new IsolationRunner().run(args)) {
- System.exit(1);
- }
- }
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
index 21599c2053..2265c38f15 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
@@ -106,23 +106,6 @@ class MapTask extends Task {
public void localizeConfiguration(JobConf conf)
throws IOException {
- // split.dta/split.meta files are used only by IsolationRunner.
- // Write the split file to the local disk if it is a normal map task (not a
- // job-setup or a job-cleanup task) and if the user wishes to run
- // IsolationRunner either by setting keep.failed.tasks.files to true or by
- // using keep.tasks.files.pattern
- if (supportIsolationRunner(conf) && isMapOrReduce()) {
- // localize the split meta-information
- Path localSplitMeta =
- new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
- TaskTracker.getLocalSplitMetaFile(conf.getUser(),
- getJobID().toString(), getTaskID()
- .toString()), conf);
- LOG.debug("Writing local split to " + localSplitMeta);
- DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
- splitMetaInfo.write(out);
- out.close();
- }
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
index ee2d2bf03e..3c0d3f3b10 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
@@ -1097,7 +1097,7 @@ abstract public class Task implements Writable, Configurable {
// delete the staging area for the job
JobConf conf = new JobConf(jobContext.getConfiguration());
- if (!supportIsolationRunner(conf)) {
+ if (!keepTaskFiles(conf)) {
String jobTempDir = conf.get("mapreduce.job.dir");
Path jobTempDirPath = new Path(jobTempDir);
FileSystem fs = jobTempDirPath.getFileSystem(conf);
@@ -1106,7 +1106,7 @@ abstract public class Task implements Writable, Configurable {
done(umbilical, reporter);
- protected boolean supportIsolationRunner(JobConf conf) {
+ protected boolean keepTaskFiles(JobConf conf) {
return (conf.getKeepTaskFilesPattern() != null || conf
diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
deleted file mode 100644
index 53eb29da84..0000000000
--- a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
+++ /dev/null
@@ -1,194 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-import java.util.UUID;
-import junit.framework.TestCase;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskType;
- * Re-runs a map task using the IsolationRunner.
- *
- * The task included here is an identity mapper that touches
- * a file in a side-effect directory. This is used
- * to verify that the task in fact ran.
- */
-public class TestIsolationRunner extends TestCase {
- private static final String SIDE_EFFECT_DIR_PROPERTY =
- "test.isolationrunner.sideeffectdir";
- private static String TEST_ROOT_DIR = new File(System.getProperty(
- "", "/tmp")).toURI().toString().replace(' ', '+');
- /** Identity mapper that also creates a side effect file. */
- static class SideEffectMapper extends IdentityMapper {
- private JobConf conf;
- @Override
- public void configure(JobConf conf) {
- this.conf = conf;
- }
- @Override
- public void close() throws IOException {
- writeSideEffectFile(conf, "map");
- }
- }
- static class SideEffectReducer extends IdentityReducer {
- private JobConf conf;
- @Override
- public void configure(JobConf conf) {
- this.conf = conf;
- }
- @Override
- public void close() throws IOException {
- writeSideEffectFile(conf, "reduce");
- }
- }
- private static void deleteSideEffectFiles(JobConf conf) throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), true);
- assertEquals(0, countSideEffectFiles(conf, ""));
- }
- private static void writeSideEffectFile(JobConf conf, String prefix)
- throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
- Path sideEffectFile = new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY),
- prefix + "-" + UUID.randomUUID().toString());
- localFs.create(sideEffectFile).close();
- }
- private static int countSideEffectFiles(JobConf conf, final String prefix)
- throws IOException {
- FileSystem localFs = FileSystem.getLocal(conf);
- try {
- FileStatus[] files = localFs.listStatus(
- new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), new PathFilter() {
- @Override public boolean accept(Path path) {
- return path.getName().startsWith(prefix + "-");
- }
- });
- return files.length;
- } catch (FileNotFoundException fnfe) {
- return 0;
- }
- }
- private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
- throws IOException,
- LoginException {
- String taskid =
- new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
- return new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
- TaskTracker.getTaskConfFile(UserGroupInformation.getCurrentUser()
- .getUserName(), jobId.toString(), taskid, false), conf);
- }
- public void testIsolationRunOfMapTask()
- throws IOException,
- InterruptedException,
- ClassNotFoundException,
- LoginException {
- MiniMRCluster mr = null;
- try {
- mr = new MiniMRCluster(1, "file:///", 4);
- // Run a job succesfully; keep task files.
- JobConf conf = mr.createJobConf();
- conf.setKeepTaskFilesPattern(".*");
- "/isolationrunnerjob/sideeffect");
- // Delete previous runs' data.
- deleteSideEffectFiles(conf);
- JobID jobId = runJobNormally(conf);
- assertEquals(1, countSideEffectFiles(conf, "map"));
- assertEquals(1, countSideEffectFiles(conf, "reduce"));
- deleteSideEffectFiles(conf);
- // Retrieve succesful job's configuration and
- // run IsolationRunner against the map task.
- FileSystem localFs = FileSystem.getLocal(conf);
- Path mapJobXml =
- getAttemptJobXml(
- mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(), jobId,
- TaskType.MAP).makeQualified(localFs);
- assertTrue(localFs.exists(mapJobXml));
- new IsolationRunner().run(new String[] {
- new File(mapJobXml.toUri()).getCanonicalPath() });
- assertEquals(1, countSideEffectFiles(conf, "map"));
- assertEquals(0, countSideEffectFiles(conf, "reduce"));
- // Clean up
- deleteSideEffectFiles(conf);
- } finally {
- if (mr != null) {
- mr.shutdown();
- }
- }
- }
- static JobID runJobNormally(JobConf conf) throws IOException {
- final Path inDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/input");
- final Path outDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/output");
- FileSystem fs = FileSystem.get(conf);
- fs.delete(outDir, true);
- if (!fs.exists(inDir)) {
- fs.mkdirs(inDir);
- }
- String input = "The quick brown fox jumps over lazy dog\n";
- DataOutputStream file = fs.create(new Path(inDir, "file"));
- file.writeBytes(input);
- file.close();
- conf.setInputFormat(TextInputFormat.class);
- conf.setMapperClass(SideEffectMapper.class);
- conf.setReducerClass(SideEffectReducer.class);
- FileInputFormat.setInputPaths(conf, inDir);
- FileOutputFormat.setOutputPath(conf, outDir);
- conf.setNumMapTasks(1);
- conf.setNumReduceTasks(1);
- JobClient jobClient = new JobClient(conf);
- RunningJob job = jobClient.submitJob(conf);
- job.waitForCompletion();
- return job.getID();
- }
diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
index 372a5fca75..982d405221 100644
--- a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
+++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -53,7 +54,6 @@ import org.apache.hadoop.util.ReflectionUtils;
* TestTaskReporter instead of TaskReporter and call
* Similar to LocalJobRunner, we set up splits and call
* directly. No job is run, only map task is run.
- * We use IsolationRunner.FakeUmbilical.
* As the reporter's setProgress() validates progress after
* every record is read, we are done with the validation of map phase progress
* once is finished. Sort phase progress in map task is not
@@ -63,12 +63,90 @@ public class TestMapProgress extends TestCase {
public static final Log LOG = LogFactory.getLog(TestMapProgress.class);
private static String TEST_ROOT_DIR = new File(System.getProperty(
"", "/tmp")).getAbsolutePath() + "/mapPahseprogress";
+ static class FakeUmbilical implements TaskUmbilicalProtocol {
+ public long getProtocolVersion(String protocol, long clientVersion) {
+ return TaskUmbilicalProtocol.versionID;
+ }
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+ public void done(TaskAttemptID taskid) throws IOException {
+"Task " + taskid + " reporting done.");
+ }
+ public void fsError(TaskAttemptID taskId, String message) throws IOException {
+"Task " + taskId + " reporting file system error: " + message);
+ }
+ public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
+"Task " + taskId + " reporting shuffle error: " + message);
+ }
+ public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
+"Task " + taskId + " reporting fatal error: " + msg);
+ }
+ public JvmTask getTask(JvmContext context) throws IOException {
+ return null;
+ }
+ public boolean ping(TaskAttemptID taskid) throws IOException {
+ return true;
+ }
+ public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ statusUpdate(taskId, taskStatus);
+ }
+ public boolean canCommit(TaskAttemptID taskid) throws IOException {
+ return true;
+ }
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ StringBuffer buf = new StringBuffer("Task ");
+ buf.append(taskId);
+ buf.append(" making progress to ");
+ buf.append(taskStatus.getProgress());
+ String state = taskStatus.getStateString();
+ if (state != null) {
+ buf.append(" and state of ");
+ buf.append(state);
+ }
+ // ignore phase
+ // ignore counters
+ return true;
+ }
+ public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
+"Task " + taskid + " has problem " + trace);
+ }
+ public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+ int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
+ return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
+ false);
+ }
+ public void reportNextRecordRange(TaskAttemptID taskid,
+ SortedRanges.Range range) throws IOException {
+"Task " + taskid + " reportedNextRecordRange " + range);
+ }
+ }
private FileSystem fs = null;
private TestMapTask map = null;
private JobID jobId = null;
- private IsolationRunner.FakeUmbilical fakeUmbilical =
- new IsolationRunner.FakeUmbilical();
+ private FakeUmbilical fakeUmbilical = new FakeUmbilical();
* Task Reporter that validates map phase progress after each record is