MAPREDUCE-2606. Remove IsolationRunner. Contributed by Alejandro Abdelnur
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1145413 13f79535-47bb-0310-9956-ffa450edef68
parent 9e7e69e666
commit ad7cf36d5f
CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (unreleased changes)
 
     MAPREDUCE-2430. Remove mrunit contrib. (nigel via eli)
 
+    MAPREDUCE-2606. Remove IsolationRunner. (Alejandro Abdelnur via eli)
+
   NEW FEATURES
 
     MAPREDUCE-2107. [Gridmix] Total heap usage emulation in Gridmix.
mapred_tutorial.xml
@@ -552,8 +552,7 @@
       and others.</p>
 
       <p>Finally, we will wrap up by discussing some useful features of the
-      framework such as the <code>DistributedCache</code>,
-      <code>IsolationRunner</code> etc.</p>
+      framework such as the <code>DistributedCache</code>.</p>
 
       <section>
         <title>Payload</title>
@@ -2307,31 +2306,6 @@
         </p>
       </section>
 
-      <section>
-        <title>IsolationRunner</title>
-
-        <p><a href="ext:api/org/apache/hadoop/mapred/isolationrunner">
-        IsolationRunner</a> is a utility to help debug MapReduce programs.</p>
-
-        <p>To use the <code>IsolationRunner</code>, first set
-        <code>keep.failed.tasks.files</code> to <code>true</code>
-        (also see <code>keep.tasks.files.pattern</code>).</p>
-
-        <p>
-          Next, go to the node on which the failed task ran and go to the
-          <code>TaskTracker</code>'s local directory and run the
-          <code>IsolationRunner</code>:<br/>
-          <code>$ cd &lt;local path&gt;
-          /taskTracker/$user/jobcache/$jobid/${taskid}/work</code><br/>
-          <code>
-            $ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
-          </code>
-        </p>
-
-        <p><code>IsolationRunner</code> will run the failed task in a single
-        jvm, which can be in the debugger, over precisely the same input.</p>
-      </section>
-
       <section>
         <title>Profiling</title>
         <p>Profiling is a utility to get a representative (2 or 3) sample
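Editorial note: the tutorial text removed above named two configuration knobs, keep.failed.tasks.files and keep.tasks.files.pattern, and those survive this commit (only the IsolationRunner tool itself goes away). A minimal sketch of setting them through the classic-API JobConf; the setter and getter names are real JobConf methods, the pattern value is illustrative only:

import org.apache.hadoop.mapred.JobConf;

// Sketch: the retention settings referred to by the removed docs. The
// setters correspond to keep.failed.tasks.files / keep.tasks.files.pattern.
public class KeepTaskFilesExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setKeepFailedTaskFiles(true);             // keep.failed.tasks.files
    conf.setKeepTaskFilesPattern(".*_m_000000.*"); // keep.tasks.files.pattern
    System.out.println("keep failed task files: "
        + conf.getKeepFailedTaskFiles());
  }
}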
site.xml
@@ -187,7 +187,6 @@ See http://forrest.apache.org/docs/linking.html for more info.
     <filesplit href="FileSplit.html" />
     <inputformat href="InputFormat.html" />
     <inputsplit href="InputSplit.html" />
-    <isolationrunner href="IsolationRunner.html" />
     <jobclient href="JobClient.html">
       <runjob href="#runJob(org.apache.hadoop.mapred.JobConf)" />
       <submitjob href="#submitJob(org.apache.hadoop.mapred.JobConf)" />
IsolationRunner.java (deleted)
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-
-/**
- * IsolationRunner is intended to facilitate debugging by re-running a specific
- * task, given left-over task files for a (typically failed) past job.
- * Currently, it is limited to re-running map tasks.
- *
- * Users may coerce MapReduce to keep task files around by setting
- * mapreduce.task.files.preserve.failedtasks.  See mapred_tutorial.xml for more documentation.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class IsolationRunner {
-  private static final Log LOG =
-    LogFactory.getLog(IsolationRunner.class.getName());
-
-  static class FakeUmbilical implements TaskUmbilicalProtocol {
-
-    public long getProtocolVersion(String protocol, long clientVersion) {
-      return TaskUmbilicalProtocol.versionID;
-    }
-
-    @Override
-    public ProtocolSignature getProtocolSignature(String protocol,
-        long clientVersion, int clientMethodsHash) throws IOException {
-      return ProtocolSignature.getProtocolSignature(
-          this, protocol, clientVersion, clientMethodsHash);
-    }
-
-    public void done(TaskAttemptID taskid) throws IOException {
-      LOG.info("Task " + taskid + " reporting done.");
-    }
-
-    public void fsError(TaskAttemptID taskId, String message) throws IOException {
-      LOG.info("Task " + taskId + " reporting file system error: " + message);
-    }
-
-    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
-      LOG.info("Task " + taskId + " reporting shuffle error: " + message);
-    }
-
-    public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
-      LOG.info("Task " + taskId + " reporting fatal error: " + msg);
-    }
-
-    public JvmTask getTask(JvmContext context) throws IOException {
-      return null;
-    }
-
-    public boolean ping(TaskAttemptID taskid) throws IOException {
-      return true;
-    }
-
-    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
-        throws IOException, InterruptedException {
-      statusUpdate(taskId, taskStatus);
-    }
-
-    public boolean canCommit(TaskAttemptID taskid) throws IOException {
-      return true;
-    }
-
-    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
-        throws IOException, InterruptedException {
-      StringBuffer buf = new StringBuffer("Task ");
-      buf.append(taskId);
-      buf.append(" making progress to ");
-      buf.append(taskStatus.getProgress());
-      String state = taskStatus.getStateString();
-      if (state != null) {
-        buf.append(" and state of ");
-        buf.append(state);
-      }
-      LOG.info(buf.toString());
-      // ignore phase
-      // ignore counters
-      return true;
-    }
-
-    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
-      LOG.info("Task " + taskid + " has problem " + trace);
-    }
-
-    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
-        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
-      return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
-          false);
-    }
-
-    public void reportNextRecordRange(TaskAttemptID taskid,
-        SortedRanges.Range range) throws IOException {
-      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
-    }
-  }
-
-  private ClassLoader makeClassLoader(JobConf conf,
-      File workDir) throws IOException {
-    List<String> classPaths = new ArrayList<String>();
-    // Add jar clas files (includes lib/* and classes/*)
-    String jar = conf.getJar();
-    if (jar != null) {
-      TaskRunner.appendJobJarClasspaths(conf.getJar(), classPaths);
-    }
-    // Add the workdir, too.
-    classPaths.add(workDir.toString());
-    // Note: TaskRunner.run() does more, including DistributedCache files.
-
-    // Convert to URLs
-    URL[] urls = new URL[classPaths.size()];
-    for (int i = 0; i < classPaths.size(); ++i) {
-      urls[i] = new File(classPaths.get(i)).toURL();
-    }
-    return new URLClassLoader(urls);
-  }
-
-  /**
-   * Main method.
-   */
-  boolean run(String[] args)
-      throws ClassNotFoundException, IOException, InterruptedException {
-    if (args.length < 1) {
-      System.out.println("Usage: IsolationRunner <path>/job.xml " +
-          "<optional-user-name>");
-      return false;
-    }
-    File jobFilename = new File(args[0]);
-    if (!jobFilename.exists() || !jobFilename.isFile()) {
-      System.out.println(jobFilename + " is not a valid job file.");
-      return false;
-    }
-    String user;
-    if (args.length > 1) {
-      user = args[1];
-    } else {
-      user = UserGroupInformation.getCurrentUser().getShortUserName();
-    }
-    JobConf conf = new JobConf(new Path(jobFilename.toString()));
-    conf.setUser(user);
-    TaskAttemptID taskId = TaskAttemptID.forName(conf.get(JobContext.TASK_ATTEMPT_ID));
-    if (taskId == null) {
-      System.out.println("mapreduce.task.attempt.id not found in configuration;" +
-          " job.xml is not a task config");
-    }
-    boolean isMap = conf.getBoolean(JobContext.TASK_ISMAP, true);
-    if (!isMap) {
-      System.out.println("Only map tasks are supported.");
-      return false;
-    }
-    int partition = conf.getInt(JobContext.TASK_PARTITION, 0);
-
-    // setup the local and user working directories
-    FileSystem local = FileSystem.getLocal(conf);
-    LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-
-    File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
-    local.setWorkingDirectory(new Path(workDirName.toString()));
-    FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
-
-    // set up a classloader with the right classpath
-    ClassLoader classLoader =
-        makeClassLoader(conf, new File(workDirName.toString()));
-    Thread.currentThread().setContextClassLoader(classLoader);
-    conf.setClassLoader(classLoader);
-
-    // split.dta/split.meta files are used only by IsolationRunner.
-    // The file can now be in any of the configured local disks,
-    // so use LocalDirAllocator to find out where it is.
-    Path localMetaSplit =
-        new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
-            TaskTracker.getLocalSplitMetaFile(conf.getUser(),
-                taskId.getJobID().toString(), taskId
-                    .toString()), conf);
-    DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
-    TaskSplitIndex splitIndex = new TaskSplitIndex();
-    splitIndex.readFields(splitFile);
-    splitFile.close();
-
-    Task task =
-        new MapTask(jobFilename.toString(), taskId, partition, splitIndex, 1);
-    task.setConf(conf);
-    task.run(conf, new FakeUmbilical());
-    return true;
-  }
-
-
-  /**
-   * Run a single task.
-   *
-   * @param args the first argument is the task directory
-   */
-  public static void main(String[] args)
-      throws ClassNotFoundException, IOException, InterruptedException {
-    if (!new IsolationRunner().run(args)) {
-      System.exit(1);
-    }
-  }
-}
MapTask.java
@@ -106,23 +106,6 @@ class MapTask extends Task {
   public void localizeConfiguration(JobConf conf)
       throws IOException {
     super.localizeConfiguration(conf);
-    // split.dta/split.info files are used only by IsolationRunner.
-    // Write the split file to the local disk if it is a normal map task (not a
-    // job-setup or a job-cleanup task) and if the user wishes to run
-    // IsolationRunner either by setting keep.failed.tasks.files to true or by
-    // using keep.tasks.files.pattern
-    if (supportIsolationRunner(conf) && isMapOrReduce()) {
-      // localize the split meta-information
-      Path localSplitMeta =
-          new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
-              TaskTracker.getLocalSplitMetaFile(conf.getUser(),
-                  getJobID().toString(), getTaskID()
-                      .toString()), conf);
-      LOG.debug("Writing local split to " + localSplitMeta);
-      DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
-      splitMetaInfo.write(out);
-      out.close();
-    }
   }
 
 
Task.java
@@ -1097,7 +1097,7 @@ abstract public class Task implements Writable, Configurable {
 
     // delete the staging area for the job
     JobConf conf = new JobConf(jobContext.getConfiguration());
-    if (!supportIsolationRunner(conf)) {
+    if (!keepTaskFiles(conf)) {
       String jobTempDir = conf.get("mapreduce.job.dir");
       Path jobTempDirPath = new Path(jobTempDir);
       FileSystem fs = jobTempDirPath.getFileSystem(conf);
@@ -1106,7 +1106,7 @@ abstract public class Task implements Writable, Configurable {
     done(umbilical, reporter);
   }
 
-  protected boolean supportIsolationRunner(JobConf conf) {
+  protected boolean keepTaskFiles(JobConf conf) {
     return (conf.getKeepTaskFilesPattern() != null || conf
         .getKeepFailedTaskFiles());
   }
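Editorial note: the rename above is behavior-preserving; with the tool gone, supportIsolationRunner would have been a misleading name, but staging-directory cleanup is still gated on the same two retention settings. A standalone sketch of the identical check, assuming only public JobConf getters (the wrapper class name is hypothetical):

import org.apache.hadoop.mapred.JobConf;

// Mirrors Task.keepTaskFiles(conf): task files are retained when either
// retention knob is set, independent of any debug-runner tooling.
public final class KeepTaskFilesCheck {
  private KeepTaskFilesCheck() {}

  public static boolean keepTaskFiles(JobConf conf) {
    return conf.getKeepTaskFilesPattern() != null
        || conf.getKeepFailedTaskFiles();
  }
}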
TestIsolationRunner.java (deleted)
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.UUID;
-
-import javax.security.auth.login.LoginException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Re-runs a map task using the IsolationRunner.
- *
- * The task included here is an identity mapper that touches
- * a file in a side-effect directory.  This is used
- * to verify that the task in fact ran.
- */
-public class TestIsolationRunner extends TestCase {
-
-  private static final String SIDE_EFFECT_DIR_PROPERTY =
-      "test.isolationrunner.sideeffectdir";
-  private static String TEST_ROOT_DIR = new File(System.getProperty(
-      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
-
-  /** Identity mapper that also creates a side effect file. */
-  static class SideEffectMapper<K, V> extends IdentityMapper<K, V> {
-    private JobConf conf;
-    @Override
-    public void configure(JobConf conf) {
-      this.conf = conf;
-    }
-    @Override
-    public void close() throws IOException {
-      writeSideEffectFile(conf, "map");
-    }
-  }
-
-  static class SideEffectReducer<K, V> extends IdentityReducer<K, V> {
-    private JobConf conf;
-    @Override
-    public void configure(JobConf conf) {
-      this.conf = conf;
-    }
-    @Override
-    public void close() throws IOException {
-      writeSideEffectFile(conf, "reduce");
-    }
-  }
-
-  private static void deleteSideEffectFiles(JobConf conf) throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    localFs.delete(new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), true);
-    assertEquals(0, countSideEffectFiles(conf, ""));
-  }
-
-  private static void writeSideEffectFile(JobConf conf, String prefix)
-      throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    Path sideEffectFile = new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY),
-        prefix + "-" + UUID.randomUUID().toString());
-    localFs.create(sideEffectFile).close();
-  }
-
-  private static int countSideEffectFiles(JobConf conf, final String prefix)
-      throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    try {
-      FileStatus[] files = localFs.listStatus(
-          new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), new PathFilter() {
-        @Override public boolean accept(Path path) {
-          return path.getName().startsWith(prefix + "-");
-        }
-      });
-      return files.length;
-    } catch (FileNotFoundException fnfe) {
-      return 0;
-    }
-  }
-
-  private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
-      throws IOException,
-      LoginException {
-    String taskid =
-        new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
-    return new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathToRead(
-        TaskTracker.getTaskConfFile(UserGroupInformation.getCurrentUser()
-            .getUserName(), jobId.toString(), taskid, false), conf);
-  }
-
-  public void testIsolationRunOfMapTask()
-      throws IOException,
-      InterruptedException,
-      ClassNotFoundException,
-      LoginException {
-    MiniMRCluster mr = null;
-    try {
-      mr = new MiniMRCluster(1, "file:///", 4);
-
-      // Run a job succesfully; keep task files.
-      JobConf conf = mr.createJobConf();
-      conf.setKeepTaskFilesPattern(".*");
-      conf.set(SIDE_EFFECT_DIR_PROPERTY, TEST_ROOT_DIR +
-          "/isolationrunnerjob/sideeffect");
-      // Delete previous runs' data.
-      deleteSideEffectFiles(conf);
-      JobID jobId = runJobNormally(conf);
-      assertEquals(1, countSideEffectFiles(conf, "map"));
-      assertEquals(1, countSideEffectFiles(conf, "reduce"));
-
-      deleteSideEffectFiles(conf);
-
-      // Retrieve succesful job's configuration and
-      // run IsolationRunner against the map task.
-      FileSystem localFs = FileSystem.getLocal(conf);
-      Path mapJobXml =
-          getAttemptJobXml(
-              mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(), jobId,
-              TaskType.MAP).makeQualified(localFs);
-      assertTrue(localFs.exists(mapJobXml));
-
-      new IsolationRunner().run(new String[] {
-          new File(mapJobXml.toUri()).getCanonicalPath() });
-
-      assertEquals(1, countSideEffectFiles(conf, "map"));
-      assertEquals(0, countSideEffectFiles(conf, "reduce"));
-
-      // Clean up
-      deleteSideEffectFiles(conf);
-    } finally {
-      if (mr != null) {
-        mr.shutdown();
-      }
-    }
-  }
-
-  static JobID runJobNormally(JobConf conf) throws IOException {
-    final Path inDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/input");
-    final Path outDir = new Path(TEST_ROOT_DIR + "/isolationrunnerjob/output");
-
-    FileSystem fs = FileSystem.get(conf);
-    fs.delete(outDir, true);
-    if (!fs.exists(inDir)) {
-      fs.mkdirs(inDir);
-    }
-    String input = "The quick brown fox jumps over lazy dog\n";
-    DataOutputStream file = fs.create(new Path(inDir, "file"));
-    file.writeBytes(input);
-    file.close();
-
-    conf.setInputFormat(TextInputFormat.class);
-    conf.setMapperClass(SideEffectMapper.class);
-    conf.setReducerClass(SideEffectReducer.class);
-
-    FileInputFormat.setInputPaths(conf, inDir);
-    FileOutputFormat.setOutputPath(conf, outDir);
-    conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(1);
-
-    JobClient jobClient = new JobClient(conf);
-    RunningJob job = jobClient.submitJob(conf);
-    job.waitForCompletion();
-    return job.getID();
-  }
-}
TestMapProgress.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -53,7 +54,6 @@ import org.apache.hadoop.util.ReflectionUtils;
  * TestTaskReporter instead of TaskReporter and call mapTask.run().
  * Similar to LocalJobRunner, we set up splits and call mapTask.run()
  * directly. No job is run, only map task is run.
- * We use IsolationRunner.FakeUmbilical.
  * As the reporter's setProgress() validates progress after
  * every record is read, we are done with the validation of map phase progress
  * once mapTask.run() is finished. Sort phase progress in map task is not
@@ -63,12 +63,90 @@ public class TestMapProgress extends TestCase {
   public static final Log LOG = LogFactory.getLog(TestMapProgress.class);
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).getAbsolutePath() + "/mapPahseprogress";
 
+  static class FakeUmbilical implements TaskUmbilicalProtocol {
+
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(
+          this, protocol, clientVersion, clientMethodsHash);
+    }
+
+    public void done(TaskAttemptID taskid) throws IOException {
+      LOG.info("Task " + taskid + " reporting done.");
+    }
+
+    public void fsError(TaskAttemptID taskId, String message) throws IOException {
+      LOG.info("Task " + taskId + " reporting file system error: " + message);
+    }
+
+    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
+      LOG.info("Task " + taskId + " reporting shuffle error: " + message);
+    }
+
+    public void fatalError(TaskAttemptID taskId, String msg) throws IOException {
+      LOG.info("Task " + taskId + " reporting fatal error: " + msg);
+    }
+
+    public JvmTask getTask(JvmContext context) throws IOException {
+      return null;
+    }
+
+    public boolean ping(TaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      statusUpdate(taskId, taskStatus);
+    }
+
+    public boolean canCommit(TaskAttemptID taskid) throws IOException {
+      return true;
+    }
+
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      StringBuffer buf = new StringBuffer("Task ");
+      buf.append(taskId);
+      buf.append(" making progress to ");
+      buf.append(taskStatus.getProgress());
+      String state = taskStatus.getStateString();
+      if (state != null) {
+        buf.append(" and state of ");
+        buf.append(state);
+      }
+      LOG.info(buf.toString());
+      // ignore phase
+      // ignore counters
+      return true;
+    }
+
+    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
+      LOG.info("Task " + taskid + " has problem " + trace);
+    }
+
+    public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
+      return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
+          false);
+    }
+
+    public void reportNextRecordRange(TaskAttemptID taskid,
+        SortedRanges.Range range) throws IOException {
+      LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
+    }
+  }
+
   private FileSystem fs = null;
   private TestMapTask map = null;
   private JobID jobId = null;
-  private IsolationRunner.FakeUmbilical fakeUmbilical =
-      new IsolationRunner.FakeUmbilical();
+  private FakeUmbilical fakeUmbilical = new FakeUmbilical();
 
   /**
    * Task Reporter that validates map phase progress after each record is
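Editorial note: the stub was copied into the test rather than deleted because it lets TestMapProgress drive a MapTask entirely in-process, absorbing the status traffic a TaskTracker would normally receive. A condensed sketch of that pattern, under the caveat that TaskUmbilicalProtocol and MapTask are package-private to org.apache.hadoop.mapred, so this only compiles inside that package; the helper class name is hypothetical:

package org.apache.hadoop.mapred; // internal types force this package

import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;

// Hypothetical helper condensing what TestMapProgress does: run one map
// task in-process, reporting progress to the inlined stub umbilical.
class RunMapTaskInProcess {
  static void run(JobConf conf, TaskAttemptID taskId, TaskSplitIndex split)
      throws Exception {
    MapTask task = new MapTask("job.xml", taskId, 0, split, 1);
    task.setConf(conf);
    // Status updates land in FakeUmbilical's log instead of a TaskTracker.
    task.run(conf, new TestMapProgress.FakeUmbilical());
  }
}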