From ed70fa142cabdbc1065e4dbbc95e99c8850c4751 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 4 Mar 2015 10:30:53 -0800 Subject: [PATCH] HDFS-7535. Utilize Snapshot diff report for distcp. Contributed by Jing Zhao. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../org/apache/hadoop/tools/CopyListing.java | 4 +- .../org/apache/hadoop/tools/DiffInfo.java | 90 +++++ .../java/org/apache/hadoop/tools/DistCp.java | 16 +- .../apache/hadoop/tools/DistCpConstants.java | 3 + .../hadoop/tools/DistCpOptionSwitch.java | 12 +- .../apache/hadoop/tools/DistCpOptions.java | 34 ++ .../org/apache/hadoop/tools/DistCpSync.java | 192 ++++++++++ .../apache/hadoop/tools/OptionsParser.java | 24 +- .../hadoop/tools/mapred/CopyCommitter.java | 3 +- .../apache/hadoop/tools/TestDistCpSync.java | 349 ++++++++++++++++++ .../hadoop/tools/TestOptionsParser.java | 75 +++- 12 files changed, 790 insertions(+), 14 deletions(-) create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 62006d3520..3c6d447188 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -704,6 +704,8 @@ Release 2.7.0 - UNRELEASED HDFS-7789. DFSck should resolve the path to support cross-FS symlinks. (gera) + HDFS-7535. Utilize Snapshot diff report for distcp. (jing9) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index a7b68a98a2..e3c58e9a22 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -224,7 +224,9 @@ public static CopyListing getCopyListing(Configuration configuration, Credentials credentials, DistCpOptions options) throws IOException { - + if (options.shouldUseDiff()) { + return new GlobbedCopyListing(configuration, credentials); + } String copyListingClassName = configuration.get(DistCpConstants. CONF_LABEL_COPY_LISTING_CLASS, ""); Class copyListingClass; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java new file mode 100644 index 0000000000..b617de7880 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DiffInfo.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; + +/** + * Information presenting a rename/delete op derived from a snapshot diff entry. + * This includes the source file/dir of the rename/delete op, and the target + * file/dir of a rename op. + */ +class DiffInfo { + static final Comparator sourceComparator = new Comparator() { + @Override + public int compare(DiffInfo d1, DiffInfo d2) { + return d2.source.compareTo(d1.source); + } + }; + + static final Comparator targetComparator = new Comparator() { + @Override + public int compare(DiffInfo d1, DiffInfo d2) { + return d1.target == null ? -1 : + (d2.target == null ? 1 : d1.target.compareTo(d2.target)); + } + }; + + /** The source file/dir of the rename or deletion op */ + final Path source; + /** + * The intermediate file/dir for the op. For a rename or a delete op, + * we first rename the source to this tmp file/dir. + */ + private Path tmp; + /** The target file/dir of the rename op. Null means the op is deletion. */ + final Path target; + + DiffInfo(Path source, Path target) { + assert source != null; + this.source = source; + this.target= target; + } + + void setTmp(Path tmp) { + this.tmp = tmp; + } + + Path getTmp() { + return tmp; + } + + static DiffInfo[] getDiffs(SnapshotDiffReport report, Path targetDir) { + List diffs = new ArrayList<>(); + for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) { + if (entry.getType() == SnapshotDiffReport.DiffType.DELETE) { + final Path source = new Path(targetDir, + DFSUtil.bytes2String(entry.getSourcePath())); + diffs.add(new DiffInfo(source, null)); + } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) { + final Path source = new Path(targetDir, + DFSUtil.bytes2String(entry.getSourcePath())); + final Path target = new Path(targetDir, + DFSUtil.bytes2String(entry.getTargetPath())); + diffs.add(new DiffInfo(source, target)); + } + } + return diffs.toArray(new DiffInfo[diffs.size()]); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index b80aeb8733..ada4b25f81 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -18,6 +18,9 @@ package org.apache.hadoop.tools; +import java.io.IOException; +import java.util.Random; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -27,10 +30,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobSubmissionFiles; -import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.tools.CopyListing.*; import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyOutputFormat; @@ -39,9 +42,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import java.io.IOException; -import java.util.Random; - import com.google.common.annotations.VisibleForTesting; /** @@ -62,7 +62,7 @@ public class DistCp extends Configured implements Tool { */ static final int SHUTDOWN_HOOK_PRIORITY = 30; - private static final Log LOG = LogFactory.getLog(DistCp.class); + static final Log LOG = LogFactory.getLog(DistCp.class); private DistCpOptions inputOptions; private Path metaFolder; @@ -171,9 +171,13 @@ public Job createAndSubmitJob() throws Exception { //Don't cleanup while we are setting up. metaFolder = createMetaFolderPath(); jobFS = metaFolder.getFileSystem(getConf()); - job = createJob(); } + if (inputOptions.shouldUseDiff()) { + if (!DistCpSync.sync(inputOptions, getConf())) { + inputOptions.disableUsingDiff(); + } + } createInputFileListing(job); job.submit(); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 7e71096952..a1af2af767 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -53,6 +53,7 @@ public class DistCpConstants { public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc"; public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite"; public static final String CONF_LABEL_APPEND = "distcp.copy.append"; + public static final String CONF_LABEL_DIFF = "distcp.copy.diff"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = @@ -134,4 +135,6 @@ public class DistCpConstants { * Value of reserved raw HDFS directory when copying raw.* xattrs. */ static final String HDFS_RESERVED_RAW_DIRECTORY_NAME = "/.reserved/raw"; + + static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp"; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index 159d5ca16c..e9c7d46152 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -41,8 +41,6 @@ public enum DistCpOptionSwitch { * target file. Note that when preserving checksum type, block size is also * preserved. * - * @see PRESERVE_STATUS_DEFAULT - * * If any of the optional switches are present among rbugpcaxt, then * only the corresponding file attribute is preserved. */ @@ -149,6 +147,11 @@ public enum DistCpOptionSwitch { new Option("append", false, "Reuse existing data in target files and append new data to them if possible")), + DIFF(DistCpConstants.CONF_LABEL_DIFF, + new Option("diff", false, + "Use snapshot diff report to identify the difference between source and target"), + 2), + /** * Should DisctpExecution be blocking */ @@ -178,6 +181,11 @@ public enum DistCpOptionSwitch { this.option = option; } + DistCpOptionSwitch(String confLabel, Option option, int argNum) { + this(confLabel, option); + this.option.setArgs(argNum); + } + /** * Get Configuration label for the option * @return configuration label name diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 57d2fb7eb1..709e5832ba 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -42,6 +42,7 @@ public class DistCpOptions { private boolean append = false; private boolean skipCRC = false; private boolean blocking = true; + private boolean useDiff = false; private int maxMaps = DistCpConstants.DEFAULT_MAPS; private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; @@ -61,6 +62,9 @@ public class DistCpOptions { private Path sourceFileListing; private List sourcePaths; + private String fromSnapshot; + private String toSnapshot; + private Path targetPath; // targetPathExist is a derived field, it's initialized in the @@ -264,6 +268,29 @@ public void setAppend(boolean append) { this.append = append; } + public boolean shouldUseDiff() { + return this.useDiff; + } + + public String getFromSnapshot() { + return this.fromSnapshot; + } + + public String getToSnapshot() { + return this.toSnapshot; + } + + public void setUseDiff(boolean useDiff, String fromSnapshot, String toSnapshot) { + validate(DistCpOptionSwitch.DIFF, useDiff); + this.useDiff = useDiff; + this.fromSnapshot = fromSnapshot; + this.toSnapshot = toSnapshot; + } + + public void disableUsingDiff() { + this.useDiff = false; + } + /** * Should CRC/checksum check be skipped while checking files are identical * @@ -508,6 +535,7 @@ public void validate(DistCpOptionSwitch option, boolean value) { boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ? value : this.skipCRC); boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append); + boolean useDiff = (option == DistCpOptionSwitch.DIFF ? value : this.useDiff); if (syncFolder && atomicCommit) { throw new IllegalArgumentException("Atomic commit can't be used with " + @@ -536,6 +564,10 @@ public void validate(DistCpOptionSwitch option, boolean value) { throw new IllegalArgumentException( "Append is disallowed when skipping CRC"); } + if ((!syncFolder || !deleteMissing) && useDiff) { + throw new IllegalArgumentException( + "Diff is valid only with update and delete options"); + } } /** @@ -556,6 +588,8 @@ public void appendToConf(Configuration conf) { String.valueOf(overwrite)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, String.valueOf(append)); + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIFF, + String.valueOf(useDiff)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java new file mode 100644 index 0000000000..26d7eb416d --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +/** + * This class provides the basic functionality to sync two FileSystems based on + * the snapshot diff report. More specifically, we have the following settings: + * 1. Both the source and target FileSystem must be DistributedFileSystem + * 2. Two snapshots (e.g., s1 and s2) have been created on the source FS. + * The diff between these two snapshots will be copied to the target FS. + * 3. The target has the same snapshot s1. No changes have been made on the + * target since s1. All the files/directories in the target are the same with + * source.s1 + */ +class DistCpSync { + + static boolean sync(DistCpOptions inputOptions, Configuration conf) + throws IOException { + List sourcePaths = inputOptions.getSourcePaths(); + if (sourcePaths.size() != 1) { + // we only support one source dir which must be a snapshottable directory + DistCp.LOG.warn(sourcePaths.size() + " source paths are provided"); + return false; + } + final Path sourceDir = sourcePaths.get(0); + final Path targetDir = inputOptions.getTargetPath(); + + final FileSystem sfs = sourceDir.getFileSystem(conf); + final FileSystem tfs = targetDir.getFileSystem(conf); + // currently we require both the source and the target file system are + // DistributedFileSystem. + if (!(sfs instanceof DistributedFileSystem) || + !(tfs instanceof DistributedFileSystem)) { + DistCp.LOG.warn("To use diff-based distcp, the FileSystems needs to" + + " be DistributedFileSystem"); + return false; + } + final DistributedFileSystem sourceFs = (DistributedFileSystem) sfs; + final DistributedFileSystem targetFs= (DistributedFileSystem) tfs; + + // make sure targetFS has no change between from and the current states + if (!checkNoChange(inputOptions, targetFs, targetDir)) { + return false; + } + + Path tmpDir = null; + try { + tmpDir = createTargetTmpDir(targetFs, targetDir); + DiffInfo[] diffs = getDiffs(inputOptions, sourceFs, sourceDir, targetDir); + if (diffs == null) { + return false; + } + // do the real sync work: deletion and rename + syncDiff(diffs, targetFs, tmpDir); + return true; + } catch (Exception e) { + DistCp.LOG.warn("Failed to use snapshot diff for distcp", e); + return false; + } finally { + deleteTargetTmpDir(targetFs, tmpDir); + // TODO: since we have tmp directory, we can support "undo" with failures + } + } + + private static Path createTargetTmpDir(DistributedFileSystem targetFs, + Path targetDir) throws IOException { + final Path tmp = new Path(targetDir, + DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt()); + if (!targetFs.mkdirs(tmp)) { + throw new IOException("The tmp directory " + tmp + " already exists"); + } + return tmp; + } + + private static void deleteTargetTmpDir(DistributedFileSystem targetFs, + Path tmpDir) { + try { + if (tmpDir != null) { + targetFs.delete(tmpDir, true); + } + } catch (IOException e) { + DistCp.LOG.error("Unable to cleanup tmp dir: " + tmpDir, e); + } + } + + /** + * Compute the snapshot diff on the given file system. Return true if the diff + * is empty, i.e., no changes have happened in the FS. + */ + private static boolean checkNoChange(DistCpOptions inputOptions, + DistributedFileSystem fs, Path path) { + try { + SnapshotDiffReport targetDiff = + fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), ""); + if (!targetDiff.getDiffList().isEmpty()) { + DistCp.LOG.warn("The target has been modified since snapshot " + + inputOptions.getFromSnapshot()); + return false; + } else { + return true; + } + } catch (IOException e) { + DistCp.LOG.warn("Failed to compute snapshot diff on " + path, e); + } + return false; + } + + @VisibleForTesting + static DiffInfo[] getDiffs(DistCpOptions inputOptions, + DistributedFileSystem fs, Path sourceDir, Path targetDir) { + try { + SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir, + inputOptions.getFromSnapshot(), inputOptions.getToSnapshot()); + return DiffInfo.getDiffs(sourceDiff, targetDir); + } catch (IOException e) { + DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e); + } + return null; + } + + private static void syncDiff(DiffInfo[] diffs, + DistributedFileSystem targetFs, Path tmpDir) throws IOException { + moveToTmpDir(diffs, targetFs, tmpDir); + moveToTarget(diffs, targetFs); + } + + /** + * Move all the source files that should be renamed or deleted to the tmp + * directory. + */ + private static void moveToTmpDir(DiffInfo[] diffs, + DistributedFileSystem targetFs, Path tmpDir) throws IOException { + // sort the diffs based on their source paths to make sure the files and + // subdirs are moved before moving their parents/ancestors. + Arrays.sort(diffs, DiffInfo.sourceComparator); + Random random = new Random(); + for (DiffInfo diff : diffs) { + Path tmpTarget = new Path(tmpDir, diff.source.getName()); + while (targetFs.exists(tmpTarget)) { + tmpTarget = new Path(tmpDir, diff.source.getName() + random.nextInt()); + } + diff.setTmp(tmpTarget); + targetFs.rename(diff.source, tmpTarget); + } + } + + /** + * Finish the rename operations: move all the intermediate files/directories + * from the tmp dir to the final targets. + */ + private static void moveToTarget(DiffInfo[] diffs, + DistributedFileSystem targetFs) throws IOException { + // sort the diffs based on their target paths to make sure the parent + // directories are created first. + Arrays.sort(diffs, DiffInfo.targetComparator); + for (DiffInfo diff : diffs) { + if (diff.target != null) { + if (!targetFs.exists(diff.target.getParent())) { + targetFs.mkdirs(diff.target.getParent()); + } + targetFs.rename(diff.getTmp(), diff.target); + } + } + } +} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 525136cbb2..a3a76ef585 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -18,13 +18,22 @@ package org.apache.hadoop.tools; -import org.apache.commons.cli.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; -import java.util.*; +import com.google.common.base.Preconditions; /** * The OptionsParser parses out the command-line options passed to DistCp, @@ -207,6 +216,13 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException } } + if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { + String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch()); + Preconditions.checkArgument(snapshots != null && snapshots.length == 2, + "Must provide both the starting and ending snapshot names"); + option.setUseDiff(true, snapshots[0], snapshots[1]); + } + if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { String fileLimitString = getVal(command, DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); @@ -247,6 +263,10 @@ private static String getVal(CommandLine command, String swtch) { } } + private static String[] getVals(CommandLine command, String option) { + return command.getOptionValues(option); + } + public static void usage() { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("distcp OPTIONS [source_path...] \n\nOPTIONS", cliOptions); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index d5fdd7fe80..f36ef77975 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -90,7 +90,8 @@ public void commitJob(JobContext jobContext) throws IOException { } try { - if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) { + if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false) + && !(conf.getBoolean(DistCpConstants.CONF_LABEL_DIFF, false))) { deleteMissing(conf); } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) { commitData(conf); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java new file mode 100644 index 0000000000..7d5dad06c8 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.tools.mapred.CopyMapper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class TestDistCpSync { + private MiniDFSCluster cluster; + private final Configuration conf = new HdfsConfiguration(); + private DistributedFileSystem dfs; + private DistCpOptions options; + private final Path source = new Path("/source"); + private final Path target = new Path("/target"); + private final long BLOCK_SIZE = 1024; + private final short DATA_NUM = 1; + + @Before + public void setUp() throws Exception { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATA_NUM).build(); + cluster.waitActive(); + + dfs = cluster.getFileSystem(); + dfs.mkdirs(source); + dfs.mkdirs(target); + + options = new DistCpOptions(Arrays.asList(source), target); + options.setSyncFolder(true); + options.setDeleteMissing(true); + options.setUseDiff(true, "s1", "s2"); + options.appendToConf(conf); + + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, target.toString()); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, target.toString()); + } + + @After + public void tearDown() throws Exception { + IOUtils.cleanup(null, dfs); + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test the sync returns false in the following scenarios: + * 1. the source/target dir are not snapshottable dir + * 2. the source/target does not have the given snapshots + * 3. changes have been made in target + */ + @Test + public void testFallback() throws Exception { + // the source/target dir are not snapshottable dir + Assert.assertFalse(DistCpSync.sync(options, conf)); + + // the source/target does not have the given snapshots + dfs.allowSnapshot(source); + dfs.allowSnapshot(target); + Assert.assertFalse(DistCpSync.sync(options, conf)); + + dfs.createSnapshot(source, "s1"); + dfs.createSnapshot(source, "s2"); + dfs.createSnapshot(target, "s1"); + Assert.assertTrue(DistCpSync.sync(options, conf)); + + // changes have been made in target + final Path subTarget = new Path(target, "sub"); + dfs.mkdirs(subTarget); + Assert.assertFalse(DistCpSync.sync(options, conf)); + + dfs.delete(subTarget, true); + Assert.assertTrue(DistCpSync.sync(options, conf)); + } + + /** + * create some files and directories under the given directory. + * the final subtree looks like this: + * dir/ + * foo/ bar/ + * d1/ f1 d2/ f2 + * f3 f4 + */ + private void initData(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path d1 = new Path(foo, "d1"); + final Path f1 = new Path(foo, "f1"); + final Path d2 = new Path(bar, "d2"); + final Path f2 = new Path(bar, "f2"); + final Path f3 = new Path(d1, "f3"); + final Path f4 = new Path(d2, "f4"); + + DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.createFile(dfs, f4, BLOCK_SIZE, DATA_NUM, 0); + } + + /** + * make some changes under the given directory (created in the above way). + * 1. rename dir/foo/d1 to dir/bar/d1 + * 2. delete dir/bar/d1/f3 + * 3. rename dir/foo to /dir/bar/d1/foo + * 4. delete dir/bar/d1/foo/f1 + * 5. create file dir/bar/d1/foo/f1 whose size is 2*BLOCK_SIZE + * 6. append one BLOCK to file dir/bar/f2 + * 7. rename dir/bar to dir/foo + * + * Thus after all these ops the subtree looks like this: + * dir/ + * foo/ + * d1/ f2(A) d2/ + * foo/ f4 + * f1(new) + */ + private void changeData(Path dir) throws Exception { + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path d1 = new Path(foo, "d1"); + final Path f2 = new Path(bar, "f2"); + + final Path bar_d1 = new Path(bar, "d1"); + dfs.rename(d1, bar_d1); + final Path f3 = new Path(bar_d1, "f3"); + dfs.delete(f3, true); + final Path newfoo = new Path(bar_d1, "foo"); + dfs.rename(foo, newfoo); + final Path f1 = new Path(newfoo, "f1"); + dfs.delete(f1, true); + DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0); + DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE); + dfs.rename(bar, new Path(dir, "foo")); + } + + /** + * Test the basic functionality. + */ + @Test + public void testSync() throws Exception { + initData(source); + initData(target); + dfs.allowSnapshot(source); + dfs.allowSnapshot(target); + dfs.createSnapshot(source, "s1"); + dfs.createSnapshot(target, "s1"); + + // make changes under source + changeData(source); + dfs.createSnapshot(source, "s2"); + + // do the sync + Assert.assertTrue(DistCpSync.sync(options, conf)); + + // build copy listing + final Path listingPath = new Path("/tmp/META/fileList.seq"); + CopyListing listing = new GlobbedCopyListing(conf, new Credentials()); + listing.buildListing(listingPath, options); + + Map copyListing = getListing(listingPath); + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(conf, null, 0); + Mapper.Context context = + stubContext.getContext(); + // Enable append + context.getConfiguration().setBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), true); + copyMapper.setup(context); + for (Map.Entry entry : copyListing.entrySet()) { + copyMapper.map(entry.getKey(), entry.getValue(), context); + } + + // verify that we only copied new appended data of f2 and the new file f1 + Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter() + .getCounter(CopyMapper.Counter.BYTESCOPIED).getValue()); + + // verify the source and target now has the same structure + verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); + } + + private Map getListing(Path listingPath) + throws Exception { + SequenceFile.Reader reader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(listingPath)); + Text key = new Text(); + CopyListingFileStatus value = new CopyListingFileStatus(); + Map values = new HashMap<>(); + while (reader.next(key, value)) { + values.put(key, value); + key = new Text(); + value = new CopyListingFileStatus(); + } + return values; + } + + private void verifyCopy(FileStatus s, FileStatus t, boolean compareName) + throws Exception { + Assert.assertEquals(s.isDirectory(), t.isDirectory()); + if (compareName) { + Assert.assertEquals(s.getPath().getName(), t.getPath().getName()); + } + if (!s.isDirectory()) { + // verify the file content is the same + byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath()); + byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath()); + Assert.assertArrayEquals(sbytes, tbytes); + } else { + FileStatus[] slist = dfs.listStatus(s.getPath()); + FileStatus[] tlist = dfs.listStatus(t.getPath()); + Assert.assertEquals(slist.length, tlist.length); + for (int i = 0; i < slist.length; i++) { + verifyCopy(slist[i], tlist[i], true); + } + } + } + + private void initData2(Path dir) throws Exception { + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path f1 = new Path(test, "f1"); + final Path f2 = new Path(foo, "f2"); + final Path f3 = new Path(bar, "f3"); + + DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 1L); + DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE, DATA_NUM, 2L); + } + + private void changeData2(Path dir) throws Exception { + final Path tmpFoo = new Path(dir, "tmpFoo"); + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + + dfs.rename(test, tmpFoo); + dfs.rename(foo, test); + dfs.rename(bar, foo); + dfs.rename(tmpFoo, bar); + } + + @Test + public void testSync2() throws Exception { + initData2(source); + initData2(target); + dfs.allowSnapshot(source); + dfs.allowSnapshot(target); + dfs.createSnapshot(source, "s1"); + dfs.createSnapshot(target, "s1"); + + // make changes under source + changeData2(source); + dfs.createSnapshot(source, "s2"); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); + System.out.println(report); + + // do the sync + Assert.assertTrue(DistCpSync.sync(options, conf)); + verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); + } + + private void initData3(Path dir) throws Exception { + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path f1 = new Path(test, "file"); + final Path f2 = new Path(foo, "file"); + final Path f3 = new Path(bar, "file"); + + DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE * 2, DATA_NUM, 1L); + DFSTestUtil.createFile(dfs, f3, BLOCK_SIZE * 3, DATA_NUM, 2L); + } + + private void changeData3(Path dir) throws Exception { + final Path test = new Path(dir, "test"); + final Path foo = new Path(dir, "foo"); + final Path bar = new Path(dir, "bar"); + final Path f1 = new Path(test, "file"); + final Path f2 = new Path(foo, "file"); + final Path f3 = new Path(bar, "file"); + final Path newf1 = new Path(test, "newfile"); + final Path newf2 = new Path(foo, "newfile"); + final Path newf3 = new Path(bar, "newfile"); + + dfs.rename(f1, newf1); + dfs.rename(f2, newf2); + dfs.rename(f3, newf3); + } + + /** + * Test a case where there are multiple source files with the same name + */ + @Test + public void testSync3() throws Exception { + initData3(source); + initData3(target); + dfs.allowSnapshot(source); + dfs.allowSnapshot(target); + dfs.createSnapshot(source, "s1"); + dfs.createSnapshot(target, "s1"); + + // make changes under source + changeData3(source); + dfs.createSnapshot(source, "s2"); + + SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2"); + System.out.println(report); + + // do the sync + Assert.assertTrue(DistCpSync.sync(options, conf)); + verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 30fb25b07c..cc9da3351d 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -584,7 +584,7 @@ public void testAppendOption() { // make sure -append is only valid when -update is specified try { - options = OptionsParser.parse(new String[] { "-append", + OptionsParser.parse(new String[] { "-append", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); fail("Append should fail if update option is not specified"); @@ -595,7 +595,7 @@ public void testAppendOption() { // make sure -append is invalid when skipCrc is specified try { - options = OptionsParser.parse(new String[] { + OptionsParser.parse(new String[] { "-append", "-update", "-skipcrccheck", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/" }); @@ -605,4 +605,75 @@ public void testAppendOption() { "Append is disallowed when skipping CRC", e); } } + + @Test + public void testDiffOption() { + Configuration conf = new Configuration(); + Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), + false)); + + DistCpOptions options = OptionsParser.parse(new String[] { "-update", + "-delete", "-diff", "s1", "s2", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + options.appendToConf(conf); + Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), false)); + Assert.assertTrue(options.shouldUseDiff()); + Assert.assertEquals("s1", options.getFromSnapshot()); + Assert.assertEquals("s2", options.getToSnapshot()); + + options = OptionsParser.parse(new String[] { + "-delete", "-diff", "s1", ".", "-update", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + options.appendToConf(conf); + Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DIFF.getConfigLabel(), + false)); + Assert.assertTrue(options.shouldUseDiff()); + Assert.assertEquals("s1", options.getFromSnapshot()); + Assert.assertEquals(".", options.getToSnapshot()); + + // -diff requires two option values + try { + OptionsParser.parse(new String[] {"-diff", "s1", "-delete", "-update", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + fail("-diff should fail with only one snapshot name"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "Must provide both the starting and ending snapshot names", e); + } + + // make sure -diff is only valid when -update and -delete is specified + try { + OptionsParser.parse(new String[] { "-diff", "s1", "s2", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + fail("-diff should fail if -update or -delete option is not specified"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "Diff is valid only with update and delete options", e); + } + + try { + OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + fail("-diff should fail if -update or -delete option is not specified"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "Diff is valid only with update and delete options", e); + } + + try { + OptionsParser.parse(new String[] { "-diff", "s1", "s2", + "-delete", "-overwrite", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + fail("-diff should fail if -update or -delete option is not specified"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "Diff is valid only with update and delete options", e); + } + } }