HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732). Contributed by Ayush Saxena.

Signed-off-by: Steve Loughran <stevel@apache.org>
Ayush Saxena 2021-03-24 02:36:26 +05:30 committed by GitHub
parent 569e407f64
commit 03cfc85279
17 changed files with 773 additions and 165 deletions

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
/**
* A bridge from Callable to Supplier; catching exceptions
* raised by the callable and wrapping them as appropriate.
* @param <T> return type.
*/
public final class CommonCallableSupplier<T> implements Supplier {
private static final Logger LOG =
LoggerFactory.getLogger(CommonCallableSupplier.class);
private final Callable<T> call;
/**
* Create.
* @param call call to invoke.
*/
public CommonCallableSupplier(final Callable<T> call) {
this.call = call;
}
@Override
public Object get() {
try {
return call.call();
} catch (RuntimeException e) {
throw e;
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (Exception e) {
throw new UncheckedIOException(new IOException(e));
}
}
/**
* Submit a callable into a completable future.
* RTEs are rethrown.
* Non RTEs are caught and wrapped; IOExceptions to
* {@code UncheckedIOException} instances.
* @param executor executor.
* @param call call to invoke
* @param <T> type
* @return the future to wait for
*/
@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> submit(final Executor executor,
final Callable<T> call) {
return CompletableFuture
.supplyAsync(new CommonCallableSupplier<T>(call), executor);
}
/**
* Wait for a list of futures to complete. If the list is empty,
* return immediately.
* @param futures list of futures.
* @throws IOException if one of the called futures raised an IOE.
* @throws RuntimeException if one of the futures raised one.
*/
public static <T> void waitForCompletion(
final List<CompletableFuture<T>> futures) throws IOException {
if (futures.isEmpty()) {
return;
}
// await completion
waitForCompletion(
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
}
/**
* Wait for a single future to complete, extracting IOEs afterwards.
* @param future future to wait for.
* @throws IOException if one of the called futures raised an IOE.
* @throws RuntimeException if one of the futures raised one.
*/
public static <T> void waitForCompletion(final CompletableFuture<T> future)
throws IOException {
try (DurationInfo ignore = new DurationInfo(LOG, false,
"Waiting for task completion")) {
future.join();
} catch (CancellationException e) {
throw new IOException(e);
} catch (CompletionException e) {
raiseInnerCause(e);
}
}
/**
* Wait for a single future to complete, ignoring exceptions raised.
* @param future future to wait for.
*/
public static <T> void waitForCompletionIgnoringExceptions(
@Nullable final CompletableFuture<T> future) {
if (future != null) {
try (DurationInfo ignore = new DurationInfo(LOG, false,
"Waiting for task completion")) {
future.join();
} catch (Exception e) {
LOG.debug("Ignoring exception raised in task completion: ");
}
}
}
/**
* Block awaiting completion for any non-null future passed in;
* no-op if a null argument was supplied.
* @param future future
* @throws IOException if one of the called futures raised an IOE.
* @throws RuntimeException if one of the futures raised one.
*/
public static void maybeAwaitCompletion(
@Nullable final CompletableFuture<Void> future) throws IOException {
if (future != null) {
waitForCompletion(future);
}
}
}
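A hedged usage sketch of the helpers above, outside the commit itself; the class name, executor size, filesystem and paths are all illustrative:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.CommonCallableSupplier;

public final class ParallelMkdirs {
  private ParallelMkdirs() {
  }

  public static void mkdirs(FileSystem fs, List<Path> dirs) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<CompletableFuture<Path>> futures = new ArrayList<>(dirs.size());
      for (Path dir : dirs) {
        // submit() wraps checked exceptions; any IOE resurfaces in waitForCompletion().
        futures.add(CommonCallableSupplier.submit(pool, () -> {
          fs.mkdirs(dir);
          return dir;
        }));
      }
      CommonCallableSupplier.waitForCompletion(futures);
    } finally {
      pool.shutdown();
    }
  }
}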

View File

@ -30,13 +30,17 @@
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ -46,8 +50,11 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Appender;
@ -61,15 +68,28 @@
import org.junit.Assume;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;
/**
* Provides some very generic helpers which might be used across the tests.
*/
public abstract class GenericTestUtils {
public static final int EXECUTOR_THREAD_COUNT = 64;
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(GenericTestUtils.class);
public static final String PREFIX = "file-";
private static final AtomicInteger sequence = new AtomicInteger();
/**
@ -896,5 +916,132 @@ public static int getTestsThreadCount() {
}
return threadCount;
}
/**
* Write the text to a file asynchronously. Logs the operation duration.
* @param fs filesystem
* @param path path
* @param text text to write.
* @return future to the path created.
*/
private static CompletableFuture<Path> put(FileSystem fs,
Path path, String text) {
return submit(EXECUTOR, () -> {
try (DurationInfo ignore =
new DurationInfo(LOG, false, "Creating %s", path)) {
createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
return path;
}
});
}
/**
* Build a set of files in a directory tree.
* @param fs filesystem
* @param destDir destination
* @param depth file depth
* @param fileCount number of files to create.
* @param dirCount number of dirs to create at each level
* @return the list of files created.
*/
public static List<Path> createFiles(final FileSystem fs,
final Path destDir,
final int depth,
final int fileCount,
final int dirCount) throws IOException {
return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
new ArrayList<Path>(fileCount),
new ArrayList<Path>(dirCount));
}
/**
* Build a set of files in a directory tree.
* @param fs filesystem
* @param destDir destination
* @param depth file depth
* @param fileCount number of files to create.
* @param dirCount number of dirs to create at each level
* @param paths [out] list of file paths created
* @param dirs [out] list of directory paths created.
* @return the list of files created.
*/
public static List<Path> createDirsAndFiles(final FileSystem fs,
final Path destDir,
final int depth,
final int fileCount,
final int dirCount,
final List<Path> paths,
final List<Path> dirs) throws IOException {
buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+ dirs.size());
// create directories. With dir marker retention, that adds more entries
// to cause deletion issues
try (DurationInfo ignore =
new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
for (Path path : dirs) {
futures.add(submit(EXECUTOR, () -> {
fs.mkdirs(path);
return path;
}));
}
waitForCompletion(futures);
}
try (DurationInfo ignore =
new DurationInfo(LOG, "Creating %d files", paths.size())) {
for (Path path : paths) {
futures.add(put(fs, path, path.getName()));
}
waitForCompletion(futures);
return paths;
}
}
/**
* Recursive method to build up lists of files and directories.
* @param filePaths list of file paths to add entries to.
* @param dirPaths list of directory paths to add entries to.
* @param destDir destination directory.
* @param depth depth of directories
* @param fileCount number of files.
* @param dirCount number of directories.
*/
public static void buildPaths(final List<Path> filePaths,
final List<Path> dirPaths, final Path destDir, final int depth,
final int fileCount, final int dirCount) {
if (depth <= 0) {
return;
}
// create the file paths
for (int i = 0; i < fileCount; i++) {
String name = filenameOfIndex(i);
Path p = new Path(destDir, name);
filePaths.add(p);
}
for (int i = 0; i < dirCount; i++) {
String name = String.format("dir-%03d", i);
Path p = new Path(destDir, name);
dirPaths.add(p);
buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
}
}
/**
* Given an index, return a string to use as the filename.
* @param i index
* @return name
*/
public static String filenameOfIndex(final int i) {
return String.format("%s%03d", PREFIX, i);
}
/**
* For submitting work.
*/
private static final BlockingThreadPoolExecutorService EXECUTOR =
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations");
}
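As a rough usage note: with depth d, fileCount f and dirCount c, buildPaths adds f files and c directories at every level, so createDirsAndFiles lays down f * (1 + c + ... + c^(d-1)) files in total. A hedged sketch, with an illustrative local path that is not part of the commit:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;

public final class TreeBuilderExample {
  public static void main(String[] args) throws Exception {
    // depth 2, 3 files and 2 directories per level
    // => 3 * (1 + 2) = 9 files and 2 * (1 + 2) = 6 directories.
    FileSystem local = FileSystem.getLocal(new Configuration());
    Path root = local.makeQualified(new Path("/tmp/generic-test-tree"));
    List<Path> files = GenericTestUtils.createFiles(local, root, 2, 3, 2);
    System.out.println("files created: " + files.size());   // expected: 9
  }
}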

View File

@ -79,6 +79,7 @@
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
import static org.apache.hadoop.test.LambdaTestUtils.eval;
/**
@ -162,8 +163,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
public static final int DEPTH = 2;
public static final int DEPTH_SCALED = 2;
public static final String PREFIX = "file-";
/**
* A role FS; if non-null it is closed in teardown.
*/
@ -910,49 +909,6 @@ public static List<Path> createDirsAndFiles(final FileSystem fs,
}
}
/**
* Recursive method to build up lists of files and directories.
* @param filePaths list of file paths to add entries to.
* @param dirPaths list of directory paths to add entries to.
* @param destDir destination directory.
* @param depth depth of directories
* @param fileCount number of files.
* @param dirCount number of directories.
*/
private static void buildPaths(
final List<Path> filePaths,
final List<Path> dirPaths,
final Path destDir,
final int depth,
final int fileCount,
final int dirCount) {
if (depth<=0) {
return;
}
// create the file paths
for (int i = 0; i < fileCount; i++) {
String name = filenameOfIndex(i);
Path p = new Path(destDir, name);
filePaths.add(p);
}
for (int i = 0; i < dirCount; i++) {
String name = String.format("dir-%03d", i);
Path p = new Path(destDir, name);
dirPaths.add(p);
buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
}
}
/**
* Given an index, return a string to use as the filename.
* @param i index
* @return name
*/
public static String filenameOfIndex(final int i) {
return String.format("%s%03d", PREFIX, i);
}
/**
* Verifies that s3:DeleteObjectVersion is not required for rename.
* <p></p>

View File

@ -39,7 +39,7 @@
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles;
import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.filenameOfIndex;
import static org.apache.hadoop.test.GenericTestUtils.filenameOfIndex;
/**
* Test some scalable operations related to file renaming and deletion.

View File

@ -139,6 +139,7 @@ private DistCpConstants() {
public static final String CONF_LABEL_BLOCKS_PER_CHUNK =
"distcp.blocks.per.chunk";
public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";
/**
* Constants for DistCp return code to shell / consumer of ToolRunner's run
*/

View File

@ -171,6 +171,10 @@ public int getBlocksPerChunk() {
return options.getBlocksPerChunk();
}
public boolean shouldUseIterator() {
return options.shouldUseIterator();
}
public final boolean splitLargeFile() {
return options.getBlocksPerChunk() > 0;
}

View File

@ -239,7 +239,12 @@ public enum DistCpOptionSwitch {
*/
DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
new Option("direct", false, "Write files directly to the"
+ " target location, avoiding temporary file rename."));
+ " target location, avoiding temporary file rename.")),
USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR,
new Option("useiterator", false,
"Use single threaded list status iterator to build "
+ "the listing to save the memory utilisation at the client"));
public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
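OptionsParser (changed further down) maps this switch straight onto the options builder; a hedged fragment showing the intended effect, with illustrative source and target URIs:

// Fragment only; assumes org.apache.hadoop.tools.DistCpOptions and OptionsParser imports.
DistCpOptions options = OptionsParser.parse(new String[] {
    "-useiterator", "hdfs://nn1:8020/src", "hdfs://nn2:8020/dest"});
assert options.shouldUseIterator();   // the flag is later appended to the job configuration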

View File

@ -158,6 +158,8 @@ public final class DistCpOptions {
/** Whether data should be written directly to the target paths. */
private final boolean directWrite;
private final boolean useIterator;
/**
* File attributes for preserve.
*
@ -222,6 +224,8 @@ private DistCpOptions(Builder builder) {
this.trackPath = builder.trackPath;
this.directWrite = builder.directWrite;
this.useIterator = builder.useIterator;
}
public Path getSourceFileListing() {
@ -353,6 +357,10 @@ public boolean shouldDirectWrite() {
return directWrite;
}
public boolean shouldUseIterator() {
return useIterator;
}
/**
* Add options to configuration. These will be used in the Mapper/committer
*
@ -403,6 +411,9 @@ public void appendToConf(Configuration conf) {
}
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE,
String.valueOf(directWrite));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_ITERATOR,
String.valueOf(useIterator));
}
/**
@ -440,6 +451,7 @@ public String toString() {
", copyBufferSize=" + copyBufferSize +
", verboseLog=" + verboseLog +
", directWrite=" + directWrite +
", useiterator=" + useIterator +
'}';
}
@ -491,6 +503,8 @@ public static class Builder {
private boolean directWrite = false;
private boolean useIterator = false;
public Builder(List<Path> sourcePaths, Path targetPath) {
Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(),
"Source paths should not be null or empty!");
@ -748,6 +762,11 @@ public Builder withDirectWrite(boolean newDirectWrite) {
this.directWrite = newDirectWrite;
return this;
}
public Builder withUseIterator(boolean useItr) {
this.useIterator = useItr;
return this;
}
}
}
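For programmatic users of the builder, a hedged sketch (paths illustrative, conf assumed to be a Configuration already in scope):

// Fragment only; assumes java.util.Collections and the Hadoop Path/DistCpOptions imports.
DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("hdfs://nn1:8020/src")),
        new Path("hdfs://nn2:8020/dest"))
    .withUseIterator(true)
    .build();
options.appendToConf(conf);   // writes distcp.use.iterator=true among the other switches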

View File

@ -115,7 +115,9 @@ public static DistCpOptions parse(String[] args)
.withVerboseLog(
command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch()))
.withDirectWrite(
command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()));
command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch()))
.withUseIterator(
command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch()));
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
String[] snapshots = getVals(command,

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.SequenceFile;
@ -36,6 +38,8 @@
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
@ -49,6 +53,7 @@
import java.util.List;
import java.util.Random;
import java.util.LinkedList;
import java.util.Stack;
import static org.apache.hadoop.tools.DistCpConstants
.HDFS_RESERVED_RAW_DIRECTORY_NAME;
@ -94,11 +99,9 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials
randomizeFileListing = getConf().getBoolean(
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
DEFAULT_RANDOMIZE_FILE_LISTING);
if (LOG.isDebugEnabled()) {
LOG.debug("numListstatusThreads=" + numListstatusThreads
+ ", fileStatusLimit=" + fileStatusLimit
+ ", randomizeFileListing=" + randomizeFileListing);
}
LOG.debug(
"numListstatusThreads={}, fileStatusLimit={}, randomizeFileListing={}",
numListstatusThreads, fileStatusLimit, randomizeFileListing);
copyFilter = CopyFilter.getCopyFilter(getConf());
copyFilter.initialize();
}
@ -286,10 +289,8 @@ protected void doBuildListingWithSnapshotDiff(
FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget());
if (sourceStatus.isDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding source dir for traverse: " +
LOG.debug("Adding source dir for traverse: {}",
sourceStatus.getPath());
}
HashSet<String> excludeList =
distCpSync.getTraverseExcludeList(diff.getSource(),
@ -298,8 +299,9 @@ protected void doBuildListingWithSnapshotDiff(
ArrayList<FileStatus> sourceDirs = new ArrayList<>();
sourceDirs.add(sourceStatus);
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourceRoot, context, excludeList, fileStatuses);
new TraverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourceRoot, context, excludeList, fileStatuses)
.traverseDirectory();
}
}
}
@ -366,9 +368,8 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter,
if (explore) {
ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>();
for (FileStatus sourceStatus: sourceFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
}
LOG.debug("Recording source-path: {} for copy.",
sourceStatus.getPath());
LinkedList<CopyListingFileStatus> sourceCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
preserveAcls && sourceStatus.isDirectory(),
@ -384,14 +385,13 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter,
}
}
if (sourceStatus.isDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
}
LOG.debug("Adding source dir for traverse: {}",
sourceStatus.getPath());
sourceDirs.add(sourceStatus);
}
}
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourcePathRoot, context, null, statusList);
new TraverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourcePathRoot, context, null, statusList).traverseDirectory();
}
}
if (randomizeFileListing) {
@ -429,16 +429,12 @@ private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
*/
Collections.shuffle(fileStatusInfoList, rnd);
for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
}
LOG.debug("Adding {}", fileStatusInfo.fileStatus.getPath());
writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
fileStatusInfo.sourceRootPath);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Number of paths written to fileListing="
+ fileStatusInfoList.size());
}
LOG.debug("Number of paths written to fileListing={}",
fileStatusInfoList.size());
fileStatusInfoList.clear();
}
@ -590,7 +586,7 @@ public WorkReport<FileStatus[]> processItem(
result = new WorkReport<FileStatus[]>(getFileStatus(parent.getPath()),
retry, true);
} catch (FileNotFoundException fnf) {
LOG.error("FileNotFoundException exception in listStatus: " +
LOG.error("FileNotFoundException exception in listStatus: {}",
fnf.getMessage());
result = new WorkReport<FileStatus[]>(new FileStatus[0], retry, true,
fnf);
@ -605,8 +601,7 @@ public WorkReport<FileStatus[]> processItem(
}
private void printStats() {
LOG.info("Paths (files+dirs) cnt = " + totalPaths +
"; dirCnt = " + totalDirs);
LOG.info("Paths (files+dirs) cnt = {}; dirCnt = ", totalPaths, totalDirs);
}
private void maybePrintStats() {
@ -615,79 +610,6 @@ private void maybePrintStats() {
}
}
private void traverseDirectory(SequenceFile.Writer fileListWriter,
FileSystem sourceFS,
ArrayList<FileStatus> sourceDirs,
Path sourcePathRoot,
DistCpContext context,
HashSet<String> excludeList,
List<FileStatusInfo> fileStatuses)
throws IOException {
final boolean preserveAcls = context.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXattrs = context.shouldPreserveRawXattrs();
assert numListstatusThreads > 0;
if (LOG.isDebugEnabled()) {
LOG.debug("Starting thread pool of " + numListstatusThreads +
" listStatus workers.");
}
ProducerConsumer<FileStatus, FileStatus[]> workers =
new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
for (int i = 0; i < numListstatusThreads; i++) {
workers.addWorker(
new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
excludeList));
}
for (FileStatus status : sourceDirs) {
workers.put(new WorkRequest<FileStatus>(status, 0));
}
while (workers.hasWork()) {
try {
WorkReport<FileStatus[]> workResult = workers.take();
int retry = workResult.getRetry();
for (FileStatus child: workResult.getItem()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
}
if (workResult.getSuccess()) {
LinkedList<CopyListingFileStatus> childCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, child,
preserveAcls && child.isDirectory(),
preserveXAttrs && child.isDirectory(),
preserveRawXattrs && child.isDirectory(),
context.getBlocksPerChunk());
for (CopyListingFileStatus fs : childCopyListingStatus) {
if (randomizeFileListing) {
addToFileListing(fileStatuses,
new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
} else {
writeToFileListing(fileListWriter, fs, sourcePathRoot);
}
}
}
if (retry < maxRetries) {
if (child.isDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Traversing into source dir: " + child.getPath());
}
workers.put(new WorkRequest<FileStatus>(child, retry));
}
} else {
LOG.error("Giving up on " + child.getPath() +
" after " + retry + " retries.");
}
}
} catch (InterruptedException ie) {
LOG.error("Could not get item from childQueue. Retrying...");
}
}
workers.shutdown();
}
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
DistCpContext context) throws IOException {
@ -697,9 +619,7 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
if (fs.getPath().equals(sourcePathRoot) &&
fs.isDirectory() && syncOrOverwrite) {
// Skip the root-paths when syncOrOverwrite
if (LOG.isDebugEnabled()) {
LOG.debug("Skip " + fs.getPath());
}
LOG.debug("Skip {}", fs.getPath());
return;
}
writeToFileListing(fileListWriter, fs, sourcePathRoot);
@ -709,10 +629,9 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
private void writeToFileListing(SequenceFile.Writer fileListWriter,
CopyListingFileStatus fileStatus,
Path sourcePathRoot) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
}
LOG.debug("REL PATH: {}, FULL PATH: {}",
DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()),
fileStatus.getPath());
if (!shouldCopy(fileStatus.getPath())) {
return;
@ -730,4 +649,159 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
totalPaths++;
maybePrintStats();
}
/**
* A utility class to traverse a directory.
*/
private final class TraverseDirectory {
private SequenceFile.Writer fileListWriter;
private FileSystem sourceFS;
private ArrayList<FileStatus> sourceDirs;
private Path sourcePathRoot;
private DistCpContext context;
private HashSet<String> excludeList;
private List<FileStatusInfo> fileStatuses;
private final boolean preserveAcls;
private final boolean preserveXAttrs;
private final boolean preserveRawXattrs;
private TraverseDirectory(SequenceFile.Writer fileListWriter,
FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
List<FileStatusInfo> fileStatuses) {
this.fileListWriter = fileListWriter;
this.sourceFS = sourceFS;
this.sourceDirs = sourceDirs;
this.sourcePathRoot = sourcePathRoot;
this.context = context;
this.excludeList = excludeList;
this.fileStatuses = fileStatuses;
this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
this.preserveRawXattrs = context.shouldPreserveRawXattrs();
}
public void traverseDirectory() throws IOException {
if (context.shouldUseIterator()) {
try (DurationInfo ignored = new DurationInfo(LOG,
"Building listing using iterator mode for %s", sourcePathRoot)) {
traverseDirectoryLegacy();
}
} else {
try (DurationInfo ignored = new DurationInfo(LOG,
"Building listing using multi threaded approach for %s",
sourcePathRoot)) {
traverseDirectoryMultiThreaded();
}
}
}
public void traverseDirectoryMultiThreaded() throws IOException {
assert numListstatusThreads > 0;
LOG.debug("Starting thread pool of {} listStatus workers.",
numListstatusThreads);
ProducerConsumer<FileStatus, FileStatus[]> workers =
new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
try {
for (int i = 0; i < numListstatusThreads; i++) {
workers.addWorker(
new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
excludeList));
}
for (FileStatus status : sourceDirs) {
workers.put(new WorkRequest<FileStatus>(status, 0));
}
while (workers.hasWork()) {
try {
WorkReport<FileStatus[]> workResult = workers.take();
int retry = workResult.getRetry();
for (FileStatus child : workResult.getItem()) {
LOG.debug("Recording source-path: {} for copy.", child.getPath());
boolean isChildDirectory = child.isDirectory();
if (workResult.getSuccess()) {
LinkedList<CopyListingFileStatus> childCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, child,
preserveAcls && isChildDirectory,
preserveXAttrs && isChildDirectory,
preserveRawXattrs && isChildDirectory,
context.getBlocksPerChunk());
for (CopyListingFileStatus fs : childCopyListingStatus) {
if (randomizeFileListing) {
addToFileListing(fileStatuses,
new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
} else {
writeToFileListing(fileListWriter, fs, sourcePathRoot);
}
}
}
if (retry < maxRetries) {
if (isChildDirectory) {
LOG.debug("Traversing into source dir: {}", child.getPath());
workers.put(new WorkRequest<FileStatus>(child, retry));
}
} else {
LOG.error("Giving up on {} after {} retries.", child.getPath(),
retry);
}
}
} catch (InterruptedException ie) {
LOG.error("Could not get item from childQueue. Retrying...");
}
}
} finally {
workers.shutdown();
}
}
private void traverseDirectoryLegacy() throws IOException {
Stack<FileStatus> pathStack = new Stack<FileStatus>();
for (FileStatus fs : sourceDirs) {
if (excludeList == null || !excludeList
.contains(fs.getPath().toUri().getPath())) {
pathStack.add(fs);
}
}
while (!pathStack.isEmpty()) {
prepareListing(pathStack.pop().getPath());
}
}
private void prepareListing(Path path) throws IOException {
LOG.debug("Recording source-path: {} for copy.", path);
RemoteIterator<FileStatus> listStatus = RemoteIterators
.filteringRemoteIterator(sourceFS.listStatusIterator(path),
i -> excludeList == null || !excludeList
.contains(i.getPath().toUri().getPath()));
while (listStatus.hasNext()) {
FileStatus child = listStatus.next();
LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
.toCopyListingFileStatus(sourceFS, child,
preserveAcls && child.isDirectory(),
preserveXAttrs && child.isDirectory(),
preserveRawXattrs && child.isDirectory(),
context.getBlocksPerChunk());
for (CopyListingFileStatus fs : childCopyListingStatus) {
if (randomizeFileListing) {
addToFileListing(fileStatuses,
new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
} else {
writeToFileListing(fileListWriter, fs, sourcePathRoot);
}
}
if (child.isDirectory()) {
LOG.debug("Traversing into source dir: {}", child.getPath());
prepareListing(child.getPath());
}
}
IOStatisticsLogging
.logIOStatisticsAtDebug(LOG, "RemoteIterator Statistics: {}",
listStatus);
}
}
}
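Roughly, the memory saving comes from what the two traversal methods hold on to: the multi-threaded path materialises whole listStatus() arrays and queues them through the ProducerConsumer workers, while iterator mode walks the tree single threaded over FileSystem.listStatusIterator, keeping only the current page of results. A stripped-down, hedged sketch of that pattern (walk and record are illustrative names, not the committed code):

// Simplified sketch of the iterator-mode traversal.
void walk(FileSystem fs, Path dir) throws IOException {
  RemoteIterator<FileStatus> children = fs.listStatusIterator(dir);  // lazily paged listing
  while (children.hasNext()) {
    FileStatus child = children.next();
    record(child);                   // hypothetical: append the entry to the sequence file
    if (child.isDirectory()) {
      walk(fs, child.getPath());     // depth first; one directory page in memory at a time
    }
  }
}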

View File

@ -362,6 +362,7 @@ Command Line Options
| `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | |
| `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
| `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
| `-useiterator` | Uses a single-threaded `listStatusIterator` to build the listing | Useful for reducing memory usage on the client side. Using this option ignores the `numListstatusThreads` option. |
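For example (cluster addresses and paths are illustrative):

    hadoop distcp -useiterator hdfs://nn1:8020/source hdfs://nn2:8020/target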
Architecture of DistCp
----------------------

View File

@ -289,7 +289,7 @@ public void testToString() {
"atomicWorkPath=null, logPath=null, sourceFileListing=abc, " +
"sourcePaths=null, targetPath=xyz, filtersFile='null', " +
"blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " +
"directWrite=false}";
"directWrite=false, useiterator=false}";
String optionString = option.toString();
Assert.assertEquals(val, optionString);
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),

View File

@ -48,9 +48,7 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* A JUnit test for copying files recursively.
@ -60,9 +58,6 @@ public class TestDistCpSystem {
private static final Logger LOG =
LoggerFactory.getLogger(TestDistCpSystem.class);
@Rule
public Timeout globalTimeout = new Timeout(30000);
private static final String SRCDAT = "srcdat";
private static final String DSTDAT = "dstdat";
private static final long BLOCK_SIZE = 1024;

View File

@ -29,9 +29,12 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.tools.ECAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -69,6 +72,7 @@ public class TestDistCpWithRawXAttrs {
public static void init() throws Exception {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true);
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true)
.build();
cluster.waitActive();
@ -217,4 +221,23 @@ public void testPreserveEC() throws Exception {
assertTrue("/dest/dir1/subdir1 is not erasure coded!",
destSubDir1Status.isErasureCoded());
}
@Test
public void testUseIterator() throws Exception {
Path source = new Path("/src");
Path dest = new Path("/dest");
fs.delete(source, true);
fs.delete(dest, true);
// Create a source dir
fs.mkdirs(source);
GenericTestUtils.createFiles(fs, source, 3, 10, 10);
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest.toString(), "-useiterator", conf);
Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
.describedAs("files").hasSize(1110);
}
}
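For reference, createFiles(fs, source, 3, 10, 10) builds 10 files and 10 directories at each of three levels, i.e. 10 + 100 + 1000 = 1110 files, which is the count the recursive listFiles assertion expects.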

View File

@ -44,7 +44,9 @@
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -59,6 +61,7 @@
* under test. The tests in the suite cover both copying from local to remote
* (e.g. a backup use case) and copying from remote to local (e.g. a restore use
* case).
* The HDFS contract test needs to be run explicitly.
*/
public abstract class AbstractContractDistCpTest
extends AbstractFSContractTestBase {
@ -613,6 +616,42 @@ public void testNonDirectWrite() throws Exception {
directWrite(localFS, localDir, remoteFS, remoteDir, false);
}
@Test
public void testDistCpWithIterator() throws Exception {
describe("Build listing in distCp using the iterator option.");
Path source = new Path(remoteDir, "src");
Path dest = new Path(localDir, "dest");
dest = localFS.makeQualified(dest);
mkdirs(remoteFS, source);
verifyPathExists(remoteFS, "", source);
GenericTestUtils
.createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
dest.toString(), "-useiterator", conf);
Assertions
.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
.describedAs("files").hasSize(getTotalFiles());
}
public int getDepth() {
return 3;
}
public int getWidth() {
return 10;
}
private int getTotalFiles() {
int totalFiles = 0;
for (int i = 1; i <= getDepth(); i++) {
totalFiles += Math.pow(getWidth(), i);
}
return totalFiles;
}
/**
* Executes a test with support for using direct write option.
*

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.hdfs.HDFSContract;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
/**
* Verifies that the HDFS passes all the tests in
* {@link AbstractContractDistCpTest}.
* As such, it acts as an in-module validation of this contract test itself.
*/
public class OptionalTestHDFSContractDistCp extends AbstractContractDistCpTest {
@BeforeClass
public static void createCluster() throws IOException {
HDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
HDFSContract.destroyCluster();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new HDFSContract(conf);
}
}
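Since the class name does not match the default surefire test patterns, it presumably has to be selected explicitly, e.g. something along the lines of the following from the hadoop-distcp module (the exact invocation is an assumption, not part of the commit):

    mvn test -Dtest=OptionalTestHDFSContractDistCp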

View File

@ -0,0 +1,139 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<!--
Here are most of the HDFS contract options.
-->
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>true</value>
</property>
<property>
<name>fs.file.contract.test.random-seek-count</name>
<value>500</value>
</property>
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-false-if-dest-exists</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-false-if-source-missing</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-settimes</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-getfilestatus</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-file-reference</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-content-check</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unbuffer</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hflush</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-hsync</name>
<value>true</value>
</property>
<property>
<name>fs.contract.metadata_updated_on_hsync</name>
<value>false</value>
</property>
<!-- Disable min block size since most tests use tiny blocks -->
<property>
<name>dfs.namenode.fs-limits.min-block-size</name>
<value>0</value>
</property>
</configuration>