MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public API in DistCp. Contributed by Jing Zhao.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-03-03 16:28:22 -08:00
parent b2f1ec312e
commit 5af693fde2
2 changed files with 39 additions and 11 deletions

View File

@ -320,6 +320,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-5612. Add javadoc for TaskCompletionEvent.Status. MAPREDUCE-5612. Add javadoc for TaskCompletionEvent.Status.
(Chris Palmer via aajisaka) (Chris Palmer via aajisaka)
MAPREDUCE-6248. Exposed the internal MapReduce job's information as a public
API in DistCp. (Jing Zhao via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-6169. MergeQueue should release reference to the current item MAPREDUCE-6169. MergeQueue should release reference to the current item

View File

@ -20,6 +20,8 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -51,12 +53,14 @@
* launch the copy-job. DistCp may alternatively be sub-classed to fine-tune * launch the copy-job. DistCp may alternatively be sub-classed to fine-tune
* behaviour. * behaviour.
*/ */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DistCp extends Configured implements Tool { public class DistCp extends Configured implements Tool {
/** /**
* Priority of the ResourceManager shutdown hook. * Priority of the shutdown hook.
*/ */
public static final int SHUTDOWN_HOOK_PRIORITY = 30; static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(DistCp.class); private static final Log LOG = LogFactory.getLog(DistCp.class);
@ -66,7 +70,7 @@ public class DistCp extends Configured implements Tool {
private static final String PREFIX = "_distcp"; private static final String PREFIX = "_distcp";
private static final String WIP_PREFIX = "._WIP_"; private static final String WIP_PREFIX = "._WIP_";
private static final String DISTCP_DEFAULT_XML = "distcp-default.xml"; private static final String DISTCP_DEFAULT_XML = "distcp-default.xml";
public static final Random rand = new Random(); static final Random rand = new Random();
private boolean submitted; private boolean submitted;
private FileSystem jobFS; private FileSystem jobFS;
@ -90,7 +94,7 @@ public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Ex
* To be used with the ToolRunner. Not for public consumption. * To be used with the ToolRunner. Not for public consumption.
*/ */
@VisibleForTesting @VisibleForTesting
public DistCp() {} DistCp() {}
/** /**
* Implementation of Tool::run(). Orchestrates the copy of source file(s) * Implementation of Tool::run(). Orchestrates the copy of source file(s)
@ -100,6 +104,7 @@ public DistCp() {}
* @param argv List of arguments passed to DistCp, from the ToolRunner. * @param argv List of arguments passed to DistCp, from the ToolRunner.
* @return On success, it returns 0. Else, -1. * @return On success, it returns 0. Else, -1.
*/ */
@Override
public int run(String[] argv) { public int run(String[] argv) {
if (argv.length < 1) { if (argv.length < 1) {
OptionsParser.usage(); OptionsParser.usage();
@ -145,9 +150,21 @@ public int run(String[] argv) {
* @throws Exception * @throws Exception
*/ */
public Job execute() throws Exception { public Job execute() throws Exception {
Job job = createAndSubmitJob();
if (inputOptions.shouldBlock()) {
waitForJobCompletion(job);
}
return job;
}
/**
* Create and submit the mapreduce job.
* @return The mapreduce job object that has been submitted
*/
public Job createAndSubmitJob() throws Exception {
assert inputOptions != null; assert inputOptions != null;
assert getConf() != null; assert getConf() != null;
Job job = null; Job job = null;
try { try {
synchronized(this) { synchronized(this) {
@ -169,15 +186,23 @@ public Job execute() throws Exception {
String jobID = job.getJobID().toString(); String jobID = job.getJobID().toString();
job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
LOG.info("DistCp job-id: " + jobID); LOG.info("DistCp job-id: " + jobID);
if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) {
throw new IOException("DistCp failure: Job " + jobID + " has failed: "
+ job.getStatus().getFailureInfo());
}
return job; return job;
} }
/**
* Wait for the given job to complete.
* @param job the given mapreduce job that has already been submitted
*/
public void waitForJobCompletion(Job job) throws Exception {
assert job != null;
if (!job.waitForCompletion(true)) {
throw new IOException("DistCp failure: Job " + job.getJobID()
+ " has failed: " + job.getStatus().getFailureInfo());
}
}
/** /**
* Set targetPathExists in both inputOptions and job config, * Set targetPathExists in both inputOptions and job config,
* for the benefit of CopyCommitter * for the benefit of CopyCommitter
@ -436,7 +461,7 @@ private boolean isSubmitted() {
private static class Cleanup implements Runnable { private static class Cleanup implements Runnable {
private final DistCp distCp; private final DistCp distCp;
public Cleanup(DistCp distCp) { Cleanup(DistCp distCp) {
this.distCp = distCp; this.distCp = distCp;
} }