HDFS-17120. Support snapshot diff based copylisting for flat paths. (#5885)
This commit is contained in:
parent
b1fc00d4b2
commit
b971222372
@ -122,6 +122,10 @@ private DistCpConstants() {
|
|||||||
/* DistCp CopyListing class override param */
|
/* DistCp CopyListing class override param */
|
||||||
public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
|
public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
|
||||||
|
|
||||||
|
/* Traverse directory from diff recursively and add paths to the copyList if true */
|
||||||
|
public static final String CONF_LABEL_DIFF_COPY_LISTING_TRAVERSE_DIRECTORY =
|
||||||
|
"distcp.diff.copy.listing.traverse.directory";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* DistCp Filter class override param.
|
* DistCp Filter class override param.
|
||||||
*/
|
*/
|
||||||
|
@ -61,7 +61,7 @@ protected void validatePaths(DistCpContext context)
|
|||||||
* @param pathToListingFile The location at which the copy-listing file
|
* @param pathToListingFile The location at which the copy-listing file
|
||||||
* is to be created.
|
* is to be created.
|
||||||
* @param context The distcp context with associated input options.
|
* @param context The distcp context with associated input options.
|
||||||
* @throws IOException
|
* @throws IOException if unable to construct the fileList
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void doBuildListing(Path pathToListingFile, DistCpContext context)
|
public void doBuildListing(Path pathToListingFile, DistCpContext context)
|
||||||
|
@ -50,6 +50,7 @@ public class RegexCopyFilter extends CopyFilter {
|
|||||||
/**
|
/**
|
||||||
* Constructor, sets up a File object to read filter patterns from and
|
* Constructor, sets up a File object to read filter patterns from and
|
||||||
* the List to store the patterns.
|
* the List to store the patterns.
|
||||||
|
* @param filtersFilename name of the filtersFile
|
||||||
*/
|
*/
|
||||||
protected RegexCopyFilter(String filtersFilename) {
|
protected RegexCopyFilter(String filtersFilename) {
|
||||||
filtersFile = new File(filtersFilename);
|
filtersFile = new File(filtersFilename);
|
||||||
|
@ -234,7 +234,11 @@ private Path getPathWithSchemeAndAuthority(Path path) throws IOException {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Write a single file/directory to the sequence file.
|
* Write a single file/directory to the sequence file.
|
||||||
* @throws IOException
|
* @param fileListWriter the list for holding processed results
|
||||||
|
* @param sourceRoot the source dir path for copyListing
|
||||||
|
* @param path add the given path to the file list.
|
||||||
|
* @param context The DistCp context with associated input options
|
||||||
|
* @throws IOException if it fails to add it to the fileList
|
||||||
*/
|
*/
|
||||||
private void addToFileListing(SequenceFile.Writer fileListWriter,
|
private void addToFileListing(SequenceFile.Writer fileListWriter,
|
||||||
Path sourceRoot, Path path, DistCpContext context) throws IOException {
|
Path sourceRoot, Path path, DistCpContext context) throws IOException {
|
||||||
@ -265,7 +269,7 @@ private void addToFileListing(SequenceFile.Writer fileListWriter,
|
|||||||
* into the list.
|
* into the list.
|
||||||
* @param fileListWriter the list for holding processed results
|
* @param fileListWriter the list for holding processed results
|
||||||
* @param context The DistCp context with associated input options
|
* @param context The DistCp context with associated input options
|
||||||
* @throws IOException
|
* @throws IOException if unable to construct the fileList
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected void doBuildListingWithSnapshotDiff(
|
protected void doBuildListingWithSnapshotDiff(
|
||||||
@ -274,6 +278,8 @@ protected void doBuildListingWithSnapshotDiff(
|
|||||||
ArrayList<DiffInfo> diffList = distCpSync.prepareDiffListForCopyListing();
|
ArrayList<DiffInfo> diffList = distCpSync.prepareDiffListForCopyListing();
|
||||||
Path sourceRoot = context.getSourcePaths().get(0);
|
Path sourceRoot = context.getSourcePaths().get(0);
|
||||||
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
||||||
|
boolean traverseDirectory = getConf().getBoolean(
|
||||||
|
DistCpConstants.CONF_LABEL_DIFF_COPY_LISTING_TRAVERSE_DIRECTORY, true);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
List<FileStatusInfo> fileStatuses = Lists.newArrayList();
|
List<FileStatusInfo> fileStatuses = Lists.newArrayList();
|
||||||
@ -285,25 +291,8 @@ protected void doBuildListingWithSnapshotDiff(
|
|||||||
addToFileListing(fileListWriter,
|
addToFileListing(fileListWriter,
|
||||||
sourceRoot, diff.getTarget(), context);
|
sourceRoot, diff.getTarget(), context);
|
||||||
} else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) {
|
} else if (diff.getType() == SnapshotDiffReport.DiffType.CREATE) {
|
||||||
addToFileListing(fileListWriter,
|
addCreateDiffsToFileListing(fileListWriter, context, sourceRoot,
|
||||||
sourceRoot, diff.getTarget(), context);
|
sourceFS, fileStatuses, diff, traverseDirectory);
|
||||||
|
|
||||||
FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
|
|
||||||
if (sourceStatus.isDirectory()) {
|
|
||||||
LOG.debug("Adding source dir for traverse: {}",
|
|
||||||
sourceStatus.getPath());
|
|
||||||
|
|
||||||
HashSet<String> excludeList =
|
|
||||||
distCpSync.getTraverseExcludeList(diff.getSource(),
|
|
||||||
context.getSourcePaths().get(0));
|
|
||||||
|
|
||||||
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
|
|
||||||
sourceDirs.add(sourceStatus);
|
|
||||||
|
|
||||||
new TraverseDirectory(fileListWriter, sourceFS, sourceDirs,
|
|
||||||
sourceRoot, context, excludeList, fileStatuses)
|
|
||||||
.traverseDirectory();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (randomizeFileListing) {
|
if (randomizeFileListing) {
|
||||||
@ -316,6 +305,43 @@ protected void doBuildListingWithSnapshotDiff(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle create Diffs and add to the copyList.
|
||||||
|
* If the path is a directory, iterate it recursively and add the paths
|
||||||
|
* to the result copyList.
|
||||||
|
*
|
||||||
|
* @param fileListWriter the list for holding processed results
|
||||||
|
* @param context The DistCp context with associated input options
|
||||||
|
* @param sourceRoot The rootDir of the source snapshot
|
||||||
|
* @param sourceFS the source Filesystem
|
||||||
|
* @param fileStatuses store the result fileStatuses to add to the copyList
|
||||||
|
* @param diff the SnapshotDiff report
|
||||||
|
* @param traverseDirectory traverse directory recursively and add paths to the copyList
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void addCreateDiffsToFileListing(SequenceFile.Writer fileListWriter,
|
||||||
|
DistCpContext context, Path sourceRoot, FileSystem sourceFS,
|
||||||
|
List<FileStatusInfo> fileStatuses, DiffInfo diff, boolean traverseDirectory) throws IOException {
|
||||||
|
addToFileListing(fileListWriter, sourceRoot, diff.getTarget(), context);
|
||||||
|
|
||||||
|
if (traverseDirectory) {
|
||||||
|
FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
|
||||||
|
if (sourceStatus.isDirectory()) {
|
||||||
|
LOG.debug("Adding source dir for traverse: {}", sourceStatus.getPath());
|
||||||
|
|
||||||
|
HashSet<String> excludeList =
|
||||||
|
distCpSync.getTraverseExcludeList(diff.getSource(),
|
||||||
|
context.getSourcePaths().get(0));
|
||||||
|
|
||||||
|
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
|
||||||
|
sourceDirs.add(sourceStatus);
|
||||||
|
|
||||||
|
new TraverseDirectory(fileListWriter, sourceFS, sourceDirs, sourceRoot,
|
||||||
|
context, excludeList, fileStatuses).traverseDirectory();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Collect the list of
|
* Collect the list of
|
||||||
* {@literal <sourceRelativePath, sourceFileStatus>}
|
* {@literal <sourceRelativePath, sourceFileStatus>}
|
||||||
@ -332,7 +358,7 @@ protected void doBuildListingWithSnapshotDiff(
|
|||||||
* computed.
|
* computed.
|
||||||
* @param fileListWriter
|
* @param fileListWriter
|
||||||
* @param context The distcp context with associated input options
|
* @param context The distcp context with associated input options
|
||||||
* @throws IOException
|
* @throws IOException if unable to construct the fileList
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected void doBuildListing(SequenceFile.Writer fileListWriter,
|
protected void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||||
|
@ -20,6 +20,8 @@
|
|||||||
|
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -167,6 +169,72 @@ public void testDuplicates() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = DuplicateFileException.class, timeout = 10000)
|
||||||
|
public void testDiffBasedSimpleCopyListing() throws IOException {
|
||||||
|
FileSystem fs = null;
|
||||||
|
Configuration configuration = getConf();
|
||||||
|
DistCpSync distCpSync = Mockito.mock(DistCpSync.class);
|
||||||
|
Path listingFile = new Path("/tmp/list");
|
||||||
|
// Throws DuplicateFileException as it recursively traverses src3 directory
|
||||||
|
// and also adds 3.txt,4.txt twice
|
||||||
|
configuration.setBoolean(
|
||||||
|
DistCpConstants.CONF_LABEL_DIFF_COPY_LISTING_TRAVERSE_DIRECTORY, true);
|
||||||
|
try {
|
||||||
|
fs = FileSystem.get(getConf());
|
||||||
|
buildListingUsingSnapshotDiff(fs, configuration, distCpSync, listingFile);
|
||||||
|
} finally {
|
||||||
|
TestDistCpUtils.delete(fs, "/tmp");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testDiffBasedSimpleCopyListingWithoutTraverseDirectory()
|
||||||
|
throws IOException {
|
||||||
|
FileSystem fs = null;
|
||||||
|
Configuration configuration = getConf();
|
||||||
|
DistCpSync distCpSync = Mockito.mock(DistCpSync.class);
|
||||||
|
Path listingFile = new Path("/tmp/list");
|
||||||
|
// no exception expected in this case
|
||||||
|
configuration.setBoolean(
|
||||||
|
DistCpConstants.CONF_LABEL_DIFF_COPY_LISTING_TRAVERSE_DIRECTORY, false);
|
||||||
|
try {
|
||||||
|
fs = FileSystem.get(getConf());
|
||||||
|
buildListingUsingSnapshotDiff(fs, configuration, distCpSync, listingFile);
|
||||||
|
} finally {
|
||||||
|
TestDistCpUtils.delete(fs, "/tmp");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void buildListingUsingSnapshotDiff(FileSystem fs,
|
||||||
|
Configuration configuration, DistCpSync distCpSync, Path listingFile)
|
||||||
|
throws IOException {
|
||||||
|
List<Path> srcPaths = new ArrayList<>();
|
||||||
|
srcPaths.add(new Path("/tmp/in"));
|
||||||
|
TestDistCpUtils.createFile(fs, "/tmp/in/src1/1.txt");
|
||||||
|
TestDistCpUtils.createFile(fs, "/tmp/in/src2/1.txt");
|
||||||
|
TestDistCpUtils.createFile(fs, "/tmp/in/src3/3.txt");
|
||||||
|
TestDistCpUtils.createFile(fs, "/tmp/in/src3/4.txt");
|
||||||
|
Path target = new Path("/tmp/out");
|
||||||
|
// adding below flags useDiff & sync only to enable context.shouldUseSnapshotDiff()
|
||||||
|
final DistCpOptions options =
|
||||||
|
new DistCpOptions.Builder(srcPaths, target).withUseDiff("snap1",
|
||||||
|
"snap2").withSyncFolder(true).build();
|
||||||
|
// Create a dummy DiffInfo List that contains a directory + paths inside
|
||||||
|
// that directory as part of the diff.
|
||||||
|
ArrayList<DiffInfo> diffs = new ArrayList<>();
|
||||||
|
diffs.add(new DiffInfo(new Path("/tmp/in/src3/"), new Path("/tmp/in/src3/"),
|
||||||
|
SnapshotDiffReport.DiffType.CREATE));
|
||||||
|
diffs.add(new DiffInfo(new Path("/tmp/in/src3/3.txt"),
|
||||||
|
new Path("/tmp/in/src3/3.txt"), SnapshotDiffReport.DiffType.CREATE));
|
||||||
|
diffs.add(new DiffInfo(new Path("/tmp/in/src3/4.txt"),
|
||||||
|
new Path("/tmp/in/src3/4.txt"), SnapshotDiffReport.DiffType.CREATE));
|
||||||
|
Mockito.when(distCpSync.prepareDiffListForCopyListing()).thenReturn(diffs);
|
||||||
|
final DistCpContext context = new DistCpContext(options);
|
||||||
|
CopyListing listing =
|
||||||
|
new SimpleCopyListing(configuration, CREDENTIALS, distCpSync);
|
||||||
|
listing.buildListing(listingFile, context);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDuplicateSourcePaths() throws Exception {
|
public void testDuplicateSourcePaths() throws Exception {
|
||||||
FileSystem fs = FileSystem.get(getConf());
|
FileSystem fs = FileSystem.get(getConf());
|
||||||
|
Loading…
Reference in New Issue
Block a user