HDFS-8828. Utilize Snapshot diff report to build diff copy list in distcp. (Yufei Gu via Yongjun Zhang)
This commit is contained in:
parent
5e8fe89437
commit
0bc15cb6e6
@ -822,6 +822,9 @@ Release 2.8.0 - UNRELEASED
|
||||
HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget.
|
||||
(yliu)
|
||||
|
||||
HDFS-8828. Utilize Snapshot diff report to build diff copy list in distcp.
|
||||
(Yufei Gu via Yongjun Zhang)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
@ -27,6 +27,8 @@
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
@ -46,7 +48,7 @@
|
||||
public abstract class CopyListing extends Configured {
|
||||
|
||||
private Credentials credentials;
|
||||
|
||||
static final Log LOG = LogFactory.getLog(DistCp.class);
|
||||
/**
|
||||
* Build listing function creates the input listing that distcp uses to
|
||||
* perform the copy.
|
||||
@ -89,6 +91,7 @@ public final void buildListing(Path pathToListFile,
|
||||
config.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, getNumberOfPaths());
|
||||
|
||||
validateFinalListing(pathToListFile, options);
|
||||
LOG.info("Number of paths in the copy list: " + this.getNumberOfPaths());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -153,6 +156,7 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options)
|
||||
Text currentKey = new Text();
|
||||
Set<URI> aclSupportCheckFsSet = Sets.newHashSet();
|
||||
Set<URI> xAttrSupportCheckFsSet = Sets.newHashSet();
|
||||
long idx = 0;
|
||||
while (reader.next(currentKey)) {
|
||||
if (currentKey.equals(lastKey)) {
|
||||
CopyListingFileStatus currentFileStatus = new CopyListingFileStatus();
|
||||
@ -178,6 +182,12 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options)
|
||||
}
|
||||
}
|
||||
lastKey.set(currentKey);
|
||||
|
||||
if (options.shouldUseDiff() && LOG.isDebugEnabled()) {
|
||||
LOG.debug("Copy list entry " + idx + ": " +
|
||||
lastFileStatus.getPath().toUri().getPath());
|
||||
idx++;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeStream(reader);
|
||||
@ -224,9 +234,6 @@ public static CopyListing getCopyListing(Configuration configuration,
|
||||
Credentials credentials,
|
||||
DistCpOptions options)
|
||||
throws IOException {
|
||||
if (options.shouldUseDiff()) {
|
||||
return new GlobbedCopyListing(configuration, credentials);
|
||||
}
|
||||
String copyListingClassName = configuration.get(DistCpConstants.
|
||||
CONF_LABEL_COPY_LISTING_CLASS, "");
|
||||
Class<? extends CopyListing> copyListingClass;
|
||||
|
@ -17,12 +17,9 @@
|
||||
*/
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
|
||||
/**
|
||||
@ -54,12 +51,19 @@ public int compare(DiffInfo d1, DiffInfo d2) {
|
||||
*/
|
||||
private Path tmp;
|
||||
/** The target file/dir of the rename op. Null means the op is deletion. */
|
||||
final Path target;
|
||||
Path target;
|
||||
|
||||
DiffInfo(Path source, Path target) {
|
||||
private final SnapshotDiffReport.DiffType type;
|
||||
|
||||
public SnapshotDiffReport.DiffType getType(){
|
||||
return this.type;
|
||||
}
|
||||
|
||||
DiffInfo(Path source, Path target, SnapshotDiffReport.DiffType type) {
|
||||
assert source != null;
|
||||
this.source = source;
|
||||
this.target= target;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
void setTmp(Path tmp) {
|
||||
@ -69,22 +73,4 @@ void setTmp(Path tmp) {
|
||||
Path getTmp() {
|
||||
return tmp;
|
||||
}
|
||||
|
||||
static DiffInfo[] getDiffs(SnapshotDiffReport report, Path targetDir) {
|
||||
List<DiffInfo> diffs = new ArrayList<>();
|
||||
for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
|
||||
if (entry.getType() == SnapshotDiffReport.DiffType.DELETE) {
|
||||
final Path source = new Path(targetDir,
|
||||
DFSUtil.bytes2String(entry.getSourcePath()));
|
||||
diffs.add(new DiffInfo(source, null));
|
||||
} else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) {
|
||||
final Path source = new Path(targetDir,
|
||||
DFSUtil.bytes2String(entry.getSourcePath()));
|
||||
final Path target = new Path(targetDir,
|
||||
DFSUtil.bytes2String(entry.getTargetPath()));
|
||||
diffs.add(new DiffInfo(source, target));
|
||||
}
|
||||
}
|
||||
return diffs.toArray(new DiffInfo[diffs.size()]);
|
||||
}
|
||||
}
|
||||
|
@ -175,11 +175,18 @@ public Job createAndSubmitJob() throws Exception {
|
||||
job = createJob();
|
||||
}
|
||||
if (inputOptions.shouldUseDiff()) {
|
||||
if (!DistCpSync.sync(inputOptions, getConf())) {
|
||||
DistCpSync distCpSync = new DistCpSync(inputOptions, getConf());
|
||||
if (distCpSync.sync()) {
|
||||
createInputFileListingWithDiff(job, distCpSync);
|
||||
} else {
|
||||
inputOptions.disableUsingDiff();
|
||||
}
|
||||
}
|
||||
createInputFileListing(job);
|
||||
|
||||
// Fallback to default DistCp if without "diff" option or sync failed.
|
||||
if (!inputOptions.shouldUseDiff()) {
|
||||
createInputFileListing(job);
|
||||
}
|
||||
|
||||
job.submit();
|
||||
submitted = true;
|
||||
@ -384,6 +391,22 @@ protected Path createInputFileListing(Job job) throws IOException {
|
||||
return fileListingPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create input listing based on snapshot diff report.
|
||||
* @param job - Handle to job
|
||||
* @param distCpSync the class wraps the snapshot diff report
|
||||
* @return Returns the path where the copy listing is created
|
||||
* @throws IOException - If any
|
||||
*/
|
||||
private Path createInputFileListingWithDiff(Job job, DistCpSync distCpSync)
|
||||
throws IOException {
|
||||
Path fileListingPath = getFileListingPath();
|
||||
CopyListing copyListing = new SimpleCopyListing(job.getConfiguration(),
|
||||
job.getCredentials(), distCpSync);
|
||||
copyListing.buildListing(fileListingPath, inputOptions);
|
||||
return fileListingPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get default name of the copy listing file. Use the meta folder
|
||||
* to create the copy listing file
|
||||
|
@ -614,9 +614,9 @@ public void validate(DistCpOptionSwitch option, boolean value) {
|
||||
throw new IllegalArgumentException(
|
||||
"Append is disallowed when skipping CRC");
|
||||
}
|
||||
if ((!syncFolder || !deleteMissing) && useDiff) {
|
||||
if ((!syncFolder || deleteMissing) && useDiff) {
|
||||
throw new IllegalArgumentException(
|
||||
"Diff is valid only with update and delete options");
|
||||
"Diff is valid only with update options");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,10 +17,10 @@
|
||||
*/
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
@ -29,6 +29,9 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.EnumMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
|
||||
/**
|
||||
* This class provides the basic functionality to sync two FileSystems based on
|
||||
@ -41,9 +44,26 @@
|
||||
* source.s1
|
||||
*/
|
||||
class DistCpSync {
|
||||
private DistCpOptions inputOptions;
|
||||
private Configuration conf;
|
||||
private EnumMap<SnapshotDiffReport.DiffType, List<DiffInfo>> diffMap;
|
||||
private DiffInfo[] renameDiffs;
|
||||
|
||||
static boolean sync(DistCpOptions inputOptions, Configuration conf)
|
||||
throws IOException {
|
||||
DistCpSync(DistCpOptions options, Configuration conf) {
|
||||
this.inputOptions = options;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if three conditions are met before sync.
|
||||
* 1. Only one source directory.
|
||||
* 2. Both source and target file system are DFS.
|
||||
* 3. There is no change between from and the current status in target
|
||||
* file system.
|
||||
* Throw exceptions if first two aren't met, and return false to fallback to
|
||||
* default distcp if the third condition isn't met.
|
||||
*/
|
||||
private boolean preSyncCheck() throws IOException {
|
||||
List<Path> sourcePaths = inputOptions.getSourcePaths();
|
||||
if (sourcePaths.size() != 1) {
|
||||
// we only support one source dir which must be a snapshottable directory
|
||||
@ -62,26 +82,41 @@ static boolean sync(DistCpOptions inputOptions, Configuration conf)
|
||||
throw new IllegalArgumentException("The FileSystems needs to" +
|
||||
" be DistributedFileSystem for using snapshot-diff-based distcp");
|
||||
}
|
||||
final DistributedFileSystem sourceFs = (DistributedFileSystem) sfs;
|
||||
final DistributedFileSystem targetFs= (DistributedFileSystem) tfs;
|
||||
final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
|
||||
|
||||
// make sure targetFS has no change between from and the current states
|
||||
if (!checkNoChange(inputOptions, targetFs, targetDir)) {
|
||||
if (!checkNoChange(targetFs, targetDir)) {
|
||||
// set the source path using the snapshot path
|
||||
inputOptions.setSourcePaths(Arrays.asList(getSourceSnapshotPath(sourceDir,
|
||||
inputOptions.getToSnapshot())));
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean sync() throws IOException {
|
||||
if (!preSyncCheck()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!getAllDiffs()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
List<Path> sourcePaths = inputOptions.getSourcePaths();
|
||||
final Path sourceDir = sourcePaths.get(0);
|
||||
final Path targetDir = inputOptions.getTargetPath();
|
||||
final FileSystem tfs = targetDir.getFileSystem(conf);
|
||||
final DistributedFileSystem targetFs = (DistributedFileSystem) tfs;
|
||||
|
||||
Path tmpDir = null;
|
||||
try {
|
||||
tmpDir = createTargetTmpDir(targetFs, targetDir);
|
||||
DiffInfo[] diffs = getDiffs(inputOptions, sourceFs, sourceDir, targetDir);
|
||||
if (diffs == null) {
|
||||
return false;
|
||||
DiffInfo[] renameAndDeleteDiffs = getRenameAndDeleteDiffs(targetDir);
|
||||
if (renameAndDeleteDiffs.length > 0) {
|
||||
// do the real sync work: deletion and rename
|
||||
syncDiff(renameAndDeleteDiffs, targetFs, tmpDir);
|
||||
}
|
||||
// do the real sync work: deletion and rename
|
||||
syncDiff(diffs, targetFs, tmpDir);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
DistCp.LOG.warn("Failed to use snapshot diff for distcp", e);
|
||||
@ -95,11 +130,64 @@ static boolean sync(DistCpOptions inputOptions, Configuration conf)
|
||||
}
|
||||
}
|
||||
|
||||
private static String getSnapshotName(String name) {
|
||||
/**
|
||||
* Get all diffs from source directory snapshot diff report, put them into an
|
||||
* EnumMap whose key is DiffType, and value is a DiffInfo list. If there is
|
||||
* no entry for a given DiffType, the associated value will be an empty list.
|
||||
*/
|
||||
private boolean getAllDiffs() throws IOException {
|
||||
List<Path> sourcePaths = inputOptions.getSourcePaths();
|
||||
final Path sourceDir = sourcePaths.get(0);
|
||||
try {
|
||||
DistributedFileSystem fs =
|
||||
(DistributedFileSystem) sourceDir.getFileSystem(conf);
|
||||
final String from = getSnapshotName(inputOptions.getFromSnapshot());
|
||||
final String to = getSnapshotName(inputOptions.getToSnapshot());
|
||||
SnapshotDiffReport report = fs.getSnapshotDiffReport(sourceDir,
|
||||
from, to);
|
||||
|
||||
this.diffMap = new EnumMap<>(SnapshotDiffReport.DiffType.class);
|
||||
for (SnapshotDiffReport.DiffType type :
|
||||
SnapshotDiffReport.DiffType.values()) {
|
||||
diffMap.put(type, new ArrayList<DiffInfo>());
|
||||
}
|
||||
|
||||
for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) {
|
||||
// If the entry is the snapshot root, usually a item like "M\t."
|
||||
// in the diff report. We don't need to handle it and cannot handle it,
|
||||
// since its sourcepath is empty.
|
||||
if (entry.getSourcePath().length <= 0) {
|
||||
continue;
|
||||
}
|
||||
List<DiffInfo> list = diffMap.get(entry.getType());
|
||||
|
||||
if (entry.getType() == SnapshotDiffReport.DiffType.MODIFY ||
|
||||
entry.getType() == SnapshotDiffReport.DiffType.CREATE ||
|
||||
entry.getType() == SnapshotDiffReport.DiffType.DELETE) {
|
||||
final Path source =
|
||||
new Path(DFSUtil.bytes2String(entry.getSourcePath()));
|
||||
list.add(new DiffInfo(source, null, entry.getType()));
|
||||
} else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) {
|
||||
final Path source =
|
||||
new Path(DFSUtil.bytes2String(entry.getSourcePath()));
|
||||
final Path target =
|
||||
new Path(DFSUtil.bytes2String(entry.getTargetPath()));
|
||||
list.add(new DiffInfo(source, target, entry.getType()));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
|
||||
}
|
||||
this.diffMap = null;
|
||||
return false;
|
||||
}
|
||||
|
||||
private String getSnapshotName(String name) {
|
||||
return Path.CUR_DIR.equals(name) ? "" : name;
|
||||
}
|
||||
|
||||
private static Path getSourceSnapshotPath(Path sourceDir, String snapshotName) {
|
||||
private Path getSourceSnapshotPath(Path sourceDir, String snapshotName) {
|
||||
if (Path.CUR_DIR.equals(snapshotName)) {
|
||||
return sourceDir;
|
||||
} else {
|
||||
@ -108,8 +196,8 @@ private static Path getSourceSnapshotPath(Path sourceDir, String snapshotName) {
|
||||
}
|
||||
}
|
||||
|
||||
private static Path createTargetTmpDir(DistributedFileSystem targetFs,
|
||||
Path targetDir) throws IOException {
|
||||
private Path createTargetTmpDir(DistributedFileSystem targetFs,
|
||||
Path targetDir) throws IOException {
|
||||
final Path tmp = new Path(targetDir,
|
||||
DistCpConstants.HDFS_DISTCP_DIFF_DIRECTORY_NAME + DistCp.rand.nextInt());
|
||||
if (!targetFs.mkdirs(tmp)) {
|
||||
@ -118,8 +206,8 @@ private static Path createTargetTmpDir(DistributedFileSystem targetFs,
|
||||
return tmp;
|
||||
}
|
||||
|
||||
private static void deleteTargetTmpDir(DistributedFileSystem targetFs,
|
||||
Path tmpDir) {
|
||||
private void deleteTargetTmpDir(DistributedFileSystem targetFs,
|
||||
Path tmpDir) {
|
||||
try {
|
||||
if (tmpDir != null) {
|
||||
targetFs.delete(tmpDir, true);
|
||||
@ -133,8 +221,7 @@ private static void deleteTargetTmpDir(DistributedFileSystem targetFs,
|
||||
* Compute the snapshot diff on the given file system. Return true if the diff
|
||||
* is empty, i.e., no changes have happened in the FS.
|
||||
*/
|
||||
private static boolean checkNoChange(DistCpOptions inputOptions,
|
||||
DistributedFileSystem fs, Path path) {
|
||||
private boolean checkNoChange(DistributedFileSystem fs, Path path) {
|
||||
try {
|
||||
SnapshotDiffReport targetDiff =
|
||||
fs.getSnapshotDiffReport(path, inputOptions.getFromSnapshot(), "");
|
||||
@ -151,22 +238,7 @@ private static boolean checkNoChange(DistCpOptions inputOptions,
|
||||
return false;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static DiffInfo[] getDiffs(DistCpOptions inputOptions,
|
||||
DistributedFileSystem fs, Path sourceDir, Path targetDir) {
|
||||
try {
|
||||
final String from = getSnapshotName(inputOptions.getFromSnapshot());
|
||||
final String to = getSnapshotName(inputOptions.getToSnapshot());
|
||||
SnapshotDiffReport sourceDiff = fs.getSnapshotDiffReport(sourceDir,
|
||||
from, to);
|
||||
return DiffInfo.getDiffs(sourceDiff, targetDir);
|
||||
} catch (IOException e) {
|
||||
DistCp.LOG.warn("Failed to compute snapshot diff on " + sourceDir, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static void syncDiff(DiffInfo[] diffs,
|
||||
private void syncDiff(DiffInfo[] diffs,
|
||||
DistributedFileSystem targetFs, Path tmpDir) throws IOException {
|
||||
moveToTmpDir(diffs, targetFs, tmpDir);
|
||||
moveToTarget(diffs, targetFs);
|
||||
@ -176,7 +248,7 @@ private static void syncDiff(DiffInfo[] diffs,
|
||||
* Move all the source files that should be renamed or deleted to the tmp
|
||||
* directory.
|
||||
*/
|
||||
private static void moveToTmpDir(DiffInfo[] diffs,
|
||||
private void moveToTmpDir(DiffInfo[] diffs,
|
||||
DistributedFileSystem targetFs, Path tmpDir) throws IOException {
|
||||
// sort the diffs based on their source paths to make sure the files and
|
||||
// subdirs are moved before moving their parents/ancestors.
|
||||
@ -196,7 +268,7 @@ private static void moveToTmpDir(DiffInfo[] diffs,
|
||||
* Finish the rename operations: move all the intermediate files/directories
|
||||
* from the tmp dir to the final targets.
|
||||
*/
|
||||
private static void moveToTarget(DiffInfo[] diffs,
|
||||
private void moveToTarget(DiffInfo[] diffs,
|
||||
DistributedFileSystem targetFs) throws IOException {
|
||||
// sort the diffs based on their target paths to make sure the parent
|
||||
// directories are created first.
|
||||
@ -210,4 +282,166 @@ private static void moveToTarget(DiffInfo[] diffs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get rename and delete diffs and add the targetDir as the prefix of their
|
||||
* source and target paths.
|
||||
*/
|
||||
private DiffInfo[] getRenameAndDeleteDiffs(Path targetDir) {
|
||||
List<DiffInfo> renameAndDeleteDiff = new ArrayList<>();
|
||||
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.DELETE)) {
|
||||
Path source = new Path(targetDir, diff.source);
|
||||
renameAndDeleteDiff.add(new DiffInfo(source, diff.target,
|
||||
diff.getType()));
|
||||
}
|
||||
|
||||
for (DiffInfo diff : diffMap.get(SnapshotDiffReport.DiffType.RENAME)) {
|
||||
Path source = new Path(targetDir, diff.source);
|
||||
Path target = new Path(targetDir, diff.target);
|
||||
renameAndDeleteDiff.add(new DiffInfo(source, target, diff.getType()));
|
||||
}
|
||||
|
||||
return renameAndDeleteDiff.toArray(
|
||||
new DiffInfo[renameAndDeleteDiff.size()]);
|
||||
}
|
||||
|
||||
private DiffInfo[] getCreateAndModifyDiffs() {
|
||||
List<DiffInfo> createDiff =
|
||||
diffMap.get(SnapshotDiffReport.DiffType.CREATE);
|
||||
List<DiffInfo> modifyDiff =
|
||||
diffMap.get(SnapshotDiffReport.DiffType.MODIFY);
|
||||
List<DiffInfo> diffs =
|
||||
new ArrayList<>(createDiff.size() + modifyDiff.size());
|
||||
diffs.addAll(createDiff);
|
||||
diffs.addAll(modifyDiff);
|
||||
return diffs.toArray(new DiffInfo[diffs.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Probe for a path being a parent of another.
|
||||
* @return true if the parent's path matches the start of the child's
|
||||
*/
|
||||
private boolean isParentOf(Path parent, Path child) {
|
||||
String parentPath = parent.toString();
|
||||
String childPath = child.toString();
|
||||
if (!parentPath.endsWith(Path.SEPARATOR)) {
|
||||
parentPath += Path.SEPARATOR;
|
||||
}
|
||||
|
||||
return childPath.length() > parentPath.length() &&
|
||||
childPath.startsWith(parentPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the possible rename item which equals to the parent or self of
|
||||
* a created/modified file/directory.
|
||||
* @param diff a modify/create diff item
|
||||
* @param renameDiffArray all rename diffs
|
||||
* @return possible rename item
|
||||
*/
|
||||
private DiffInfo getRenameItem(DiffInfo diff, DiffInfo[] renameDiffArray) {
|
||||
for (DiffInfo renameItem : renameDiffArray) {
|
||||
if (diff.source.equals(renameItem.source)) {
|
||||
// The same path string may appear in:
|
||||
// 1. both renamed and modified snapshot diff entries.
|
||||
// 2. both renamed and created snapshot diff entries.
|
||||
// Case 1 is the about same file/directory, whereas case 2
|
||||
// is about two different files/directories.
|
||||
// We are finding case 1 here, thus we check against DiffType.MODIFY.
|
||||
if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
|
||||
return renameItem;
|
||||
}
|
||||
} else if (isParentOf(renameItem.source, diff.source)) {
|
||||
// If rename entry is the parent of diff entry, then both MODIFY and
|
||||
// CREATE diff entries should be handled.
|
||||
return renameItem;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* For a given source path, get its target path based on the rename item.
|
||||
* @return target path
|
||||
*/
|
||||
private Path getTargetPath(Path sourcePath, DiffInfo renameItem) {
|
||||
if (sourcePath.equals(renameItem.source)) {
|
||||
return renameItem.target;
|
||||
}
|
||||
StringBuffer sb = new StringBuffer(sourcePath.toString());
|
||||
String remain = sb.substring(renameItem.source.toString().length() + 1);
|
||||
return new Path(renameItem.target, remain);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the diff list.
|
||||
* This diff list only includes created or modified files/directories, since
|
||||
* delete and rename items are synchronized already.
|
||||
*
|
||||
* If the parent or self of a source path is renamed, we need to change its
|
||||
* target path according the correspondent rename item.
|
||||
* @return a diff list
|
||||
*/
|
||||
public ArrayList<DiffInfo> prepareDiffList() {
|
||||
DiffInfo[] modifyAndCreateDiffs = getCreateAndModifyDiffs();
|
||||
|
||||
List<DiffInfo> renameDiffsList =
|
||||
diffMap.get(SnapshotDiffReport.DiffType.RENAME);
|
||||
DiffInfo[] renameDiffArray =
|
||||
renameDiffsList.toArray(new DiffInfo[renameDiffsList.size()]);
|
||||
Arrays.sort(renameDiffArray, DiffInfo.sourceComparator);
|
||||
|
||||
ArrayList<DiffInfo> finalListWithTarget = new ArrayList<>();
|
||||
for (DiffInfo diff : modifyAndCreateDiffs) {
|
||||
DiffInfo renameItem = getRenameItem(diff, renameDiffArray);
|
||||
if (renameItem == null) {
|
||||
diff.target = diff.source;
|
||||
} else {
|
||||
diff.target = getTargetPath(diff.source, renameItem);
|
||||
}
|
||||
finalListWithTarget.add(diff);
|
||||
}
|
||||
return finalListWithTarget;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method returns a list of items to be excluded when recursively
|
||||
* traversing newDir to build the copy list.
|
||||
*
|
||||
* Specifically, given a newly created directory newDir (a CREATE entry in
|
||||
* the snapshot diff), if a previously copied file/directory itemX is moved
|
||||
* (a RENAME entry in the snapshot diff) into newDir, itemX should be
|
||||
* excluded when recursively traversing newDir in caller method so that it
|
||||
* will not to be copied again.
|
||||
* If the same itemX also has a MODIFY entry in the snapshot diff report,
|
||||
* meaning it was modified after it was previously copied, it will still
|
||||
* be added to the copy list in caller method.
|
||||
* @return the exclude list
|
||||
*/
|
||||
public HashSet<String> getTraverseExcludeList(Path newDir, Path prefix) {
|
||||
if (renameDiffs == null) {
|
||||
List<DiffInfo> renameList =
|
||||
diffMap.get(SnapshotDiffReport.DiffType.RENAME);
|
||||
renameDiffs = renameList.toArray(new DiffInfo[renameList.size()]);
|
||||
Arrays.sort(renameDiffs, DiffInfo.targetComparator);
|
||||
}
|
||||
|
||||
if (renameDiffs.length <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
boolean foundChild = false;
|
||||
HashSet<String> excludeList = new HashSet<>();
|
||||
for (DiffInfo diff : renameDiffs) {
|
||||
if (isParentOf(newDir, diff.target)) {
|
||||
foundChild = true;
|
||||
excludeList.add(new Path(prefix, diff.target).toUri().getPath());
|
||||
} else if (foundChild) {
|
||||
// The renameDiffs was sorted, the matching section should be
|
||||
// contiguous.
|
||||
break;
|
||||
}
|
||||
}
|
||||
return excludeList;
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
@ -40,6 +41,7 @@
|
||||
|
||||
import java.io.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
|
||||
import static org.apache.hadoop.tools.DistCpConstants
|
||||
.HDFS_RESERVED_RAW_DIRECTORY_NAME;
|
||||
@ -59,6 +61,7 @@ public class SimpleCopyListing extends CopyListing {
|
||||
private int numListstatusThreads = 1;
|
||||
private final int maxRetries = 3;
|
||||
private CopyFilter copyFilter;
|
||||
private DistCpSync distCpSync;
|
||||
|
||||
/**
|
||||
* Protected constructor, to initialize configuration.
|
||||
@ -77,12 +80,20 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected SimpleCopyListing(Configuration configuration, Credentials credentials,
|
||||
protected SimpleCopyListing(Configuration configuration,
|
||||
Credentials credentials,
|
||||
int numListstatusThreads) {
|
||||
super(configuration, credentials);
|
||||
this.numListstatusThreads = numListstatusThreads;
|
||||
}
|
||||
|
||||
protected SimpleCopyListing(Configuration configuration,
|
||||
Credentials credentials,
|
||||
DistCpSync distCpSync) {
|
||||
this(configuration, credentials);
|
||||
this.distCpSync = distCpSync;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void validatePaths(DistCpOptions options)
|
||||
throws IOException, InvalidInputException {
|
||||
@ -157,8 +168,106 @@ protected void validatePaths(DistCpOptions options)
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
|
||||
doBuildListing(getWriter(pathToListingFile), options);
|
||||
if(options.shouldUseDiff()) {
|
||||
doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options);
|
||||
}else {
|
||||
doBuildListing(getWriter(pathToListingFile), options);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a path with its scheme and authority.
|
||||
*/
|
||||
private Path getPathWithSchemeAndAuthority(Path path) throws IOException {
|
||||
FileSystem fs= path.getFileSystem(getConf());
|
||||
String scheme = path.toUri().getScheme();
|
||||
if (scheme == null) {
|
||||
scheme = fs.getUri().getScheme();
|
||||
}
|
||||
|
||||
String authority = path.toUri().getAuthority();
|
||||
if (authority == null) {
|
||||
authority = fs.getUri().getAuthority();
|
||||
}
|
||||
|
||||
return new Path(scheme, authority, path.toUri().getPath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a single file/directory to the sequence file.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void addToFileListing(SequenceFile.Writer fileListWriter,
|
||||
Path sourceRoot, Path path, DistCpOptions options) throws IOException {
|
||||
sourceRoot = getPathWithSchemeAndAuthority(sourceRoot);
|
||||
path = getPathWithSchemeAndAuthority(path);
|
||||
path = makeQualified(path);
|
||||
|
||||
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
||||
FileStatus fileStatus = sourceFS.getFileStatus(path);
|
||||
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
||||
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
|
||||
final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();
|
||||
CopyListingFileStatus fileCopyListingStatus =
|
||||
DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus,
|
||||
preserveAcls, preserveXAttrs, preserveRawXAttrs);
|
||||
|
||||
writeToFileListingRoot(fileListWriter, fileCopyListingStatus,
|
||||
sourceRoot, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a copy list based on the snapshot diff report.
|
||||
*
|
||||
* Any file/directory changed or created will be in the list. Deleted
|
||||
* files/directories will not be in the list, since they are handled by
|
||||
* {@link org.apache.hadoop.tools.DistCpSync#sync}. An item can be
|
||||
* created/modified and renamed, in which case, the target path is put
|
||||
* into the list.
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void doBuildListingWithSnapshotDiff(SequenceFile.Writer fileListWriter,
|
||||
DistCpOptions options) throws IOException {
|
||||
ArrayList<DiffInfo> diffList = distCpSync.prepareDiffList();
|
||||
Path sourceRoot = options.getSourcePaths().get(0);
|
||||
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
||||
|
||||
try {
|
||||
for (DiffInfo diff : diffList) {
|
||||
// add snapshot paths prefix
|
||||
diff.target = new Path(options.getSourcePaths().get(0), diff.target);
|
||||
if (diff.getType() == SnapshotDiffReport.DiffType.MODIFY) {
|
||||
addToFileListing(fileListWriter, sourceRoot, diff.target, options);
|
||||
} else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) {
|
||||
addToFileListing(fileListWriter, sourceRoot, diff.target, options);
|
||||
|
||||
FileStatus sourceStatus = sourceFS.getFileStatus(diff.target);
|
||||
if (sourceStatus.isDirectory()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Adding source dir for traverse: " +
|
||||
sourceStatus.getPath());
|
||||
}
|
||||
|
||||
HashSet<String> excludeList =
|
||||
distCpSync.getTraverseExcludeList(diff.source,
|
||||
options.getSourcePaths().get(0));
|
||||
|
||||
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
|
||||
sourceDirs.add(sourceStatus);
|
||||
|
||||
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
|
||||
sourceRoot, options, excludeList);
|
||||
}
|
||||
}
|
||||
}
|
||||
fileListWriter.close();
|
||||
fileListWriter = null;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, fileListWriter);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect the list of
|
||||
* {@literal <sourceRelativePath, sourceFileStatus>}
|
||||
@ -226,7 +335,7 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
}
|
||||
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
|
||||
sourcePathRoot, options);
|
||||
sourcePathRoot, options, null);
|
||||
}
|
||||
}
|
||||
fileListWriter.close();
|
||||
@ -312,9 +421,33 @@ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
|
||||
private static class FileStatusProcessor
|
||||
implements WorkRequestProcessor<FileStatus, FileStatus[]> {
|
||||
private FileSystem fileSystem;
|
||||
private HashSet<String> excludeList;
|
||||
|
||||
public FileStatusProcessor(FileSystem fileSystem) {
|
||||
public FileStatusProcessor(FileSystem fileSystem,
|
||||
HashSet<String> excludeList) {
|
||||
this.fileSystem = fileSystem;
|
||||
this.excludeList = excludeList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get FileStatuses for a given path.
|
||||
* Exclude the some renamed FileStatuses since they are already handled by
|
||||
* {@link org.apache.hadoop.tools.DistCpSync#sync}.
|
||||
* @return an array of file status
|
||||
*/
|
||||
private FileStatus[] getFileStatus(Path path) throws IOException {
|
||||
FileStatus[] fileStatuses = fileSystem.listStatus(path);
|
||||
if (excludeList != null && excludeList.size() > 0) {
|
||||
ArrayList<FileStatus> fileStatusList = new ArrayList<>();
|
||||
for(FileStatus status : fileStatuses) {
|
||||
if (!excludeList.contains(status.getPath().toUri().getPath())) {
|
||||
fileStatusList.add(status);
|
||||
}
|
||||
}
|
||||
fileStatuses = fileStatusList.toArray(
|
||||
new FileStatus[fileStatusList.size()]);
|
||||
}
|
||||
return fileStatuses;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -344,8 +477,8 @@ public WorkReport<FileStatus[]> processItem(
|
||||
LOG.debug("Interrupted while sleeping in exponential backoff.");
|
||||
}
|
||||
}
|
||||
result = new WorkReport<FileStatus[]>(
|
||||
fileSystem.listStatus(parent.getPath()), retry, true);
|
||||
result = new WorkReport<FileStatus[]>(getFileStatus(parent.getPath()),
|
||||
retry, true);
|
||||
} catch (FileNotFoundException fnf) {
|
||||
LOG.error("FileNotFoundException exception in listStatus: " +
|
||||
fnf.getMessage());
|
||||
@ -376,7 +509,8 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter,
|
||||
FileSystem sourceFS,
|
||||
ArrayList<FileStatus> sourceDirs,
|
||||
Path sourcePathRoot,
|
||||
DistCpOptions options)
|
||||
DistCpOptions options,
|
||||
HashSet<String> excludeList)
|
||||
throws IOException {
|
||||
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
||||
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
|
||||
@ -389,7 +523,8 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter,
|
||||
new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
|
||||
for (int i = 0; i < numListstatusThreads; i++) {
|
||||
workers.addWorker(
|
||||
new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf())));
|
||||
new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
|
||||
excludeList));
|
||||
}
|
||||
|
||||
for (FileStatus status : sourceDirs) {
|
||||
|
@ -62,7 +62,6 @@ public void setUp() throws Exception {
|
||||
|
||||
options = new DistCpOptions(Arrays.asList(source), target);
|
||||
options.setSyncFolder(true);
|
||||
options.setDeleteMissing(true);
|
||||
options.setUseDiff(true, "s1", "s2");
|
||||
options.appendToConf(conf);
|
||||
|
||||
@ -87,7 +86,7 @@ public void tearDown() throws Exception {
|
||||
@Test
|
||||
public void testFallback() throws Exception {
|
||||
// the source/target dir are not snapshottable dir
|
||||
Assert.assertFalse(DistCpSync.sync(options, conf));
|
||||
Assert.assertFalse(sync());
|
||||
// make sure the source path has been updated to the snapshot path
|
||||
final Path spath = new Path(source,
|
||||
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
|
||||
@ -98,7 +97,7 @@ public void testFallback() throws Exception {
|
||||
// the source/target does not have the given snapshots
|
||||
dfs.allowSnapshot(source);
|
||||
dfs.allowSnapshot(target);
|
||||
Assert.assertFalse(DistCpSync.sync(options, conf));
|
||||
Assert.assertFalse(sync());
|
||||
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||
|
||||
// reset source path in options
|
||||
@ -106,21 +105,38 @@ public void testFallback() throws Exception {
|
||||
dfs.createSnapshot(source, "s1");
|
||||
dfs.createSnapshot(source, "s2");
|
||||
dfs.createSnapshot(target, "s1");
|
||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||
Assert.assertTrue(sync());
|
||||
|
||||
// reset source paths in options
|
||||
options.setSourcePaths(Arrays.asList(source));
|
||||
// changes have been made in target
|
||||
final Path subTarget = new Path(target, "sub");
|
||||
dfs.mkdirs(subTarget);
|
||||
Assert.assertFalse(DistCpSync.sync(options, conf));
|
||||
Assert.assertFalse(sync());
|
||||
// make sure the source path has been updated to the snapshot path
|
||||
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||
|
||||
// reset source paths in options
|
||||
options.setSourcePaths(Arrays.asList(source));
|
||||
dfs.delete(subTarget, true);
|
||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||
Assert.assertTrue(sync());
|
||||
}
|
||||
|
||||
private void enableAndCreateFirstSnapshot() throws Exception {
|
||||
dfs.allowSnapshot(source);
|
||||
dfs.allowSnapshot(target);
|
||||
dfs.createSnapshot(source, "s1");
|
||||
dfs.createSnapshot(target, "s1");
|
||||
}
|
||||
|
||||
private void syncAndVerify() throws Exception {
|
||||
Assert.assertTrue(sync());
|
||||
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
|
||||
}
|
||||
|
||||
private boolean sync() throws Exception {
|
||||
DistCpSync distCpSync = new DistCpSync(options, conf);
|
||||
return distCpSync.sync();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -164,23 +180,30 @@ private void initData(Path dir) throws Exception {
|
||||
* foo/ f4
|
||||
* f1(new)
|
||||
*/
|
||||
private void changeData(Path dir) throws Exception {
|
||||
private int changeData(Path dir) throws Exception {
|
||||
final Path foo = new Path(dir, "foo");
|
||||
final Path bar = new Path(dir, "bar");
|
||||
final Path d1 = new Path(foo, "d1");
|
||||
final Path f2 = new Path(bar, "f2");
|
||||
|
||||
final Path bar_d1 = new Path(bar, "d1");
|
||||
int numCreatedModified = 0;
|
||||
dfs.rename(d1, bar_d1);
|
||||
numCreatedModified += 1; // modify ./foo
|
||||
numCreatedModified += 1; // modify ./bar
|
||||
final Path f3 = new Path(bar_d1, "f3");
|
||||
dfs.delete(f3, true);
|
||||
final Path newfoo = new Path(bar_d1, "foo");
|
||||
dfs.rename(foo, newfoo);
|
||||
numCreatedModified += 1; // modify ./foo/d1
|
||||
final Path f1 = new Path(newfoo, "f1");
|
||||
dfs.delete(f1, true);
|
||||
DFSTestUtil.createFile(dfs, f1, 2 * BLOCK_SIZE, DATA_NUM, 0);
|
||||
numCreatedModified += 1; // create ./foo/f1
|
||||
DFSTestUtil.appendFile(dfs, f2, (int) BLOCK_SIZE);
|
||||
numCreatedModified += 1; // modify ./bar/f2
|
||||
dfs.rename(bar, new Path(dir, "foo"));
|
||||
return numCreatedModified;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -190,13 +213,10 @@ private void changeData(Path dir) throws Exception {
|
||||
public void testSync() throws Exception {
|
||||
initData(source);
|
||||
initData(target);
|
||||
dfs.allowSnapshot(source);
|
||||
dfs.allowSnapshot(target);
|
||||
dfs.createSnapshot(source, "s1");
|
||||
dfs.createSnapshot(target, "s1");
|
||||
enableAndCreateFirstSnapshot();
|
||||
|
||||
// make changes under source
|
||||
changeData(source);
|
||||
int numCreatedModified = changeData(source);
|
||||
dfs.createSnapshot(source, "s2");
|
||||
|
||||
// before sync, make some further changes on source. this should not affect
|
||||
@ -206,17 +226,22 @@ public void testSync() throws Exception {
|
||||
final Path newdir = new Path(source, "foo/d1/foo/newdir");
|
||||
dfs.mkdirs(newdir);
|
||||
|
||||
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
|
||||
System.out.println(report);
|
||||
|
||||
DistCpSync distCpSync = new DistCpSync(options, conf);
|
||||
|
||||
// do the sync
|
||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||
Assert.assertTrue(distCpSync.sync());
|
||||
|
||||
// make sure the source path has been updated to the snapshot path
|
||||
final Path spath = new Path(source,
|
||||
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
|
||||
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
|
||||
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||
|
||||
// build copy listing
|
||||
final Path listingPath = new Path("/tmp/META/fileList.seq");
|
||||
CopyListing listing = new GlobbedCopyListing(conf, new Credentials());
|
||||
CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
|
||||
listing.buildListing(listingPath, options);
|
||||
|
||||
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
|
||||
@ -232,6 +257,9 @@ public void testSync() throws Exception {
|
||||
copyMapper.map(entry.getKey(), entry.getValue(), context);
|
||||
}
|
||||
|
||||
// verify that we only list modified and created files/directories
|
||||
Assert.assertEquals(numCreatedModified, copyListing.size());
|
||||
|
||||
// verify that we only copied new appended data of f2 and the new file f1
|
||||
Assert.assertEquals(BLOCK_SIZE * 3, stubContext.getReporter()
|
||||
.getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
|
||||
@ -285,16 +313,13 @@ public void testSyncWithCurrent() throws Exception {
|
||||
options.setUseDiff(true, "s1", ".");
|
||||
initData(source);
|
||||
initData(target);
|
||||
dfs.allowSnapshot(source);
|
||||
dfs.allowSnapshot(target);
|
||||
dfs.createSnapshot(source, "s1");
|
||||
dfs.createSnapshot(target, "s1");
|
||||
enableAndCreateFirstSnapshot();
|
||||
|
||||
// make changes under source
|
||||
changeData(source);
|
||||
|
||||
// do the sync
|
||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||
sync();
|
||||
// make sure the source path is still unchanged
|
||||
Assert.assertEquals(source, options.getSourcePaths().get(0));
|
||||
}
|
||||
@ -328,10 +353,7 @@ private void changeData2(Path dir) throws Exception {
|
||||
public void testSync2() throws Exception {
|
||||
initData2(source);
|
||||
initData2(target);
|
||||
dfs.allowSnapshot(source);
|
||||
dfs.allowSnapshot(target);
|
||||
dfs.createSnapshot(source, "s1");
|
||||
dfs.createSnapshot(target, "s1");
|
||||
enableAndCreateFirstSnapshot();
|
||||
|
||||
// make changes under source
|
||||
changeData2(source);
|
||||
@ -340,9 +362,7 @@ public void testSync2() throws Exception {
|
||||
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
|
||||
System.out.println(report);
|
||||
|
||||
// do the sync
|
||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
|
||||
syncAndVerify();
|
||||
}
|
||||
|
||||
private void initData3(Path dir) throws Exception {
|
||||
@ -375,16 +395,13 @@ private void changeData3(Path dir) throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case where there are multiple source files with the same name
|
||||
* Test a case where there are multiple source files with the same name.
|
||||
*/
|
||||
@Test
|
||||
public void testSync3() throws Exception {
|
||||
initData3(source);
|
||||
initData3(target);
|
||||
dfs.allowSnapshot(source);
|
||||
dfs.allowSnapshot(target);
|
||||
dfs.createSnapshot(source, "s1");
|
||||
dfs.createSnapshot(target, "s1");
|
||||
enableAndCreateFirstSnapshot();
|
||||
|
||||
// make changes under source
|
||||
changeData3(source);
|
||||
@ -393,8 +410,268 @@ public void testSync3() throws Exception {
|
||||
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
|
||||
System.out.println(report);
|
||||
|
||||
syncAndVerify();
|
||||
}
|
||||
|
||||
private void initData4(Path dir) throws Exception {
|
||||
final Path d1 = new Path(dir, "d1");
|
||||
final Path d2 = new Path(d1, "d2");
|
||||
final Path f1 = new Path(d2, "f1");
|
||||
|
||||
DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
}
|
||||
|
||||
private void changeData4(Path dir) throws Exception {
|
||||
final Path d1 = new Path(dir, "d1");
|
||||
final Path d11 = new Path(dir, "d11");
|
||||
final Path d2 = new Path(d1, "d2");
|
||||
final Path d21 = new Path(d1, "d21");
|
||||
final Path f1 = new Path(d2, "f1");
|
||||
|
||||
dfs.delete(f1, false);
|
||||
dfs.rename(d2, d21);
|
||||
dfs.rename(d1, d11);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case where multiple level dirs are renamed.
|
||||
*/
|
||||
@Test
|
||||
public void testSync4() throws Exception {
|
||||
initData4(source);
|
||||
initData4(target);
|
||||
enableAndCreateFirstSnapshot();
|
||||
|
||||
// make changes under source
|
||||
changeData4(source);
|
||||
dfs.createSnapshot(source, "s2");
|
||||
|
||||
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
|
||||
System.out.println(report);
|
||||
|
||||
syncAndVerify();
|
||||
}
|
||||
|
||||
private void initData5(Path dir) throws Exception {
|
||||
final Path d1 = new Path(dir, "d1");
|
||||
final Path d2 = new Path(dir, "d2");
|
||||
final Path f1 = new Path(d1, "f1");
|
||||
final Path f2 = new Path(d2, "f2");
|
||||
|
||||
DFSTestUtil.createFile(dfs, f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
DFSTestUtil.createFile(dfs, f2, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
}
|
||||
|
||||
private void changeData5(Path dir) throws Exception {
|
||||
final Path d1 = new Path(dir, "d1");
|
||||
final Path d2 = new Path(dir, "d2");
|
||||
final Path f1 = new Path(d1, "f1");
|
||||
final Path tmp = new Path(dir, "tmp");
|
||||
|
||||
dfs.delete(f1, false);
|
||||
dfs.rename(d1, tmp);
|
||||
dfs.rename(d2, d1);
|
||||
final Path f2 = new Path(d1, "f2");
|
||||
dfs.delete(f2, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case with different delete and rename sequences.
|
||||
*/
|
||||
@Test
|
||||
public void testSync5() throws Exception {
|
||||
initData5(source);
|
||||
initData5(target);
|
||||
enableAndCreateFirstSnapshot();
|
||||
|
||||
// make changes under source
|
||||
changeData5(source);
|
||||
dfs.createSnapshot(source, "s2");
|
||||
|
||||
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
|
||||
System.out.println(report);
|
||||
|
||||
syncAndVerify();
|
||||
}
|
||||
|
||||
private void testAndVerify(int numCreatedModified)
|
||||
throws Exception{
|
||||
SnapshotDiffReport report = dfs.getSnapshotDiffReport(source, "s1", "s2");
|
||||
System.out.println(report);
|
||||
|
||||
DistCpSync distCpSync = new DistCpSync(options, conf);
|
||||
// do the sync
|
||||
Assert.assertTrue(DistCpSync.sync(options, conf));
|
||||
verifyCopy(dfs.getFileStatus(source), dfs.getFileStatus(target), false);
|
||||
Assert.assertTrue(distCpSync.sync());
|
||||
|
||||
// make sure the source path has been updated to the snapshot path
|
||||
final Path spath = new Path(source,
|
||||
HdfsConstants.DOT_SNAPSHOT_DIR + Path.SEPARATOR + "s2");
|
||||
Assert.assertEquals(spath, options.getSourcePaths().get(0));
|
||||
|
||||
// build copy listing
|
||||
final Path listingPath = new Path("/tmp/META/fileList.seq");
|
||||
CopyListing listing = new SimpleCopyListing(conf, new Credentials(), distCpSync);
|
||||
listing.buildListing(listingPath, options);
|
||||
|
||||
Map<Text, CopyListingFileStatus> copyListing = getListing(listingPath);
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(conf, null, 0);
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||
stubContext.getContext();
|
||||
// Enable append
|
||||
context.getConfiguration().setBoolean(
|
||||
DistCpOptionSwitch.APPEND.getConfigLabel(), true);
|
||||
copyMapper.setup(context);
|
||||
for (Map.Entry<Text, CopyListingFileStatus> entry :
|
||||
copyListing.entrySet()) {
|
||||
copyMapper.map(entry.getKey(), entry.getValue(), context);
|
||||
}
|
||||
|
||||
// verify that we only list modified and created files/directories
|
||||
Assert.assertEquals(numCreatedModified, copyListing.size());
|
||||
|
||||
// verify the source and target now has the same structure
|
||||
verifyCopy(dfs.getFileStatus(spath), dfs.getFileStatus(target), false);
|
||||
}
|
||||
|
||||
private void initData6(Path dir) throws Exception {
|
||||
final Path foo = new Path(dir, "foo");
|
||||
final Path bar = new Path(dir, "bar");
|
||||
final Path foo_f1 = new Path(foo, "f1");
|
||||
final Path bar_f1 = new Path(bar, "f1");
|
||||
|
||||
DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
}
|
||||
|
||||
private int changeData6(Path dir) throws Exception {
|
||||
final Path foo = new Path(dir, "foo");
|
||||
final Path bar = new Path(dir, "bar");
|
||||
final Path foo2 = new Path(dir, "foo2");
|
||||
final Path foo_f1 = new Path(foo, "f1");
|
||||
|
||||
int numCreatedModified = 0;
|
||||
dfs.rename(foo, foo2);
|
||||
dfs.rename(bar, foo);
|
||||
dfs.rename(foo2, bar);
|
||||
DFSTestUtil.appendFile(dfs, foo_f1, (int) BLOCK_SIZE);
|
||||
numCreatedModified += 1; // modify ./bar/f1
|
||||
return numCreatedModified;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case where there is a cycle in renaming dirs.
|
||||
*/
|
||||
@Test
|
||||
public void testSync6() throws Exception {
|
||||
initData6(source);
|
||||
initData6(target);
|
||||
enableAndCreateFirstSnapshot();
|
||||
int numCreatedModified = changeData6(source);
|
||||
dfs.createSnapshot(source, "s2");
|
||||
|
||||
testAndVerify(numCreatedModified);
|
||||
}
|
||||
|
||||
private void initData7(Path dir) throws Exception {
|
||||
final Path foo = new Path(dir, "foo");
|
||||
final Path bar = new Path(dir, "bar");
|
||||
final Path foo_f1 = new Path(foo, "f1");
|
||||
final Path bar_f1 = new Path(bar, "f1");
|
||||
|
||||
DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
}
|
||||
|
||||
private int changeData7(Path dir) throws Exception {
|
||||
final Path foo = new Path(dir, "foo");
|
||||
final Path foo2 = new Path(dir, "foo2");
|
||||
final Path foo_f1 = new Path(foo, "f1");
|
||||
final Path foo2_f2 = new Path(foo2, "f2");
|
||||
final Path foo_d1 = new Path(foo, "d1");
|
||||
final Path foo_d1_f3 = new Path(foo_d1, "f3");
|
||||
|
||||
int numCreatedModified = 0;
|
||||
dfs.rename(foo, foo2);
|
||||
DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
numCreatedModified += 2; // create ./foo and ./foo/f1
|
||||
DFSTestUtil.appendFile(dfs, foo_f1, (int) BLOCK_SIZE);
|
||||
dfs.rename(foo_f1, foo2_f2);
|
||||
numCreatedModified -= 1; // mv ./foo/f1
|
||||
numCreatedModified += 2; // "M ./foo" and "+ ./foo/f2"
|
||||
DFSTestUtil.createFile(dfs, foo_d1_f3, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
numCreatedModified += 2; // create ./foo/d1 and ./foo/d1/f3
|
||||
return numCreatedModified;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case where rename a dir, then create a new dir with the same name
|
||||
* and sub dir.
|
||||
*/
|
||||
@Test
|
||||
public void testSync7() throws Exception {
|
||||
initData7(source);
|
||||
initData7(target);
|
||||
enableAndCreateFirstSnapshot();
|
||||
int numCreatedModified = changeData7(source);
|
||||
dfs.createSnapshot(source, "s2");
|
||||
|
||||
testAndVerify(numCreatedModified);
|
||||
}
|
||||
|
||||
private void initData8(Path dir) throws Exception {
|
||||
final Path foo = new Path(dir, "foo");
|
||||
final Path bar = new Path(dir, "bar");
|
||||
final Path d1 = new Path(dir, "d1");
|
||||
final Path foo_f1 = new Path(foo, "f1");
|
||||
final Path bar_f1 = new Path(bar, "f1");
|
||||
final Path d1_f1 = new Path(d1, "f1");
|
||||
|
||||
DFSTestUtil.createFile(dfs, foo_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
DFSTestUtil.createFile(dfs, bar_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
DFSTestUtil.createFile(dfs, d1_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
}
|
||||
|
||||
private int changeData8(Path dir) throws Exception {
|
||||
final Path foo = new Path(dir, "foo");
|
||||
final Path createdDir = new Path(dir, "c");
|
||||
final Path d1 = new Path(dir, "d1");
|
||||
final Path d1_f1 = new Path(d1, "f1");
|
||||
final Path createdDir_f1 = new Path(createdDir, "f1");
|
||||
final Path foo_f3 = new Path(foo, "f3");
|
||||
final Path new_foo = new Path(createdDir, "foo");
|
||||
final Path foo_f4 = new Path(foo, "f4");
|
||||
final Path foo_d1 = new Path(foo, "d1");
|
||||
final Path bar = new Path(dir, "bar");
|
||||
final Path bar1 = new Path(dir, "bar1");
|
||||
|
||||
int numCreatedModified = 0;
|
||||
DFSTestUtil.createFile(dfs, foo_f3, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
numCreatedModified += 1; // create ./c/foo/f3
|
||||
DFSTestUtil.createFile(dfs, createdDir_f1, BLOCK_SIZE, DATA_NUM, 0L);
|
||||
numCreatedModified += 1; // create ./c
|
||||
dfs.rename(createdDir_f1, foo_f4);
|
||||
numCreatedModified += 1; // create ./c/foo/f4
|
||||
dfs.rename(d1_f1, createdDir_f1); // rename ./d1/f1 -> ./c/f1
|
||||
numCreatedModified += 1; // modify ./c/foo/d1
|
||||
dfs.rename(d1, foo_d1);
|
||||
numCreatedModified += 1; // modify ./c/foo
|
||||
dfs.rename(foo, new_foo);
|
||||
dfs.rename(bar, bar1);
|
||||
return numCreatedModified;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a case where create a dir, then mv a existed dir into it.
|
||||
*/
|
||||
@Test
|
||||
public void testSync8() throws Exception {
|
||||
initData8(source);
|
||||
initData8(target);
|
||||
enableAndCreateFirstSnapshot();
|
||||
int numCreatedModified = changeData8(source);
|
||||
dfs.createSnapshot(source, "s2");
|
||||
|
||||
testAndVerify(numCreatedModified);
|
||||
}
|
||||
}
|
||||
|
@ -657,7 +657,7 @@ public void testDiffOption() {
|
||||
false));
|
||||
|
||||
DistCpOptions options = OptionsParser.parse(new String[] { "-update",
|
||||
"-delete", "-diff", "s1", "s2",
|
||||
"-diff", "s1", "s2",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
options.appendToConf(conf);
|
||||
@ -667,7 +667,7 @@ public void testDiffOption() {
|
||||
Assert.assertEquals("s2", options.getToSnapshot());
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-delete", "-diff", "s1", ".", "-update",
|
||||
"-diff", "s1", ".", "-update",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
options.appendToConf(conf);
|
||||
@ -679,7 +679,7 @@ public void testDiffOption() {
|
||||
|
||||
// -diff requires two option values
|
||||
try {
|
||||
OptionsParser.parse(new String[] {"-diff", "s1", "-delete", "-update",
|
||||
OptionsParser.parse(new String[] {"-diff", "s1", "-update",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
fail("-diff should fail with only one snapshot name");
|
||||
@ -688,25 +688,25 @@ public void testDiffOption() {
|
||||
"Must provide both the starting and ending snapshot names", e);
|
||||
}
|
||||
|
||||
// make sure -diff is only valid when -update and -delete is specified
|
||||
// make sure -diff is only valid when -update is specified
|
||||
try {
|
||||
OptionsParser.parse(new String[] { "-diff", "s1", "s2",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
fail("-diff should fail if -update or -delete option is not specified");
|
||||
fail("-diff should fail if -update option is not specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Diff is valid only with update and delete options", e);
|
||||
"Diff is valid only with update options", e);
|
||||
}
|
||||
|
||||
try {
|
||||
OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update",
|
||||
OptionsParser.parse(new String[] { "-diff", "s1", "s2", "-update", "-delete",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
fail("-diff should fail if -update or -delete option is not specified");
|
||||
fail("-diff should fail if -delete option is specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Diff is valid only with update and delete options", e);
|
||||
"Diff is valid only with update options", e);
|
||||
}
|
||||
|
||||
try {
|
||||
@ -714,10 +714,10 @@ public void testDiffOption() {
|
||||
"-delete", "-overwrite",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
fail("-diff should fail if -update or -delete option is not specified");
|
||||
fail("-diff should fail if -update option is not specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Diff is valid only with update and delete options", e);
|
||||
"Diff is valid only with update options", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user