HDFS-16911. Distcp with snapshot diff to support Ozone filesystem. (#5364)

This commit is contained in:
Sadanand Shenoy 2023-04-11 02:33:16 +05:30 committed by GitHub
parent 3e2ae1da00
commit 74ddf69f80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 140 additions and 37 deletions

View File

@ -20,18 +20,19 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.tools.CopyListing.InvalidInputException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@ -106,20 +107,7 @@ private boolean preSyncCheck() throws IOException {
final FileSystem snapshotDiffFs = isRdiff() ? tgtFs : srcFs;
final Path snapshotDiffDir = isRdiff() ? targetDir : sourceDir;
// currently we require both the source and the target file system are
// DistributedFileSystem or (S)WebHdfsFileSystem.
if (!(srcFs instanceof DistributedFileSystem
|| srcFs instanceof WebHdfsFileSystem)) {
throw new IllegalArgumentException("Unsupported source file system: "
+ srcFs.getScheme() + "://. " +
"Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
}
if (!(tgtFs instanceof DistributedFileSystem
|| tgtFs instanceof WebHdfsFileSystem)) {
throw new IllegalArgumentException("Unsupported target file system: "
+ tgtFs.getScheme() + "://. " +
"Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
}
checkFilesystemSupport(sourceDir,targetDir,srcFs, tgtFs);
// make sure targetFS has no change between from and the current states
if (!checkNoChange(tgtFs, targetDir)) {
@ -165,6 +153,42 @@ private boolean preSyncCheck() throws IOException {
return true;
}
/**
* Check if the source and target filesystems support snapshots.
*/
private void checkFilesystemSupport(Path sourceDir, Path targetDir,
FileSystem srcFs, FileSystem tgtFs) throws IOException {
if (!srcFs.hasPathCapability(sourceDir,
CommonPathCapabilities.FS_SNAPSHOTS)) {
throw new UnsupportedOperationException(
"The source file system " + srcFs.getScheme()
+ " does not support snapshot.");
}
if (!tgtFs.hasPathCapability(targetDir,
CommonPathCapabilities.FS_SNAPSHOTS)) {
throw new UnsupportedOperationException(
"The target file system " + tgtFs.getScheme()
+ " does not support snapshot.");
}
try {
getSnapshotDiffReportMethod(srcFs);
} catch (NoSuchMethodException e) {
throw new UnsupportedOperationException(
"The source file system " + srcFs.getScheme()
+ " does not support getSnapshotDiffReport",
e);
}
try {
getSnapshotDiffReportMethod(tgtFs);
} catch (NoSuchMethodException e) {
throw new UnsupportedOperationException(
"The target file system " + tgtFs.getScheme()
+ " does not support getSnapshotDiffReport",
e);
}
}
public boolean sync() throws IOException {
if (!preSyncCheck()) {
return false;
@ -211,21 +235,10 @@ private boolean getAllDiffs() throws IOException {
context.getTargetPath() : context.getSourcePaths().get(0);
try {
SnapshotDiffReport report = null;
FileSystem fs = ssDir.getFileSystem(conf);
final String from = getSnapshotName(context.getFromSnapshot());
final String to = getSnapshotName(context.getToSnapshot());
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
report = dfs.getSnapshotDiffReport(ssDir, from, to);
} else if (fs instanceof WebHdfsFileSystem) {
WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs;
report = webHdfs.getSnapshotDiffReport(ssDir, from, to);
} else {
throw new IllegalArgumentException("Unsupported file system: " +
fs.getScheme() + "://. " +
"Supported file systems: hdfs://, webhdfs:// and swebhdfs://.");
}
SnapshotDiffReport report =
getSnapshotDiffReport(ssDir.getFileSystem(conf), ssDir, from, to);
this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
for (SnapshotDiffReport.DiffType type :
@ -286,6 +299,36 @@ private boolean getAllDiffs() throws IOException {
return false;
}
/**
* Check if the filesystem implementation has a method named
* getSnapshotDiffReport.
*/
private static Method getSnapshotDiffReportMethod(FileSystem fs)
throws NoSuchMethodException {
return fs.getClass().getMethod(
"getSnapshotDiffReport", Path.class, String.class, String.class);
}
/**
* Get the snapshotDiff b/w the fromSnapshot & toSnapshot for the given
* filesystem.
*/
private static SnapshotDiffReport getSnapshotDiffReport(
final FileSystem fs,
final Path snapshotDir,
final String fromSnapshot,
final String toSnapshot) throws IOException {
try {
return (SnapshotDiffReport) getSnapshotDiffReportMethod(fs).invoke(
fs, snapshotDir, fromSnapshot, toSnapshot);
} catch (InvocationTargetException e) {
throw new IOException(e.getCause());
} catch (NoSuchMethodException|IllegalAccessException e) {
throw new IllegalArgumentException(
"Failed to invoke getSnapshotDiffReport.", e);
}
}
private String getSnapshotName(String name) {
return Path.CUR_DIR.equals(name) ? "" : name;
}
@ -327,14 +370,7 @@ private void deleteTargetTmpDir(FileSystem targetFs,
private boolean checkNoChange(FileSystem fs, Path path) {
try {
final String from = getSnapshotName(context.getFromSnapshot());
SnapshotDiffReport targetDiff = null;
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem)fs;
targetDiff = dfs.getSnapshotDiffReport(path, from, "");
} else {
WebHdfsFileSystem webHdfs = (WebHdfsFileSystem)fs;
targetDiff = webHdfs.getSnapshotDiffReport(path, from, "");
}
SnapshotDiffReport targetDiff = getSnapshotDiffReport(fs, path, from, "");
if (!targetDiff.getDiffList().isEmpty()) {
DistCp.LOG.warn("The target has been modified since snapshot "
+ context.getFromSnapshot());

View File

@ -23,6 +23,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -38,6 +40,7 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.junit.After;
import org.junit.Assert;
@ -47,6 +50,7 @@
import java.io.IOException;
import java.io.FileWriter;
import java.io.BufferedWriter;
import java.net.URI;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.ArrayList;
@ -56,6 +60,9 @@
import java.util.Map;
import java.util.regex.Pattern;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
public class TestDistCpSync {
private MiniDFSCluster cluster;
private final Configuration conf = new HdfsConfiguration();
@ -89,6 +96,7 @@ public void setUp() throws Exception {
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString());
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString());
conf.setClass("fs.dummy.impl", DummyFs.class, FileSystem.class);
}
@After
@ -1276,4 +1284,63 @@ private void snapshotDiffWithPaths(Path sourceFSPath,
verifyCopyByFs(sourceFS, targetFS, sourceFS.getFileStatus(sourceFSPath),
targetFS.getFileStatus(targetFSPath), false);
}
@Test
public void testSyncSnapshotDiffWithLocalFileSystem() throws Exception {
String[] args = new String[]{"-update", "-diff", "s1", "s2",
"file:///source", "file:///target"};
LambdaTestUtils.intercept(
UnsupportedOperationException.class,
"The source file system file does not support snapshot",
() -> new DistCp(conf, OptionsParser.parse(args)).execute());
}
@Test
public void testSyncSnapshotDiffWithDummyFileSystem() {
String[] args =
new String[] { "-update", "-diff", "s1", "s2", "dummy:///source",
"dummy:///target" };
try {
FileSystem dummyFs = FileSystem.get(URI.create("dummy:///"), conf);
assertThat(dummyFs).isInstanceOf(DummyFs.class);
new DistCp(conf, OptionsParser.parse(args)).execute();
} catch (UnsupportedOperationException e) {
throw e;
} catch (Exception e) {
// can expect other exceptions as source and target paths
// are not created.
}
}
public static class DummyFs extends RawLocalFileSystem {
public DummyFs() {
super();
}
public URI getUri() {
return URI.create("dummy:///");
}
@Override
public boolean hasPathCapability(Path path, String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.FS_SNAPSHOTS:
return true;
default:
return super.hasPathCapability(path, capability);
}
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return new FileStatus();
}
public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
final String fromSnapshot, final String toSnapshot) {
return new SnapshotDiffReport(snapshotDir.getName(), fromSnapshot,
toSnapshot, new ArrayList<SnapshotDiffReport.DiffReportEntry>());
}
}
}