diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index c36335afc1..6f8ab2be41 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -127,6 +127,7 @@ public DistCp(Configuration configuration, DistCpOptions inputOptions) * to target location, by: * 1. Creating a list of files to be copied to target. * 2. Launching a Map-only job to copy the files. (Delegates to execute().) + * The MR job is not closed as part of run if its a blocking call to run * @param argv List of arguments passed to DistCp, from the ToolRunner. * @return On success, it returns 0. Else, -1. */ @@ -148,9 +149,10 @@ public int run(String[] argv) { OptionsParser.usage(); return DistCpConstants.INVALID_ARGUMENT; } - + + Job job = null; try { - execute(); + job = execute(); } catch (InvalidInputException e) { LOG.error("Invalid input: ", e); return DistCpConstants.INVALID_ARGUMENT; @@ -166,6 +168,15 @@ public int run(String[] argv) { } catch (Exception e) { LOG.error("Exception encountered ", e); return DistCpConstants.UNKNOWN_ERROR; + } finally { + //Blocking distcp so close the job after its done + if (job != null && context.shouldBlock()) { + try { + job.close(); + } catch (IOException e) { + LOG.error("Exception encountered while closing distcp job", e); + } + } } return DistCpConstants.SUCCESS; } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java index 06122e6428..eba86a985a 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestExternalCall.java @@ -18,6 +18,8 @@ package org.apache.hadoop.tools; +import org.apache.hadoop.mapreduce.Job; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -35,6 +37,8 @@ import java.io.OutputStream; import java.security.Permission; +import static org.mockito.Mockito.*; + public class TestExternalCall { private static final Logger LOG = LoggerFactory.getLogger(TestExternalCall.class); @@ -134,6 +138,33 @@ public void testCleanupTestViaToolRunner() throws IOException, InterruptedExcept } + /** + * test methods run end execute of DistCp class. distcp job should be cleaned up after completion + * @throws Exception + */ + @Test + public void testCleanupOfJob() throws Exception { + + Configuration conf = getConf(); + + Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(conf), + conf); + stagingDir.getFileSystem(conf).mkdirs(stagingDir); + Path soure = createFile("tmp.txt"); + Path target = createFile("target.txt"); + + DistCp distcp = mock(DistCp.class); + Job job = spy(Job.class); + Mockito.when(distcp.getConf()).thenReturn(conf); + Mockito.when(distcp.execute()).thenReturn(job); + Mockito.when(distcp.run(Mockito.any())).thenCallRealMethod(); + String[] arg = { soure.toString(), target.toString() }; + + distcp.run(arg); + Mockito.verify(job, times(1)).close(); + } + + private SecurityManager securityManager; protected static class ExitException extends SecurityException {