From de804e53b9d20a2df75a4c7252bf83ed52011488 Mon Sep 17 00:00:00 2001 From: Andrew Olson Date: Thu, 7 Feb 2019 10:05:58 +0000 Subject: [PATCH] HADOOP-15281. Distcp to add no-rename copy option. Contributed by Andrew Olson. --- .../contract/s3a/ITestS3AContractDistCp.java | 33 +++++++++ .../apache/hadoop/tools/DistCpConstants.java | 3 +- .../apache/hadoop/tools/DistCpContext.java | 4 ++ .../hadoop/tools/DistCpOptionSwitch.java | 14 +++- .../apache/hadoop/tools/DistCpOptions.java | 19 ++++++ .../apache/hadoop/tools/OptionsParser.java | 4 +- .../hadoop/tools/mapred/CopyMapper.java | 6 +- .../mapred/RetriableFileCopyCommand.java | 52 ++++++++++---- .../src/site/markdown/DistCp.md.vm | 6 +- .../hadoop/tools/TestDistCpOptions.java | 5 +- .../contract/AbstractContractDistCpTest.java | 68 ++++++++++++++++++- 11 files changed, 191 insertions(+), 23 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index b3d511ed6a..740f256b62 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.FileNotFoundException; import java.io.IOException; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -26,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.s3a.FailureInjectionPolicy; import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; @@ -74,4 +76,35 @@ protected Path path(final String filepath) throws IOException { Path path = super.path(filepath); return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING); } + + @Override + public void testDirectWrite() throws Exception { + resetStorageStatistics(); + super.testDirectWrite(); + assertEquals("Expected no renames for a direct write distcp", 0L, + getRenameOperationCount()); + } + + @Override + public void testNonDirectWrite() throws Exception { + resetStorageStatistics(); + try { + super.testNonDirectWrite(); + } catch (FileNotFoundException e) { + // We may get this exception when data is written to a DELAY_LISTING_ME + // directory causing verification of the distcp success to fail if + // S3Guard is not enabled + } + assertEquals("Expected 2 renames for a non-direct write distcp", 2L, + getRenameOperationCount()); + } + + private void resetStorageStatistics() { + getFileSystem().getStorageStatistics().reset(); + } + + private long getRenameOperationCount() { + return getFileSystem().getStorageStatistics() + .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME); + } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 494609144b..e20f20626a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -85,7 +85,8 @@ private DistCpConstants() { "distcp.dynamic.min.records_per_chunk"; public static final String CONF_LABEL_SPLIT_RATIO = "distcp.dynamic.split.ratio"; - + public static final String CONF_LABEL_DIRECT_WRITE = "distcp.direct.write"; + /* Total bytes to be copied. Updated by copylisting. Unfiltered count */ public static final String CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED = "mapred.total.bytes.expected"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java index fc047cadad..1e63d802e8 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java @@ -179,6 +179,10 @@ public int getCopyBufferSize() { return options.getCopyBufferSize(); } + public boolean shouldDirectWrite() { + return options.shouldDirectWrite(); + } + public void setTargetPathExists(boolean targetPathExists) { this.targetPathExists = targetPathExists; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index e57e413de3..49ffc59344 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -223,7 +223,19 @@ public enum DistCpOptionSwitch { */ FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE, new Option("filters", true, "The path to a file containing a list of" - + " strings for paths to be excluded from the copy.")); + + " strings for paths to be excluded from the copy.")), + + /** + * Write directly to the final location, avoiding the creation and rename + * of temporary files. + * This is typically useful in cases where the target filesystem + * implementation does not support atomic rename operations, such as with + * the S3AFileSystem which translates file renames to potentially very + * expensive copy-then-delete operations. + */ + DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE, + new Option("direct", false, "Write files directly to the" + + " target location, avoiding temporary file rename.")); public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index aca5d0e414..4a6552fed6 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -155,6 +155,9 @@ public final class DistCpOptions { private final int copyBufferSize; + /** Whether data should be written directly to the target paths. */ + private final boolean directWrite; + /** * File attributes for preserve. * @@ -216,6 +219,8 @@ private DistCpOptions(Builder builder) { this.copyBufferSize = builder.copyBufferSize; this.verboseLog = builder.verboseLog; this.trackPath = builder.trackPath; + + this.directWrite = builder.directWrite; } public Path getSourceFileListing() { @@ -343,6 +348,10 @@ public Path getTrackPath() { return trackPath; } + public boolean shouldDirectWrite() { + return directWrite; + } + /** * Add options to configuration. These will be used in the Mapper/committer * @@ -391,6 +400,8 @@ public void appendToConf(Configuration conf) { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.NUM_LISTSTATUS_THREADS, Integer.toString(numListstatusThreads)); } + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE, + String.valueOf(directWrite)); } /** @@ -427,6 +438,7 @@ public String toString() { ", blocksPerChunk=" + blocksPerChunk + ", copyBufferSize=" + copyBufferSize + ", verboseLog=" + verboseLog + + ", directWrite=" + directWrite + '}'; } @@ -476,6 +488,8 @@ public static class Builder { private int copyBufferSize = DistCpConstants.COPY_BUFFER_SIZE_DEFAULT; + private boolean directWrite = false; + public Builder(List sourcePaths, Path targetPath) { Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(), "Source paths should not be null or empty!"); @@ -728,6 +742,11 @@ public Builder withVerboseLog(boolean newVerboseLog) { this.verboseLog = newVerboseLog; return this; } + + public Builder withDirectWrite(boolean newDirectWrite) { + this.directWrite = newDirectWrite; + return this; + } } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 83c6ff3e40..3b9d13b3b0 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -113,7 +113,9 @@ public static DistCpOptions parse(String[] args) .withBlocking( !command.hasOption(DistCpOptionSwitch.BLOCKING.getSwitch())) .withVerboseLog( - command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch())); + command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch())) + .withDirectWrite( + command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch())); if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { String[] snapshots = getVals(command, diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 63a61b861f..336779eef2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -84,6 +84,7 @@ static enum FileAction { private boolean overWrite = false; private boolean append = false; private boolean verboseLog = false; + private boolean directWrite = false; private EnumSet preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; @@ -111,6 +112,8 @@ public void setup(Context context) throws IOException, InterruptedException { DistCpOptionSwitch.VERBOSE_LOG.getConfigLabel(), false); preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch. PRESERVE_STATUS.getConfigLabel())); + directWrite = conf.getBoolean( + DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path targetFinalPath = new Path(conf.get( @@ -253,7 +256,8 @@ private void copyFileWithRetry(String description, long bytesCopied; try { bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description, - action).execute(sourceFileStatus, target, context, fileAttributes); + action, directWrite).execute(sourceFileStatus, target, context, + fileAttributes); } catch (Exception e) { context.setStatus("Copy Failure: " + sourceFileStatus.getPath()); throw new IOException("File copy failed: " + sourceFileStatus.getPath() + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 51579bc437..db21f64d72 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -55,6 +55,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { private static Logger LOG = LoggerFactory.getLogger(RetriableFileCopyCommand.class); private boolean skipCrc = false; + private boolean directWrite = false; private FileAction action; /** @@ -79,6 +80,21 @@ public RetriableFileCopyCommand(boolean skipCrc, String description, this.skipCrc = skipCrc; } + /** + * Create a RetriableFileCopyCommand. + * + * @param skipCrc Whether to skip the crc check. + * @param description A verbose description of the copy operation. + * @param action We should overwrite the target file or append new data to it. + * @param directWrite Whether to write directly to the target path, avoiding a + * temporary file rename. + */ + public RetriableFileCopyCommand(boolean skipCrc, String description, + FileAction action, boolean directWrite) { + this(skipCrc, description, action); + this.directWrite = directWrite; + } + /** * Implementation of RetriableCommand::doExecute(). * This is the actual copy-implementation. @@ -102,16 +118,19 @@ protected Object doExecute(Object... arguments) throws Exception { private long doCopy(CopyListingFileStatus source, Path target, Mapper.Context context, EnumSet fileAttributes) throws IOException { + LOG.info("Copying {} to {}", source.getPath(), target); + final boolean toAppend = action == FileAction.APPEND; - Path targetPath = toAppend ? target : getTmpFile(target, context); + final boolean useTempTarget = !toAppend && !directWrite; + Path targetPath = useTempTarget ? getTempFile(target, context) : target; + + LOG.info("Writing to {} target file path {}", useTempTarget ? "temporary" + : "direct", targetPath); + final Configuration configuration = context.getConfiguration(); FileSystem targetFS = target.getFileSystem(configuration); try { - if (LOG.isDebugEnabled()) { - LOG.debug("Copying " + source.getPath() + " to " + target); - LOG.debug("Target file path: " + targetPath); - } final Path sourcePath = source.getPath(); final FileSystem sourceFS = sourcePath.getFileSystem(configuration); final FileChecksum sourceChecksum = fileAttributes @@ -134,17 +153,20 @@ private long doCopy(CopyListingFileStatus source, Path target, targetFS, targetPath); } } - // it's not append case, thus we first write to a temporary file, rename - // it to the target path. - if (!toAppend) { + // it's not append or direct write (preferred for s3a) case, thus we first + // write to a temporary file, then rename it to the target path. + if (useTempTarget) { + LOG.info("Renaming temporary target file path {} to {}", targetPath, + target); promoteTmpToTarget(targetPath, target, targetFS); } + LOG.info("Completed writing {} ({} bytes)", target, bytesRead); return bytesRead; } finally { // note that for append case, it is possible that we append partial data // and then fail. In that case, for the next retry, we either reuse the // partial appended data if it is good or we overwrite the whole file - if (!toAppend) { + if (useTempTarget) { targetFS.delete(targetPath, false); } } @@ -252,14 +274,16 @@ private void promoteTmpToTarget(Path tmpTarget, Path target, FileSystem fs) } } - private Path getTmpFile(Path target, Mapper.Context context) { + private Path getTempFile(Path target, Mapper.Context context) { Path targetWorkPath = new Path(context.getConfiguration(). get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); - Path root = target.equals(targetWorkPath)? targetWorkPath.getParent() : targetWorkPath; - LOG.info("Creating temp file: " + - new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString())); - return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString()); + Path root = target.equals(targetWorkPath) ? targetWorkPath.getParent() + : targetWorkPath; + Path tempFile = new Path(root, ".distcp.tmp." + + context.getTaskAttemptID().toString()); + LOG.info("Creating temp file: {}", tempFile); + return tempFile; } @VisibleForTesting diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index b855422d96..25ea7e28fe 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -241,6 +241,7 @@ Flag | Description | Notes `-blocksperchunk ` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `` blocks to be transferred in parallel, and reassembled on the destination. By default, `` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. | `-copybuffersize ` | Size of the copy buffer to use. By default, `` is set to 8192B | `-xtrack ` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. +`-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store Architecture of DistCp ---------------------- @@ -455,7 +456,7 @@ configuration, or be otherwise available in all cluster hosts. DistCp can be used to upload data ```bash -hadoop distcp hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1 +hadoop distcp -direct hdfs://nn1:8020/datasets/set1 s3a://bucket/datasets/set1 ``` To download data @@ -535,6 +536,9 @@ rely on disk buffering. Copies each byte down to the Hadoop worker nodes and back to the bucket. As well as being slow, it means that charges may be incurred. +* The `-direct` option can be used to write to object store target paths directly, +avoiding the potentially very expensive temporary file rename operations that would +otherwise occur. Frequently Asked Questions -------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index 62a2e6d751..7382795dd9 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -287,8 +287,9 @@ public void testToString() { "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, " + "mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], " + "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " + - "sourcePaths=null, targetPath=xyz, filtersFile='null'," + - " blocksPerChunk=0, copyBufferSize=8192, verboseLog=false}"; + "sourcePaths=null, targetPath=xyz, filtersFile='null', " + + "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " + + "directWrite=false}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index 0757a66223..eeaf30a929 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -552,7 +552,7 @@ private void runDistCp(Path src, Path dst) throws Exception { /** * Run the distcp job. - * @param optons distcp options + * @param options distcp options * @return the job. It will have already completed. * @throws Exception failure */ @@ -586,4 +586,68 @@ private DistCpOptions buildWithStandardOptions( private static void mkdirs(FileSystem fs, Path dir) throws Exception { assertTrue("Failed to mkdir " + dir, fs.mkdirs(dir)); } -} + + @Test + public void testDirectWrite() throws Exception { + describe("copy file from local to remote using direct write option"); + directWrite(localFS, localDir, remoteFS, remoteDir, true); + } + + @Test + public void testNonDirectWrite() throws Exception { + describe("copy file from local to remote without using direct write " + + "option"); + directWrite(localFS, localDir, remoteFS, remoteDir, false); + } + + /** + * Executes a test with support for using direct write option. + * + * @param srcFS source FileSystem + * @param srcDir source directory + * @param dstFS destination FileSystem + * @param dstDir destination directory + * @param directWrite whether to use -directwrite option + * @throws Exception if there is a failure + */ + private void directWrite(FileSystem srcFS, Path srcDir, FileSystem dstFS, + Path dstDir, boolean directWrite) throws Exception { + initPathFields(srcDir, dstDir); + + // Create 2 test files + mkdirs(srcFS, inputSubDir1); + byte[] data1 = dataset(64, 33, 43); + createFile(srcFS, inputFile1, true, data1); + byte[] data2 = dataset(200, 43, 53); + createFile(srcFS, inputFile2, true, data2); + Path target = new Path(dstDir, "outputDir"); + if (directWrite) { + runDistCpDirectWrite(inputDir, target); + } else { + runDistCp(inputDir, target); + } + ContractTestUtils.assertIsDirectory(dstFS, target); + lsR("Destination tree after distcp", dstFS, target); + + // Verify copied file contents + verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1); + verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"), + data2); + } + + /** + * Run distcp -direct srcDir destDir. + * @param srcDir local source directory + * @param destDir remote destination directory + * @return the completed job + * @throws Exception any failure. + */ + private Job runDistCpDirectWrite(final Path srcDir, final Path destDir) + throws Exception { + describe("\nDistcp -direct from " + srcDir + " to " + destDir); + return runDistCp(buildWithStandardOptions( + new DistCpOptions.Builder( + Collections.singletonList(srcDir), destDir) + .withDirectWrite(true))); + } +} \ No newline at end of file