MAPREDUCE-7298. Distcp doesn't close the job after the job is completed. Contributed by Aasha Medhi.

Change-Id: I63d249bbb18ccedaeee9f10123a78e32f9e54ed2
This commit is contained in:
Arpit Agarwal 2020-10-02 08:29:55 -07:00
parent 51598d8b1b
commit 18fa4397e6
No known key found for this signature in database
GPG Key ID: E4B09E903FDF9E98
2 changed files with 44 additions and 2 deletions

View File

@ -127,6 +127,7 @@ public DistCp(Configuration configuration, DistCpOptions inputOptions)
* to target location, by: * to target location, by:
* 1. Creating a list of files to be copied to target. * 1. Creating a list of files to be copied to target.
* 2. Launching a Map-only job to copy the files. (Delegates to execute().) * 2. Launching a Map-only job to copy the files. (Delegates to execute().)
* The MR job is not closed as part of run if its a blocking call to run
* @param argv List of arguments passed to DistCp, from the ToolRunner. * @param argv List of arguments passed to DistCp, from the ToolRunner.
* @return On success, it returns 0. Else, -1. * @return On success, it returns 0. Else, -1.
*/ */
@ -149,8 +150,9 @@ public int run(String[] argv) {
return DistCpConstants.INVALID_ARGUMENT; return DistCpConstants.INVALID_ARGUMENT;
} }
Job job = null;
try { try {
execute(); job = execute();
} catch (InvalidInputException e) { } catch (InvalidInputException e) {
LOG.error("Invalid input: ", e); LOG.error("Invalid input: ", e);
return DistCpConstants.INVALID_ARGUMENT; return DistCpConstants.INVALID_ARGUMENT;
@ -166,6 +168,15 @@ public int run(String[] argv) {
} catch (Exception e) { } catch (Exception e) {
LOG.error("Exception encountered ", e); LOG.error("Exception encountered ", e);
return DistCpConstants.UNKNOWN_ERROR; return DistCpConstants.UNKNOWN_ERROR;
} finally {
//Blocking distcp so close the job after its done
if (job != null && context.shouldBlock()) {
try {
job.close();
} catch (IOException e) {
LOG.error("Exception encountered while closing distcp job", e);
}
}
} }
return DistCpConstants.SUCCESS; return DistCpConstants.SUCCESS;
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.tools; package org.apache.hadoop.tools;
import org.apache.hadoop.mapreduce.Job;
import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -35,6 +37,8 @@
import java.io.OutputStream; import java.io.OutputStream;
import java.security.Permission; import java.security.Permission;
import static org.mockito.Mockito.*;
public class TestExternalCall { public class TestExternalCall {
private static final Logger LOG = LoggerFactory.getLogger(TestExternalCall.class); private static final Logger LOG = LoggerFactory.getLogger(TestExternalCall.class);
@ -134,6 +138,33 @@ public void testCleanupTestViaToolRunner() throws IOException, InterruptedExcept
} }
/**
* test methods run end execute of DistCp class. distcp job should be cleaned up after completion
* @throws Exception
*/
@Test
public void testCleanupOfJob() throws Exception {
Configuration conf = getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(conf),
conf);
stagingDir.getFileSystem(conf).mkdirs(stagingDir);
Path soure = createFile("tmp.txt");
Path target = createFile("target.txt");
DistCp distcp = mock(DistCp.class);
Job job = spy(Job.class);
Mockito.when(distcp.getConf()).thenReturn(conf);
Mockito.when(distcp.execute()).thenReturn(job);
Mockito.when(distcp.run(Mockito.any())).thenCallRealMethod();
String[] arg = { soure.toString(), target.toString() };
distcp.run(arg);
Mockito.verify(job, times(1)).close();
}
private SecurityManager securityManager; private SecurityManager securityManager;
protected static class ExitException extends SecurityException { protected static class ExitException extends SecurityException {