From 3788fe52daa227c806209dd2fa32ac59e10c1c43 Mon Sep 17 00:00:00 2001 From: Masatake Iwasaki Date: Mon, 21 Jun 2021 12:10:44 +0000 Subject: [PATCH] HDFS-13916. Distcp SnapshotDiff to support WebHDFS. Contributed by Xun REN. Signed-off-by: Masatake Iwasaki --- .../org/apache/hadoop/tools/DistCpSync.java | 74 +++++--- .../apache/hadoop/tools/TestDistCpSync.java | 162 +++++++++++++++--- 2 files changed, 189 insertions(+), 47 deletions(-) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java index 35ef3e4ab7..6cbc936136 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.tools.CopyListing.InvalidInputException; import java.io.FileNotFoundException; @@ -40,7 +41,8 @@ /** * This class provides the basic functionality to sync two FileSystems based on * the snapshot diff report. More specifically, we have the following settings: - * 1. Both the source and target FileSystem must be DistributedFileSystem + * 1. Both the source and target FileSystem must be DistributedFileSystem or + * (s)WebHdfsFileSystem * 2. Two snapshots (e.g., s1 and s2) have been created on the source FS. * The diff between these two snapshots will be copied to the target FS. * 3. The target has the same snapshot s1. No changes have been made on the @@ -73,7 +75,7 @@ private boolean isRdiff() { /** * Check if three conditions are met before sync. * 1. Only one source directory. - * 2. Both source and target file system are DFS. + * 2. Both source and target file system are DFS or WebHdfs. * 3. There is no change between from and the current status in target * file system. * Throw exceptions if first two aren't met, and return false to fallback to @@ -95,17 +97,22 @@ private boolean preSyncCheck() throws IOException { final Path snapshotDiffDir = isRdiff() ? targetDir : sourceDir; // currently we require both the source and the target file system are - // DistributedFileSystem. - if (!(srcFs instanceof DistributedFileSystem) || - !(tgtFs instanceof DistributedFileSystem)) { - throw new IllegalArgumentException("The FileSystems needs to" + - " be DistributedFileSystem for using snapshot-diff-based distcp"); + // DistributedFileSystem or (S)WebHdfsFileSystem. + if (!(srcFs instanceof DistributedFileSystem + || srcFs instanceof WebHdfsFileSystem)) { + throw new IllegalArgumentException("Unsupported source file system: " + + srcFs.getScheme() + "://. " + + "Supported file systems: hdfs://, webhdfs:// and swebhdfs://."); + } + if (!(tgtFs instanceof DistributedFileSystem + || tgtFs instanceof WebHdfsFileSystem)) { + throw new IllegalArgumentException("Unsupported target file system: " + + tgtFs.getScheme() + "://. " + + "Supported file systems: hdfs://, webhdfs:// and swebhdfs://."); } - final DistributedFileSystem targetFs = (DistributedFileSystem) tgtFs; - // make sure targetFS has no change between from and the current states - if (!checkNoChange(targetFs, targetDir)) { + if (!checkNoChange(tgtFs, targetDir)) { // set the source path using the snapshot path context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, context.getToSnapshot()))); @@ -161,23 +168,22 @@ public boolean sync() throws IOException { final Path sourceDir = sourcePaths.get(0); final Path targetDir = context.getTargetPath(); final FileSystem tfs = targetDir.getFileSystem(conf); - final DistributedFileSystem targetFs = (DistributedFileSystem) tfs; Path tmpDir = null; try { - tmpDir = createTargetTmpDir(targetFs, targetDir); + tmpDir = createTargetTmpDir(tfs, targetDir); DiffInfo[] renameAndDeleteDiffs = getRenameAndDeleteDiffsForSync(targetDir); if (renameAndDeleteDiffs.length > 0) { // do the real sync work: deletion and rename - syncDiff(renameAndDeleteDiffs, targetFs, tmpDir); + syncDiff(renameAndDeleteDiffs, tfs, tmpDir); } return true; } catch (Exception e) { DistCp.LOG.warn("Failed to use snapshot diff for distcp", e); return false; } finally { - deleteTargetTmpDir(targetFs, tmpDir); + deleteTargetTmpDir(tfs, tmpDir); // TODO: since we have tmp directory, we can support "undo" with failures // set the source path using the snapshot path context.setSourcePaths(Arrays.asList(getSnapshotPath(sourceDir, @@ -195,12 +201,22 @@ private boolean getAllDiffs() throws IOException { context.getTargetPath() : context.getSourcePaths().get(0); try { - DistributedFileSystem fs = - (DistributedFileSystem) ssDir.getFileSystem(conf); + SnapshotDiffReport report = null; + FileSystem fs = ssDir.getFileSystem(conf); final String from = getSnapshotName(context.getFromSnapshot()); final String to = getSnapshotName(context.getToSnapshot()); - SnapshotDiffReport report = fs.getSnapshotDiffReport(ssDir, - from, to); + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem)fs; + report = dfs.getSnapshotDiffReport(ssDir, from, to); + } else if (fs instanceof WebHdfsFileSystem) { + WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs; + report = webHdfs.getSnapshotDiffReport(ssDir, from, to); + } else { + throw new IllegalArgumentException("Unsupported file system: " + + fs.getScheme() + "://. " + + "Supported file systems: hdfs://, webhdfs:// and swebhdfs://."); + } + this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class); for (SnapshotDiffReport.DiffType type : SnapshotDiffReport.DiffType.values()) { @@ -265,7 +281,7 @@ private Path getSnapshotPath(Path inputDir, String snapshotName) { } } - private Path createTargetTmpDir(DistributedFileSystem targetFs, + private Path createTargetTmpDir(FileSystem targetFs, Path targetDir) throws IOException { final Path tmp = new Path(targetDir, DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt()); @@ -275,7 +291,7 @@ private Path createTargetTmpDir(DistributedFileSystem targetFs, return tmp; } - private void deleteTargetTmpDir(DistributedFileSystem targetFs, + private void deleteTargetTmpDir(FileSystem targetFs, Path tmpDir) { try { if (tmpDir != null) { @@ -290,11 +306,17 @@ private void deleteTargetTmpDir(DistributedFileSystem targetFs, * Compute the snapshot diff on the given file system. Return true if the diff * is empty, i.e., no changes have happened in the FS. */ - private boolean checkNoChange(DistributedFileSystem fs, Path path) { + private boolean checkNoChange(FileSystem fs, Path path) { try { final String from = getSnapshotName(context.getFromSnapshot()); - SnapshotDiffReport targetDiff = - fs.getSnapshotDiffReport(path, from, ""); + SnapshotDiffReport targetDiff = null; + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem)fs; + targetDiff = dfs.getSnapshotDiffReport(path, from, ""); + } else { + WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs; + targetDiff = webHdfs.getSnapshotDiffReport(path, from, ""); + } if (!targetDiff.getDiffList().isEmpty()) { DistCp.LOG.warn("The target has been modified since snapshot " + context.getFromSnapshot()); @@ -310,7 +332,7 @@ private boolean checkNoChange(DistributedFileSystem fs, Path path) { } private void syncDiff(DiffInfo[] diffs, - DistributedFileSystem targetFs, Path tmpDir) throws IOException { + FileSystem targetFs, Path tmpDir) throws IOException { moveToTmpDir(diffs, targetFs, tmpDir); moveToTarget(diffs, targetFs); } @@ -320,7 +342,7 @@ private void syncDiff(DiffInfo[] diffs, * directory. */ private void moveToTmpDir(DiffInfo[] diffs, - DistributedFileSystem targetFs, Path tmpDir) throws IOException { + FileSystem targetFs, Path tmpDir) throws IOException { // sort the diffs based on their source paths to make sure the files and // subdirs are moved before moving their parents/ancestors. Arrays.sort(diffs, DiffInfo.sourceComparator); @@ -341,7 +363,7 @@ private void moveToTmpDir(DiffInfo[] diffs, * from the tmp dir to the final targets. */ private void moveToTarget(DiffInfo[] diffs, - DistributedFileSystem targetFs) throws IOException { + FileSystem targetFs) throws IOException { // sort the diffs based on their target paths to make sure the parent // directories are created first. Arrays.sort(diffs, DiffInfo.targetComparator); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java index c80a79bbb8..220caaefdb 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java @@ -20,6 +20,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -27,6 +28,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -39,7 +43,6 @@ import org.junit.Before; import org.junit.Test; - import java.io.IOException; import java.io.FileWriter; import java.io.BufferedWriter; @@ -48,12 +51,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class TestDistCpSync { private MiniDFSCluster cluster; private final Configuration conf = new HdfsConfiguration(); private DistributedFileSystem dfs; + private WebHdfsFileSystem webfs; private DistCpContext context; private final Path source = new Path("/source"); private final Path target = new Path("/target"); @@ -65,6 +70,9 @@ public void setUp() throws Exception { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build(); cluster.waitActive(); + webfs = WebHdfsTestUtil. + getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME); + dfs = cluster.getFileSystem(); dfs.mkdirs(source); dfs.mkdirs(target); @@ -160,6 +168,10 @@ private boolean sync() throws Exception { * f3 f4 */ private void initData(Path dir) throws Exception { + initData(dfs, dir); + } + + private void initData(FileSystem fs, Path dir) throws Exception { final Path foo = new Path(dir, "foo"); final Path bar = new Path(dir, "bar"); final Path d1 = new Path(foo, "d1"); @@ -169,10 +181,10 @@ private void initData(Path dir) throws Exception { final Path f3 = new Path(d1, "f3"); final Path f4 = new Path(d2, "f4"); - DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0); - DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0); - DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 0); - DFSTestUtil.createFile(dfs, f4, BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.createFile(fs, f1, BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.createFile(fs, f2, BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.createFile(fs, f3, BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.createFile(fs, f4, BLOCK_SIZE, DATA_NUM, 0); } /** @@ -192,7 +204,7 @@ private void initData(Path dir) throws Exception { * foo/ f4 * f1(new) */ - private int changeData(Path dir) throws Exception { + private int changeData(FileSystem fs, Path dir) throws Exception { final Path foo = new Path(dir, "foo"); final Path bar = new Path(dir, "bar"); final Path d1 = new Path(foo, "d1"); @@ -200,21 +212,21 @@ private int changeData(Path dir) throws Exception { final Path bar_d1 = new Path(bar, "d1"); int numCreatedModified = 0; - dfs.rename(d1, bar_d1); + fs.rename(d1, bar_d1); numCreatedModified += 1; // modify ./foo numCreatedModified += 1; // modify ./bar final Path f3 = new Path(bar_d1, "f3"); - dfs.delete(f3, true); + fs.delete(f3, true); final Path newfoo = new Path(bar_d1, "foo"); - dfs.rename(foo, newfoo); + fs.rename(foo, newfoo); numCreatedModified += 1; // modify ./foo/d1 final Path f1 = new Path(newfoo, "f1"); - dfs.delete(f1, true); - DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0); + fs.delete(f1, true); + DFSTestUtil.createFile(fs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0); numCreatedModified += 1; // create ./foo/f1 - DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE); + DFSTestUtil.appendFile(fs, f2, (int) BLOCK_SIZE); numCreatedModified += 1; // modify ./bar/f2 - dfs.rename(bar, new Path(dir, "foo")); + fs.rename(bar, new Path(dir, "foo")); return numCreatedModified; } @@ -228,7 +240,7 @@ public void testSync() throws Exception { enableAndCreateFirstSnapshot(); // make changes under source - int numCreatedModified = changeData(source); + int numCreatedModified = changeData(dfs, source); dfs.createSnapshot(source, "s2"); // before sync, make some further changes on source. this should not affect @@ -295,23 +307,51 @@ private Map getListing(Path listingPath) return values; } + /** + * By default, we are using DFS for both source and target. + * @param s source file status + * @param t target file status + * @param compareName whether will we compare the name of the files + * @throws Exception + */ private void verifyCopy(FileStatus s, FileStatus t, boolean compareName) - throws Exception { + throws Exception { + verifyCopy(dfs, dfs, s, t, compareName); + } + + /** + * Verify copy by using different file systems. + * @param sfs source file system + * @param tfs target file system + * @param s source file status + * @param t target file status + * @param compareName whether will we compare the name of the files + * @throws Exception + */ + private void verifyCopyByFs(FileSystem sfs, FileSystem tfs, + FileStatus s, FileStatus t, boolean compareName) + throws Exception { + verifyCopy(sfs, tfs, s, t, compareName); + } + + private void verifyCopy(FileSystem sfs, FileSystem tfs, + FileStatus s, FileStatus t, boolean compareName) + throws Exception { Assert.assertEquals(s.isDirectory(), t.isDirectory()); if (compareName) { Assert.assertEquals(s.getPath().getName(), t.getPath().getName()); } if (!s.isDirectory()) { // verify the file content is the same - byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath()); - byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath()); + byte[] sbytes = DFSTestUtil.readFileBuffer(sfs, s.getPath()); + byte[] tbytes = DFSTestUtil.readFileBuffer(tfs, t.getPath()); Assert.assertArrayEquals(sbytes, tbytes); } else { - FileStatus[] slist = dfs.listStatus(s.getPath()); - FileStatus[] tlist = dfs.listStatus(t.getPath()); + FileStatus[] slist = sfs.listStatus(s.getPath()); + FileStatus[] tlist = tfs.listStatus(t.getPath()); Assert.assertEquals(slist.length, tlist.length); for (int i = 0; i < slist.length; i++) { - verifyCopy(slist[i], tlist[i], true); + verifyCopy(sfs, tfs, slist[i], tlist[i], true); } } } @@ -333,7 +373,7 @@ public void testSyncWithCurrent() throws Exception { enableAndCreateFirstSnapshot(); // make changes under source - changeData(source); + changeData(dfs, source); // do the sync sync(); @@ -907,4 +947,84 @@ public void testSync11() throws Exception { deleteFilterFile(filterFile); } } + + /** + * Test DistCp ues diff option under (s)WebHDFSFileSyste. + * In this test, we are using DFS as source and WebHDFS as target + */ + @Test + public void testSyncSnapshotDiffWithWebHdfs1() throws Exception { + Path dfsSource = new Path(dfs.getUri().toString(), source); + Path webHdfsTarget = new Path(webfs.getUri().toString(), target); + + snapshotDiffWithPaths(dfsSource, webHdfsTarget); + } + + /** + * Test DistCp ues diff option under (s)WebHDFSFileSyste. + * In this test, we are using WebHDFS as source and DFS as target + */ + @Test + public void testSyncSnapshotDiffWithWebHdfs2() throws Exception { + Path webHdfsSource = new Path(webfs.getUri().toString(), source); + Path dfsTarget = new Path(dfs.getUri().toString(), target); + + snapshotDiffWithPaths(webHdfsSource, dfsTarget); + } + + /** + * Test DistCp ues diff option under (s)WebHDFSFileSyste. + * In this test, we are using WebHDFS for both source and target + */ + @Test + public void testSyncSnapshotDiffWithWebHdfs3() throws Exception { + Path webHdfsSource = new Path(webfs.getUri().toString(), source); + Path webHdfsTarget = new Path(webfs.getUri().toString(), target); + + snapshotDiffWithPaths(webHdfsSource, webHdfsTarget); + } + + private void snapshotDiffWithPaths(Path sourceFSPath, + Path targetFSPath) throws Exception { + + FileSystem sourceFS = sourceFSPath.getFileSystem(conf); + FileSystem targetFS = targetFSPath.getFileSystem(conf); + + // Initialize both source and target file system + initData(sourceFS, sourceFSPath); + initData(targetFS, targetFSPath); + + // create snapshots on both source and target side with the same name + List paths = Arrays.asList(sourceFSPath, targetFSPath); + for (Path path: paths) { + FileSystem fs = path.getFileSystem(conf); + if (fs instanceof DistributedFileSystem) { + ((DistributedFileSystem)fs).allowSnapshot(path); + } else if (fs instanceof WebHdfsFileSystem) { + ((WebHdfsFileSystem)fs).allowSnapshot(path); + } else { + throw new IOException("Unsupported fs: " + fs.getScheme()); + } + fs.createSnapshot(path, "s1"); + } + + // do some modification on source side + changeData(sourceFS, sourceFSPath); + + // create a new snapshot on source side + sourceFS.createSnapshot(sourceFSPath, "s2"); + + //try to copy the difference + final DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(sourceFSPath), targetFSPath) + .withUseDiff("s1", "s2") + .withSyncFolder(true) + .build(); + options.appendToConf(conf); + + new DistCp(conf, options).execute(); + + verifyCopyByFs(sourceFS, targetFS, sourceFS.getFileStatus(sourceFSPath), + targetFS.getFileStatus(targetFSPath), false); + } }