diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java index 1cf2d97ec1..dbc86fd0b4 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java @@ -20,18 +20,19 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.tools.CopyListing.InvalidInputException; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; import java.util.Random; @@ -106,20 +107,7 @@ private boolean preSyncCheck() throws IOException { final FileSystem snapshotDiffFs = isRdiff() ? tgtFs : srcFs; final Path snapshotDiffDir = isRdiff() ? targetDir : sourceDir; - // currently we require both the source and the target file system are - // DistributedFileSystem or (S)WebHdfsFileSystem. - if (!(srcFs instanceof DistributedFileSystem - || srcFs instanceof WebHdfsFileSystem)) { - throw new IllegalArgumentException("Unsupported source file system: " - + srcFs.getScheme() + "://. " + - "Supported file systems: hdfs://, webhdfs:// and swebhdfs://."); - } - if (!(tgtFs instanceof DistributedFileSystem - || tgtFs instanceof WebHdfsFileSystem)) { - throw new IllegalArgumentException("Unsupported target file system: " - + tgtFs.getScheme() + "://. " + - "Supported file systems: hdfs://, webhdfs:// and swebhdfs://."); - } + checkFilesystemSupport(sourceDir,targetDir,srcFs, tgtFs); // make sure targetFS has no change between from and the current states if (!checkNoChange(tgtFs, targetDir)) { @@ -165,6 +153,42 @@ private boolean preSyncCheck() throws IOException { return true; } + /** + * Check if the source and target filesystems support snapshots. + */ + private void checkFilesystemSupport(Path sourceDir, Path targetDir, + FileSystem srcFs, FileSystem tgtFs) throws IOException { + if (!srcFs.hasPathCapability(sourceDir, + CommonPathCapabilities.FS_SNAPSHOTS)) { + throw new UnsupportedOperationException( + "The source file system " + srcFs.getScheme() + + " does not support snapshot."); + } + if (!tgtFs.hasPathCapability(targetDir, + CommonPathCapabilities.FS_SNAPSHOTS)) { + throw new UnsupportedOperationException( + "The target file system " + tgtFs.getScheme() + + " does not support snapshot."); + } + try { + getSnapshotDiffReportMethod(srcFs); + } catch (NoSuchMethodException e) { + throw new UnsupportedOperationException( + "The source file system " + srcFs.getScheme() + + " does not support getSnapshotDiffReport", + e); + } + try { + getSnapshotDiffReportMethod(tgtFs); + } catch (NoSuchMethodException e) { + throw new UnsupportedOperationException( + "The target file system " + tgtFs.getScheme() + + " does not support getSnapshotDiffReport", + e); + } + + } + public boolean sync() throws IOException { if (!preSyncCheck()) { return false; @@ -211,21 +235,10 @@ private boolean getAllDiffs() throws IOException { context.getTargetPath() : context.getSourcePaths().get(0); try { - SnapshotDiffReport report = null; - FileSystem fs = ssDir.getFileSystem(conf); final String from = getSnapshotName(context.getFromSnapshot()); final String to = getSnapshotName(context.getToSnapshot()); - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem)fs; - report = dfs.getSnapshotDiffReport(ssDir, from, to); - } else if (fs instanceof WebHdfsFileSystem) { - WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs; - report = webHdfs.getSnapshotDiffReport(ssDir, from, to); - } else { - throw new IllegalArgumentException("Unsupported file system: " + - fs.getScheme() + "://. " + - "Supported file systems: hdfs://, webhdfs:// and swebhdfs://."); - } + SnapshotDiffReport report = + getSnapshotDiffReport(ssDir.getFileSystem(conf), ssDir, from, to); this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class); for (SnapshotDiffReport.DiffType type : @@ -286,6 +299,36 @@ private boolean getAllDiffs() throws IOException { return false; } + /** + * Check if the filesystem implementation has a method named + * getSnapshotDiffReport. + */ + private static Method getSnapshotDiffReportMethod(FileSystem fs) + throws NoSuchMethodException { + return fs.getClass().getMethod( + "getSnapshotDiffReport", Path.class, String.class, String.class); + } + + /** + * Get the snapshotDiff b/w the fromSnapshot & toSnapshot for the given + * filesystem. + */ + private static SnapshotDiffReport getSnapshotDiffReport( + final FileSystem fs, + final Path snapshotDir, + final String fromSnapshot, + final String toSnapshot) throws IOException { + try { + return (SnapshotDiffReport) getSnapshotDiffReportMethod(fs).invoke( + fs, snapshotDir, fromSnapshot, toSnapshot); + } catch (InvocationTargetException e) { + throw new IOException(e.getCause()); + } catch (NoSuchMethodException|IllegalAccessException e) { + throw new IllegalArgumentException( + "Failed to invoke getSnapshotDiffReport.", e); + } + } + private String getSnapshotName(String name) { return Path.CUR_DIR.equals(name) ? "" : name; } @@ -327,14 +370,7 @@ private void deleteTargetTmpDir(FileSystem targetFs, private boolean checkNoChange(FileSystem fs, Path path) { try { final String from = getSnapshotName(context.getFromSnapshot()); - SnapshotDiffReport targetDiff = null; - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem)fs; - targetDiff = dfs.getSnapshotDiffReport(path, from, ""); - } else { - WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs; - targetDiff = webHdfs.getSnapshotDiffReport(path, from, ""); - } + SnapshotDiffReport targetDiff = getSnapshotDiffReport(fs, path, from, ""); if (!targetDiff.getDiffList().isEmpty()) { DistCp.LOG.warn("The target has been modified since snapshot " + context.getFromSnapshot()); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java index 93796e752e..0fbcd6571c 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java @@ -23,6 +23,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -38,6 +40,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.tools.mapred.CopyMapper; import org.junit.After; import org.junit.Assert; @@ -47,6 +50,7 @@ import java.io.IOException; import java.io.FileWriter; import java.io.BufferedWriter; +import java.net.URI; import java.nio.file.Files; import java.util.Arrays; import java.util.ArrayList; @@ -56,6 +60,9 @@ import java.util.Map; import java.util.regex.Pattern; +import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + public class TestDistCpSync { private MiniDFSCluster cluster; private final Configuration conf = new HdfsConfiguration(); @@ -89,6 +96,7 @@ public void setUp() throws Exception { conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString()); conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString()); + conf.setClass("fs.dummy.impl", DummyFs.class, FileSystem.class); } @After @@ -1276,4 +1284,63 @@ private void snapshotDiffWithPaths(Path sourceFSPath, verifyCopyByFs(sourceFS, targetFS, sourceFS.getFileStatus(sourceFSPath), targetFS.getFileStatus(targetFSPath), false); } + + @Test + public void testSyncSnapshotDiffWithLocalFileSystem() throws Exception { + String[] args = new String[]{"-update", "-diff", "s1", "s2", + "file:///source", "file:///target"}; + LambdaTestUtils.intercept( + UnsupportedOperationException.class, + "The source file system file does not support snapshot", + () -> new DistCp(conf, OptionsParser.parse(args)).execute()); + } + + @Test + public void testSyncSnapshotDiffWithDummyFileSystem() { + String[] args = + new String[] { "-update", "-diff", "s1", "s2", "dummy:///source", + "dummy:///target" }; + try { + FileSystem dummyFs = FileSystem.get(URI.create("dummy:///"), conf); + assertThat(dummyFs).isInstanceOf(DummyFs.class); + new DistCp(conf, OptionsParser.parse(args)).execute(); + } catch (UnsupportedOperationException e) { + throw e; + } catch (Exception e) { + // can expect other exceptions as source and target paths + // are not created. + } + } + + public static class DummyFs extends RawLocalFileSystem { + public DummyFs() { + super(); + } + + public URI getUri() { + return URI.create("dummy:///"); + } + + @Override + public boolean hasPathCapability(Path path, String capability) + throws IOException { + switch (validatePathCapabilityArgs(makeQualified(path), capability)) { + case CommonPathCapabilities.FS_SNAPSHOTS: + return true; + default: + return super.hasPathCapability(path, capability); + } + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return new FileStatus(); + } + + public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir, + final String fromSnapshot, final String toSnapshot) { + return new SnapshotDiffReport(snapshotDir.getName(), fromSnapshot, + toSnapshot, new ArrayList()); + } + } }