HADOOP-15384. distcp numListstatusThreads option doesn't get to -delete scan.
Contributed by Steve Loughran.
This commit is contained in:
parent
9bd5bef297
commit
ca8b80bf59
@ -387,7 +387,10 @@ public void appendToConf(Configuration conf) {
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING,
|
||||
String.valueOf(trackPath));
|
||||
}
|
||||
|
||||
if (numListstatusThreads > 0) {
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS,
|
||||
Integer.toString(numListstatusThreads));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -392,6 +392,9 @@ private void deleteMissing(Configuration conf) throws IOException {
|
||||
Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
|
||||
FileSystem clusterFS = sourceListing.getFileSystem(conf);
|
||||
Path sortedSourceListing = DistCpUtils.sortListing(conf, sourceListing);
|
||||
long sourceListingCompleted = System.currentTimeMillis();
|
||||
LOG.info("Source listing completed in {}",
|
||||
formatDuration(sourceListingCompleted - listingStart));
|
||||
|
||||
// Similarly, create the listing of target-files. Sort alphabetically.
|
||||
Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
|
||||
@ -409,8 +412,8 @@ private void deleteMissing(Configuration conf) throws IOException {
|
||||
// Walk both source and target file listings.
|
||||
// Delete everything from the target that doesn't also exist on the source.
|
||||
long deletionStart = System.currentTimeMillis();
|
||||
LOG.info("Listing completed in {}",
|
||||
formatDuration(deletionStart - listingStart));
|
||||
LOG.info("Destination listing completed in {}",
|
||||
formatDuration(deletionStart - sourceListingCompleted));
|
||||
|
||||
long deletedEntries = 0;
|
||||
long filesDeleted = 0;
|
||||
@ -545,9 +548,15 @@ private Path listTargetFiles(final Configuration conf,
|
||||
// Set up options to be the same from the CopyListing.buildListing's
|
||||
// perspective, so as to collect listings similar to those built for the copy
|
||||
//
|
||||
// thread count is picked up from the job
|
||||
int threads = conf.getInt(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
|
||||
DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
|
||||
LOG.info("Scanning destination directory {} with thread count: {}",
|
||||
targetFinalPath, threads);
|
||||
DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
|
||||
.withOverwrite(overwrite)
|
||||
.withSyncFolder(syncFolder)
|
||||
.withNumListstatusThreads(threads)
|
||||
.build();
|
||||
DistCpContext distCpContext = new DistCpContext(options);
|
||||
distCpContext.setTargetPathExists(targetPathExists);
|
||||
|
@ -572,7 +572,7 @@ private Job runDistCp(final DistCpOptions options) throws Exception {
|
||||
private DistCpOptions buildWithStandardOptions(
|
||||
DistCpOptions.Builder builder) {
|
||||
return builder
|
||||
.withNumListstatusThreads(8)
|
||||
.withNumListstatusThreads(DistCpOptions.MAX_NUM_LISTSTATUS_THREADS)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user