diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 33ab3ee4eb..2272781f72 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -553,10 +553,6 @@ private Path listTargetFiles(final Configuration conf, conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); List targets = new ArrayList<>(1); targets.add(targetFinalPath); - Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath) - .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME) - ? DistCpConstants.RAW_NONE_PATH - : DistCpConstants.NONE_PATH; // // Set up options to be the same from the CopyListing.buildListing's // perspective, so to collect similar listings as when doing the copy @@ -568,7 +564,7 @@ private Path listTargetFiles(final Configuration conf, conf.getBoolean(DistCpConstants.CONF_LABEL_USE_ITERATOR, false); LOG.info("Scanning destination directory {} with thread count: {}", targetFinalPath, threads); - DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath) + DistCpOptions options = new DistCpOptions.Builder(targets, targetFinalPath) .withOverwrite(overwrite) .withSyncFolder(syncFolder) .withNumListstatusThreads(threads) diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index fab14d138b..159338f0d4 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -709,4 +709,60 @@ private Job runDistCpDirectWrite(final Path srcDir, final Path destDir) Collections.singletonList(srcDir), destDir) .withDirectWrite(true))); } + + @Test + public void testDistCpWithFile() throws Exception { + describe("Distcp only file"); + + Path source = new Path(remoteDir, "file"); + Path dest = new Path(localDir, "file"); + dest = localFS.makeQualified(dest); + + mkdirs(remoteFS, remoteDir); + mkdirs(localFS, localDir); + + int len = 4; + int base = 0x40; + byte[] block = dataset(len, base, base + len); + ContractTestUtils.createFile(remoteFS, source, true, block); + verifyPathExists(remoteFS, "", source); + verifyPathExists(localFS, "", localDir); + + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(), + dest.toString(), null, conf); + + Assertions + .assertThat(RemoteIterators.toList(localFS.listFiles(dest, true))) + .describedAs("files").hasSize(1); + verifyFileContents(localFS, dest, block); + } + + @Test + public void testDistCpWithUpdateExistFile() throws Exception { + describe("Now update an exist file."); + + Path source = new Path(remoteDir, "file"); + Path dest = new Path(localDir, "file"); + dest = localFS.makeQualified(dest); + + mkdirs(remoteFS, remoteDir); + mkdirs(localFS, localDir); + + int len = 4; + int base = 0x40; + byte[] block = dataset(len, base, base + len); + byte[] destBlock = dataset(len, base, base + len + 1); + ContractTestUtils.createFile(remoteFS, source, true, block); + ContractTestUtils.createFile(localFS, dest, true, destBlock); + + verifyPathExists(remoteFS, "", source); + verifyPathExists(localFS, "", dest); + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(), + dest.toString(), "-delete -update", conf); + + Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true))) + .hasSize(1); + verifyFileContents(localFS, dest, block); + } + } \ No newline at end of file diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 685f030e15..62940f64b3 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -265,6 +265,51 @@ public void testDeleteMissing() throws IOException { } } + @Test + public void testDeleteMissingWithOnlyFile() throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobContext jobContext = new JobContextImpl(taskAttemptContext + .getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); + Configuration conf = jobContext.getConfiguration(); + + String sourceBase; + String targetBase; + FileSystem fs = null; + try { + OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); + fs = FileSystem.get(conf); + sourceBase = TestDistCpUtils.createTestSetupWithOnlyFile(fs, + FsPermission.getDefault()); + targetBase = TestDistCpUtils.createTestSetupWithOnlyFile(fs, + FsPermission.getDefault()); + + final DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(new Path(sourceBase)), new Path(targetBase)) + .withSyncFolder(true).withDeleteMissing(true).build(); + options.appendToConf(conf); + final DistCpContext context = new DistCpContext(options); + + CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); + Path listingFile = new Path(sourceBase); + listing.buildListing(listingFile, context); + + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + + committer.commitJob(jobContext); + verifyFoldersAreInSync(fs, targetBase, sourceBase); + verifyFoldersAreInSync(fs, sourceBase, targetBase); + + //Test for idempotent commit + committer.commitJob(jobContext); + verifyFoldersAreInSync(fs, targetBase, sourceBase); + verifyFoldersAreInSync(fs, sourceBase, targetBase); + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); + } + } + // for HDFS-14621, should preserve times after -delete @Test public void testPreserveTimeWithDeleteMiss() throws IOException { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java index f10dbf5573..dd1e65dd98 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java @@ -1360,6 +1360,15 @@ private static String getBase(String base) { return base + "/" + location; } + public static String createTestSetupWithOnlyFile(FileSystem fs, + FsPermission perm) throws IOException { + String location = String.valueOf(rand.nextLong()); + fs.mkdirs(new Path("/tmp1/" + location)); + fs.setPermission(new Path("/tmp1/" + location), perm); + createFile(fs, new Path("/tmp1/" + location + "/file")); + return "/tmp1/" + location + "/file"; + } + public static void delete(FileSystem fs, String path) { try { if (fs != null) {