Revert "HDFS-17611. Move all DistCp execution logic to execute() (#7025)" (#7059)

This reverts commit ff75aa52f3.
This commit is contained in:
huhaiyang 2024-09-23 10:44:14 +08:00 committed by GitHub
parent ff75aa52f3
commit 81faae6343
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 8 additions and 25 deletions

View File

@ -140,7 +140,9 @@ public int run(String[] argv) {
try { try {
context = new DistCpContext(OptionsParser.parse(argv)); context = new DistCpContext(OptionsParser.parse(argv));
LOG.info("Input Options: {}", context); checkSplitLargeFile();
setTargetPathExists();
LOG.info("Input Options: " + context);
} catch (Throwable e) { } catch (Throwable e) {
LOG.error("Invalid arguments: ", e); LOG.error("Invalid arguments: ", e);
System.err.println("Invalid arguments: " + e.getMessage()); System.err.println("Invalid arguments: " + e.getMessage());
@ -150,7 +152,7 @@ public int run(String[] argv) {
Job job = null; Job job = null;
try { try {
job = execute(true); job = execute();
} catch (InvalidInputException e) { } catch (InvalidInputException e) {
LOG.error("Invalid input: ", e); LOG.error("Invalid input: ", e);
return DistCpConstants.INVALID_ARGUMENT; return DistCpConstants.INVALID_ARGUMENT;
@ -167,7 +169,7 @@ public int run(String[] argv) {
LOG.error("Exception encountered ", e); LOG.error("Exception encountered ", e);
return DistCpConstants.UNKNOWN_ERROR; return DistCpConstants.UNKNOWN_ERROR;
} finally { } finally {
// Blocking distcp so close the job after it's done //Blocking distcp so close the job after its done
if (job != null && context.shouldBlock()) { if (job != null && context.shouldBlock()) {
try { try {
job.close(); job.close();
@ -179,31 +181,15 @@ public int run(String[] argv) {
return DistCpConstants.SUCCESS; return DistCpConstants.SUCCESS;
} }
/**
* Original entrypoint of a distcp job. Calls {@link DistCp#execute(boolean)}
* without doing extra context checks and setting some configs.
* @return Job handle
* @throws Exception when fails to submit distcp job or distcp job fails
*/
public Job execute() throws Exception {
return execute(false);
}
/** /**
* Implements the core-execution. Creates the file-list for copy, * Implements the core-execution. Creates the file-list for copy,
* and launches the Hadoop-job, to do the copy. * and launches the Hadoop-job, to do the copy.
* @param extraContextChecks if true, does extra context checks and sets some configs.
* @return Job handle * @return Job handle
* @throws Exception when fails to submit distcp job or distcp job fails, or context checks fail * @throws Exception
*/ */
public Job execute(boolean extraContextChecks) throws Exception { public Job execute() throws Exception {
Preconditions.checkState(context != null, Preconditions.checkState(context != null,
"The DistCpContext should have been created before running DistCp!"); "The DistCpContext should have been created before running DistCp!");
if (extraContextChecks) {
checkSplitLargeFile();
setTargetPathExists();
}
Job job = createAndSubmitJob(); Job job = createAndSubmitJob();
if (context.shouldBlock()) { if (context.shouldBlock()) {

View File

@ -156,10 +156,7 @@ public void testCleanupOfJob() throws Exception {
DistCp distcp = mock(DistCp.class); DistCp distcp = mock(DistCp.class);
Job job = spy(Job.class); Job job = spy(Job.class);
Mockito.when(distcp.getConf()).thenReturn(conf); Mockito.when(distcp.getConf()).thenReturn(conf);
Mockito.when(distcp.createAndSubmitJob()).thenReturn(job); Mockito.when(distcp.execute()).thenReturn(job);
Mockito.when(distcp.execute()).thenCallRealMethod();
Mockito.when(distcp.execute(Mockito.anyBoolean())).thenCallRealMethod();
Mockito.doReturn(true).when(job).waitForCompletion(Mockito.anyBoolean());
Mockito.when(distcp.run(Mockito.any())).thenCallRealMethod(); Mockito.when(distcp.run(Mockito.any())).thenCallRealMethod();
String[] arg = { soure.toString(), target.toString() }; String[] arg = { soure.toString(), target.toString() };