diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 9b782f32f2..13291952a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -862,27 +862,7 @@ public static void appendFile(FileSystem fs, Path p, int length) out.write(toAppend); } } - - /** - * Append specified length of bytes to a given file, starting with new block. - * @param fs The file system - * @param p Path of the file to append - * @param length Length of bytes to append to the file - * @throws IOException - */ - public static void appendFileNewBlock(DistributedFileSystem fs, - Path p, int length) throws IOException { - assert fs.exists(p); - assert length >= 0; - byte[] toAppend = new byte[length]; - Random random = new Random(); - random.nextBytes(toAppend); - try (FSDataOutputStream out = fs.append(p, - EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) { - out.write(toAppend); - } - } - + /** * @return url content as string (UTF-8 encoding assumed) */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index 9ebf9d29b3..481aa61b0f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -145,22 +145,12 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options) Configuration config = getConf(); FileSystem fs = pathToListFile.getFileSystem(config); - final boolean splitLargeFile = options.splitLargeFile(); - - // When splitLargeFile is enabled, we don't randomize the copylist - // earlier, so we don't do the sorting here. 
For a file that has - // multiple entries due to split, we check here that their - // is continuous. - // - Path checkPath = splitLargeFile? - pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile); + Path sortedList = DistCpUtils.sortListing(fs, config, pathToListFile); SequenceFile.Reader reader = new SequenceFile.Reader( - config, SequenceFile.Reader.file(checkPath)); + config, SequenceFile.Reader.file(sortedList)); try { Text lastKey = new Text("*"); //source relative path can never hold * - long lastChunkOffset = -1; - long lastChunkLength = -1; CopyListingFileStatus lastFileStatus = new CopyListingFileStatus(); Text currentKey = new Text(); @@ -171,21 +161,8 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options) if (currentKey.equals(lastKey)) { CopyListingFileStatus currentFileStatus = new CopyListingFileStatus(); reader.getCurrentValue(currentFileStatus); - if (!splitLargeFile) { - throw new DuplicateFileException("File " + lastFileStatus.getPath() - + " and " + currentFileStatus.getPath() - + " would cause duplicates. Aborting"); - } else { - if (lastChunkOffset + lastChunkLength != - currentFileStatus.getChunkOffset()) { - throw new InvalidInputException("File " + lastFileStatus.getPath() - + " " + lastChunkOffset + "," + lastChunkLength - + " and " + currentFileStatus.getPath() - + " " + currentFileStatus.getChunkOffset() + "," - + currentFileStatus.getChunkLength() - + " are not continuous. Aborting"); - } - } + throw new DuplicateFileException("File " + lastFileStatus.getPath() + " and " + + currentFileStatus.getPath() + " would cause duplicates. 
Aborting"); } reader.getCurrentValue(lastFileStatus); if (options.shouldPreserve(DistCpOptions.FileAttribute.ACL)) { @@ -204,12 +181,8 @@ private void validateFinalListing(Path pathToListFile, DistCpOptions options) xAttrSupportCheckFsSet.add(lastFsUri); } } - lastKey.set(currentKey); - if (splitLargeFile) { - lastChunkOffset = lastFileStatus.getChunkOffset(); - lastChunkLength = lastFileStatus.getChunkLength(); - } + if (options.shouldUseDiff() && LOG.isDebugEnabled()) { LOG.debug("Copy list entry " + idx + ": " + lastFileStatus.getPath().toUri().getPath()); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java index 29c59ac103..00d4b32505 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java @@ -74,14 +74,6 @@ public final class CopyListingFileStatus implements Writable { private List aclEntries; private Map xAttrs; - // represents the offset and length of a file - // chunk in number of bytes. - // used when splitting a large file to chunks to copy in parallel. - // If a file is not large enough to split, chunkOffset would be 0 and - // chunkLength would be the length of the file. - private long chunkOffset = 0; - private long chunkLength = Long.MAX_VALUE; - /** * Default constructor. 
*/ @@ -104,32 +96,11 @@ public CopyListingFileStatus(FileStatus fileStatus) { fileStatus.getPath()); } - public CopyListingFileStatus(FileStatus fileStatus, - long chunkOffset, long chunkLength) { - this(fileStatus.getLen(), fileStatus.isDirectory(), - fileStatus.getReplication(), fileStatus.getBlockSize(), - fileStatus.getModificationTime(), fileStatus.getAccessTime(), - fileStatus.getPermission(), fileStatus.getOwner(), - fileStatus.getGroup(), - fileStatus.getPath()); - this.chunkOffset = chunkOffset; - this.chunkLength = chunkLength; - } - @SuppressWarnings("checkstyle:parameternumber") public CopyListingFileStatus(long length, boolean isdir, int blockReplication, long blocksize, long modificationTime, long accessTime, FsPermission permission, String owner, String group, Path path) { - this(length, isdir, blockReplication, blocksize, modificationTime, - accessTime, permission, owner, group, path, 0, Long.MAX_VALUE); - } - - @SuppressWarnings("checkstyle:parameternumber") - public CopyListingFileStatus(long length, boolean isdir, - int blockReplication, long blocksize, long modificationTime, - long accessTime, FsPermission permission, String owner, String group, - Path path, long chunkOffset, long chunkLength) { this.length = length; this.isdir = isdir; this.blockReplication = (short)blockReplication; @@ -146,23 +117,6 @@ public CopyListingFileStatus(long length, boolean isdir, this.owner = (owner == null) ? "" : owner; this.group = (group == null) ? 
"" : group; this.path = path; - this.chunkOffset = chunkOffset; - this.chunkLength = chunkLength; - } - - public CopyListingFileStatus(CopyListingFileStatus other) { - this.length = other.length; - this.isdir = other.isdir; - this.blockReplication = other.blockReplication; - this.blocksize = other.blocksize; - this.modificationTime = other.modificationTime; - this.accessTime = other.accessTime; - this.permission = other.permission; - this.owner = other.owner; - this.group = other.group; - this.path = new Path(other.path.toUri()); - this.chunkOffset = other.chunkOffset; - this.chunkLength = other.chunkLength; } public Path getPath() { @@ -246,31 +200,6 @@ public void setXAttrs(Map xAttrs) { this.xAttrs = xAttrs; } - public long getChunkOffset() { - return chunkOffset; - } - - public void setChunkOffset(long offset) { - this.chunkOffset = offset; - } - - public long getChunkLength() { - return chunkLength; - } - - public void setChunkLength(long chunkLength) { - this.chunkLength = chunkLength; - } - - public boolean isSplit() { - return getChunkLength() != Long.MAX_VALUE && - getChunkLength() != getLen(); - } - - public long getSizeToCopy() { - return isSplit()? 
getChunkLength() : getLen(); - } - @Override public void write(DataOutput out) throws IOException { Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN); @@ -315,9 +244,6 @@ public void write(DataOutput out) throws IOException { } else { out.writeInt(NO_XATTRS); } - - out.writeLong(chunkOffset); - out.writeLong(chunkLength); } @Override @@ -366,9 +292,6 @@ public void readFields(DataInput in) throws IOException { } else { xAttrs = null; } - - chunkOffset = in.readLong(); - chunkLength = in.readLong(); } @Override @@ -394,14 +317,8 @@ public int hashCode() { public String toString() { StringBuilder sb = new StringBuilder(super.toString()); sb.append('{'); - sb.append(this.getPath().toString()); - sb.append(" length = ").append(this.getLen()); - sb.append(" aclEntries = ").append(aclEntries); - sb.append(", xAttrs = ").append(xAttrs); - if (isSplit()) { - sb.append(", chunkOffset = ").append(this.getChunkOffset()); - sb.append(", chunkLength = ").append(this.getChunkLength()); - } + sb.append("aclEntries = " + aclEntries); + sb.append(", xAttrs = " + xAttrs); sb.append('}'); return sb.toString(); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index 8c2fa24a15..ab58e9c66d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobSubmissionFiles; -import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.CopyListing.*; import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyOutputFormat; @@ -135,7 +134,6 @@ public int run(String[] argv) { try { inputOptions = (OptionsParser.parse(argv)); - 
setOptionsForSplitLargeFile(); setTargetPathExists(); LOG.info("Input Options: " + inputOptions); } catch (Throwable e) { @@ -237,56 +235,6 @@ private void setTargetPathExists() throws IOException { getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists); } - - /** - * Check if concat is supported by fs. - * Throws UnsupportedOperationException if not. - */ - private void checkConcatSupport(FileSystem fs) { - try { - Path[] src = null; - Path tgt = null; - fs.concat(tgt, src); - } catch (UnsupportedOperationException use) { - throw new UnsupportedOperationException( - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + - " is not supported since the target file system doesn't" + - " support concat.", use); - } catch (Exception e) { - // Ignore other exception - } - } - - /** - * Set up needed options for splitting large files. - */ - private void setOptionsForSplitLargeFile() throws IOException { - if (!inputOptions.splitLargeFile()) { - return; - } - Path target = inputOptions.getTargetPath(); - FileSystem targetFS = target.getFileSystem(getConf()); - checkConcatSupport(targetFS); - - LOG.info("Enabling preserving blocksize since " - + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " is passed."); - inputOptions.preserve(FileAttribute.BLOCKSIZE); - - LOG.info("Set " + - DistCpOptionSwitch.APPEND.getSwitch() - + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() - + " is passed."); - inputOptions.setAppend(false); - - LOG.info("Set " + - DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES - + " to false since " + DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() - + " is passed."); - getConf().setBoolean( - DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false); - } - - /** * Create Job object for submitting it, with all the configuration * diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java 
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index ced9b540c8..fb47d76923 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -169,16 +169,6 @@ public enum DistCpOptionSwitch { new Option("sizelimit", true, "(Deprecated!) Limit number of files " + "copied to <= n bytes")), - BLOCKS_PER_CHUNK("", - new Option("blocksperchunk", true, "If set to a positive value, files" - + "with more blocks than this value will be split into chunks of " - + " blocks to be transferred in parallel, and " - + "reassembled on the destination. By default, is " - + "0 and the files will be transmitted in their entirety without " - + "splitting. This switch is only applicable when the source file " - + "system implements getBlockLocations method and the target file " - + "system implements concat method")), - /** * Specify bandwidth per map in MB, accepts bandwidth as a fraction */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 9822d83dbd..8c37ff30ae 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -97,11 +97,7 @@ public class DistCpOptions { // targetPathExist is a derived field, it's initialized in the // beginning of distcp. private boolean targetPathExists = true; - - // Size of chunk in number of blocks when splitting large file into chunks - // to copy in parallel. Default is 0 and file are not splitted. 
- private int blocksPerChunk = 0; - + public static enum FileAttribute{ REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE, ACL, XATTR, TIMES; @@ -170,7 +166,6 @@ public DistCpOptions(DistCpOptions that) { this.targetPath = that.getTargetPath(); this.targetPathExists = that.getTargetPathExists(); this.filtersFile = that.getFiltersFile(); - this.blocksPerChunk = that.blocksPerChunk; } } @@ -583,18 +578,6 @@ public final void setFiltersFile(String filtersFilename) { this.filtersFile = filtersFilename; } - public final void setBlocksPerChunk(int csize) { - this.blocksPerChunk = csize; - } - - public final int getBlocksPerChunk() { - return blocksPerChunk; - } - - public final boolean splitLargeFile() { - return blocksPerChunk > 0; - } - void validate() { if ((useDiff || useRdiff) && deleteMissing) { // -delete and -diff/-rdiff are mutually exclusive. For backward @@ -686,8 +669,6 @@ public void appendToConf(Configuration conf) { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS, filtersFile); } - DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BLOCKS_PER_CHUNK, - String.valueOf(blocksPerChunk)); } /** @@ -723,7 +704,6 @@ public String toString() { ", targetPath=" + targetPath + ", targetPathExists=" + targetPathExists + ", filtersFile='" + filtersFile + '\'' + - ", blocksPerChunk=" + blocksPerChunk + '}'; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 8881264d28..d0f82ca76b 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -172,44 +172,11 @@ public static DistCpOptions parse(String[] args) DistCpOptionSwitch.FILTERS.getSwitch())); } - parseBlocksPerChunk(command, option); - option.validate(); return option; } - - /** - * A helper method to parse chunk size in 
number of blocks. - * Used when breaking large file into chunks to copy in parallel. - * - * @param command command line arguments - */ - private static void parseBlocksPerChunk(CommandLine command, - DistCpOptions option) { - boolean hasOption = - command.hasOption(DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch()); - LOG.info("parseChunkSize: " + - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch() + " " + hasOption); - if (hasOption) { - String chunkSizeString = getVal(command, - DistCpOptionSwitch.BLOCKS_PER_CHUNK.getSwitch().trim()); - try { - int csize = Integer.parseInt(chunkSizeString); - if (csize < 0) { - csize = 0; - } - LOG.info("Set distcp blocksPerChunk to " + csize); - option.setBlocksPerChunk(csize); - } - catch (NumberFormatException e) { - throw new IllegalArgumentException("blocksPerChunk is invalid: " - + chunkSizeString, e); - } - } - } - /** * parseSizeLimit is a helper method for parsing the deprecated * argument SIZE_LIMIT. @@ -244,7 +211,8 @@ private static void parseFileLimit(CommandLine command) { DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); try { Integer.parseInt(fileLimitString); - } catch (NumberFormatException e) { + } + catch (NumberFormatException e) { throw new IllegalArgumentException("File-limit is invalid: " + fileLimitString, e); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index af91347455..105e4f2fe1 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -19,7 +19,6 @@ package org.apache.hadoop.tools; import com.google.common.collect.Lists; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; @@ -48,7 +47,6 @@ import java.util.HashSet; import java.util.List; import 
java.util.Random; -import java.util.LinkedList; import static org.apache.hadoop.tools.DistCpConstants .HDFS_RESERVED_RAW_DIRECTORY_NAME; @@ -242,10 +240,10 @@ private void addToFileListing(SequenceFile.Writer fileListWriter, final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs(); - LinkedList fileCopyListingStatus = + CopyListingFileStatus fileCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, fileStatus, - preserveAcls, preserveXAttrs, preserveRawXAttrs, - options.getBlocksPerChunk()); + preserveAcls, preserveXAttrs, preserveRawXAttrs); + writeToFileListingRoot(fileListWriter, fileCopyListingStatus, sourceRoot, options); } @@ -350,10 +348,9 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter, FileStatus[] sourceFiles = sourceFS.listStatus(path); boolean explore = (sourceFiles != null && sourceFiles.length > 0); if (!explore || rootStatus.isDirectory()) { - LinkedList rootCopyListingStatus = - DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, - preserveAcls, preserveXAttrs, preserveRawXAttrs, - options.getBlocksPerChunk()); + CopyListingFileStatus rootCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus, + preserveAcls, preserveXAttrs, preserveRawXAttrs); writeToFileListingRoot(fileListWriter, rootCopyListingStatus, sourcePathRoot, options); } @@ -363,20 +360,20 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter, if (LOG.isDebugEnabled()) { LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); } - LinkedList sourceCopyListingStatus = - DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, - preserveAcls && sourceStatus.isDirectory(), - preserveXAttrs && sourceStatus.isDirectory(), - preserveRawXAttrs && sourceStatus.isDirectory(), - options.getBlocksPerChunk()); - for (CopyListingFileStatus fs : 
sourceCopyListingStatus) { - if (randomizeFileListing) { - addToFileListing(statusList, - new FileStatusInfo(fs, sourcePathRoot), fileListWriter); - } else { - writeToFileListing(fileListWriter, fs, sourcePathRoot); - } + CopyListingFileStatus sourceCopyListingStatus = + DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, + preserveAcls && sourceStatus.isDirectory(), + preserveXAttrs && sourceStatus.isDirectory(), + preserveRawXAttrs && sourceStatus.isDirectory()); + if (randomizeFileListing) { + addToFileListing(statusList, + new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot), + fileListWriter); + } else { + writeToFileListing(fileListWriter, sourceCopyListingStatus, + sourcePathRoot); } + if (sourceStatus.isDirectory()) { if (LOG.isDebugEnabled()) { LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath()); @@ -644,20 +641,18 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter, LOG.debug("Recording source-path: " + child.getPath() + " for copy."); } if (workResult.getSuccess()) { - LinkedList childCopyListingStatus = + CopyListingFileStatus childCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, child, preserveAcls && child.isDirectory(), preserveXAttrs && child.isDirectory(), - preserveRawXattrs && child.isDirectory(), - options.getBlocksPerChunk()); - - for (CopyListingFileStatus fs : childCopyListingStatus) { - if (randomizeFileListing) { - addToFileListing(fileStatuses, - new FileStatusInfo(fs, sourcePathRoot), fileListWriter); - } else { - writeToFileListing(fileListWriter, fs, sourcePathRoot); - } + preserveRawXattrs && child.isDirectory()); + if (randomizeFileListing) { + addToFileListing(fileStatuses, + new FileStatusInfo(childCopyListingStatus, sourcePathRoot), + fileListWriter); + } else { + writeToFileListing(fileListWriter, childCopyListingStatus, + sourcePathRoot); } } if (retry < maxRetries) { @@ -680,21 +675,19 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter, } 
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, - LinkedList fileStatus, Path sourcePathRoot, + CopyListingFileStatus fileStatus, Path sourcePathRoot, DistCpOptions options) throws IOException { boolean syncOrOverwrite = options.shouldSyncFolder() || options.shouldOverwrite(); - for (CopyListingFileStatus fs : fileStatus) { - if (fs.getPath().equals(sourcePathRoot) && - fs.isDirectory() && syncOrOverwrite) { - // Skip the root-paths when syncOrOverwrite - if (LOG.isDebugEnabled()) { - LOG.debug("Skip " + fs.getPath()); - } - return; - } - writeToFileListing(fileListWriter, fs, sourcePathRoot); + if (fileStatus.getPath().equals(sourcePathRoot) && + fileStatus.isDirectory() && syncOrOverwrite) { + // Skip the root-paths when syncOrOverwrite + if (LOG.isDebugEnabled()) { + LOG.debug("Skip " + fileStatus.getPath()); + } + return; } + writeToFileListing(fileListWriter, fileStatus, sourcePathRoot); } private void writeToFileListing(SequenceFile.Writer fileListWriter, @@ -714,7 +707,7 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter, fileListWriter.sync(); if (!fileStatus.isDirectory()) { - totalBytesToCopy += fileStatus.getSizeToCopy(); + totalBytesToCopy += fileStatus.getLen(); } else { totalDirs++; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 6ddaab99c3..75cefb488a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -27,7 +27,6 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus; import 
org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -35,17 +34,14 @@ import org.apache.hadoop.tools.CopyListing; import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpConstants; -import org.apache.hadoop.tools.DistCpOptionSwitch; import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.GlobbedCopyListing; import org.apache.hadoop.tools.util.DistCpUtils; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; -import java.util.LinkedList; import java.util.List; /** @@ -67,8 +63,7 @@ public class CopyCommitter extends FileOutputCommitter { private boolean syncFolder = false; private boolean overwrite = false; private boolean targetPathExists = true; - private boolean ignoreFailures = false; - + /** * Create a output committer * @@ -87,13 +82,8 @@ public void commitJob(JobContext jobContext) throws IOException { Configuration conf = jobContext.getConfiguration(); syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false); overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false); - targetPathExists = conf.getBoolean( - DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); - ignoreFailures = conf.getBoolean( - DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false); - - concatFileChunks(conf); - + targetPathExists = conf.getBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); + super.commitJob(jobContext); cleanupTempFiles(jobContext); @@ -179,112 +169,9 @@ private void cleanup(Configuration conf) { } } - private boolean isFileNotFoundException(IOException e) { - if (e instanceof FileNotFoundException) { - return true; - } - - if (e instanceof RemoteException) { - return ((RemoteException)e).unwrapRemoteException() - instanceof FileNotFoundException; - } - - return false; - } - - /** - * Concat chunk files for the same file into one. 
- * Iterate through copy listing, identify chunk files for the same file, - * concat them into one. - */ - private void concatFileChunks(Configuration conf) throws IOException { - - LOG.info("concat file chunks ..."); - - String spath = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH); - if (spath == null || spath.isEmpty()) { - return; - } - Path sourceListing = new Path(spath); - SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, - SequenceFile.Reader.file(sourceListing)); - Path targetRoot = - new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); - - try { - CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); - Text srcRelPath = new Text(); - CopyListingFileStatus lastFileStatus = null; - LinkedList allChunkPaths = new LinkedList(); - - // Iterate over every source path that was copied. - while (sourceReader.next(srcRelPath, srcFileStatus)) { - if (srcFileStatus.isDirectory()) { - continue; - } - Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); - Path targetFileChunkPath = - DistCpUtils.getSplitChunkPath(targetFile, srcFileStatus); - if (LOG.isDebugEnabled()) { - LOG.debug(" add " + targetFileChunkPath + " to concat."); - } - allChunkPaths.add(targetFileChunkPath); - if (srcFileStatus.getChunkOffset() + srcFileStatus.getChunkLength() - == srcFileStatus.getLen()) { - // This is the last chunk of the splits, consolidate allChunkPaths - try { - concatFileChunks(conf, targetFile, allChunkPaths); - } catch (IOException e) { - // If the concat failed because a chunk file doesn't exist, - // then we assume that the CopyMapper has skipped copying this - // file, and we ignore the exception here. - // If a chunk file should have been created but it was not, then - // the CopyMapper would have failed. 
- if (!isFileNotFoundException(e)) { - String emsg = "Failed to concat chunk files for " + targetFile; - if (!ignoreFailures) { - throw new IOException(emsg, e); - } else { - LOG.warn(emsg, e); - } - } - } - allChunkPaths.clear(); - lastFileStatus = null; - } else { - if (lastFileStatus == null) { - lastFileStatus = new CopyListingFileStatus(srcFileStatus); - } else { - // Two neighboring chunks have to be consecutive ones for the same - // file, for them to be merged - if (!srcFileStatus.getPath().equals(lastFileStatus.getPath()) || - srcFileStatus.getChunkOffset() != - (lastFileStatus.getChunkOffset() + - lastFileStatus.getChunkLength())) { - String emsg = "Inconsistent sequence file: current " + - "chunk file " + srcFileStatus + " doesnt match prior " + - "entry " + lastFileStatus; - if (!ignoreFailures) { - throw new IOException(emsg); - } else { - LOG.warn(emsg + ", skipping concat this set."); - } - } else { - lastFileStatus.setChunkOffset(srcFileStatus.getChunkOffset()); - lastFileStatus.setChunkLength(srcFileStatus.getChunkLength()); - } - } - } - } - } finally { - IOUtils.closeStream(sourceReader); - } - } - // This method changes the target-directories' file-attributes (owner, // user/group permissions, etc.) based on the corresponding source directories. - private void preserveFileAttributesForDirectories(Configuration conf) - throws IOException { + private void preserveFileAttributesForDirectories(Configuration conf) throws IOException { String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); final boolean syncOrOverwrite = syncFolder || overwrite; @@ -438,57 +325,4 @@ private void commitData(Configuration conf) throws IOException { ", Unable to move to " + finalDir); } } - - /** - * Concat the passed chunk files into one and rename it the targetFile. 
- */ - private void concatFileChunks(Configuration conf, Path targetFile, - LinkedList allChunkPaths) throws IOException { - if (allChunkPaths.size() == 1) { - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("concat " + targetFile + " allChunkSize+ " - + allChunkPaths.size()); - } - FileSystem dstfs = targetFile.getFileSystem(conf); - - Path firstChunkFile = allChunkPaths.removeFirst(); - Path[] restChunkFiles = new Path[allChunkPaths.size()]; - allChunkPaths.toArray(restChunkFiles); - if (LOG.isDebugEnabled()) { - LOG.debug("concat: firstchunk: " + dstfs.getFileStatus(firstChunkFile)); - int i = 0; - for (Path f : restChunkFiles) { - LOG.debug("concat: other chunk: " + i + ": " + dstfs.getFileStatus(f)); - ++i; - } - } - dstfs.concat(firstChunkFile, restChunkFiles); - if (LOG.isDebugEnabled()) { - LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile)); - } - rename(dstfs, firstChunkFile, targetFile); - } - - /** - * Rename tmp to dst on destFileSys. - * @param destFileSys the file ssystem - * @param tmp the source path - * @param dst the destination path - * @throws IOException if renaming failed - */ - private static void rename(FileSystem destFileSys, Path tmp, Path dst) - throws IOException { - try { - if (destFileSys.exists(dst)) { - destFileSys.delete(dst, true); - } - destFileSys.rename(tmp, dst); - } catch (IOException ioe) { - throw new IOException("Fail to rename tmp file (=" + tmp - + ") to destination file (=" + dst + ")", ioe); - } - } - } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index d6b3ba817b..e1873f17e4 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -156,12 +156,10 @@ public void map(Text relPath, CopyListingFileStatus 
sourceFileStatus, sourceFS = sourcePath.getFileSystem(conf); final boolean preserveXAttrs = fileAttributes.contains(FileAttribute.XATTR); - sourceCurrStatus = DistCpUtils.toCopyListingFileStatusHelper(sourceFS, - sourceFS.getFileStatus(sourcePath), - fileAttributes.contains(FileAttribute.ACL), - preserveXAttrs, preserveRawXattrs, - sourceFileStatus.getChunkOffset(), - sourceFileStatus.getChunkLength()); + sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, + sourceFS.getFileStatus(sourcePath), + fileAttributes.contains(FileAttribute.ACL), + preserveXAttrs, preserveRawXattrs); } catch (FileNotFoundException e) { throw new IOException(new RetriableFileCopyCommand.CopyReadException(e)); } @@ -175,8 +173,7 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus, LOG.debug("Path could not be found: " + target, ignore); } - if (targetStatus != null && - (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) { + if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) { throw new IOException("Can't replace " + target + ". 
Target is " + getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus)); } @@ -186,28 +183,19 @@ public void map(Text relPath, CopyListingFileStatus sourceFileStatus, return; } - FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, - targetStatus); - - Path tmpTarget = target; + FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target, targetStatus); if (action == FileAction.SKIP) { LOG.info("Skipping copy of " + sourceCurrStatus.getPath() + " to " + target); updateSkipCounters(context, sourceCurrStatus); context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath())); - } else { - if (sourceCurrStatus.isSplit()) { - tmpTarget = DistCpUtils.getSplitChunkPath(target, sourceCurrStatus); - } - if (LOG.isDebugEnabled()) { - LOG.debug("copying " + sourceCurrStatus + " " + tmpTarget); - } - copyFileWithRetry(description, sourceCurrStatus, tmpTarget, context, + copyFileWithRetry(description, sourceCurrStatus, target, context, action, fileAttributes); } - DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget, - sourceCurrStatus, fileAttributes, preserveRawXattrs); + + DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus, + fileAttributes, preserveRawXattrs); } catch (IOException exception) { handleFailures(exception, sourceFileStatus, target, context); } @@ -273,12 +261,8 @@ private static void updateSkipCounters(Context context, private void handleFailures(IOException exception, CopyListingFileStatus sourceFileStatus, Path target, Context context) throws IOException, InterruptedException { - LOG.error("Failure in copying " + sourceFileStatus.getPath() + - (sourceFileStatus.isSplit()? 
"," - + " offset=" + sourceFileStatus.getChunkOffset() - + " chunkLength=" + sourceFileStatus.getChunkLength() - : "") + - " to " + target, exception); + LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " + + target, exception); if (ignoreFailures && ExceptionUtils.indexOfType(exception, CopyReadException.class) != -1) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 2c17fef1a3..06acd78a8a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -118,21 +118,17 @@ private long doCopy(CopyListingFileStatus source, Path target, .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS .getFileChecksum(sourcePath) : null; - long offset = (action == FileAction.APPEND) ? - targetFS.getFileStatus(target).getLen() : source.getChunkOffset(); + final long offset = action == FileAction.APPEND ? targetFS.getFileStatus( + target).getLen() : 0; long bytesRead = copyToFile(targetPath, targetFS, source, offset, context, fileAttributes, sourceChecksum); - if (!source.isSplit()) { - compareFileLengths(source, targetPath, configuration, bytesRead - + offset); - } + compareFileLengths(source, targetPath, configuration, bytesRead + + offset); //At this point, src&dest lengths are same. if length==0, we skip checksum if ((bytesRead != 0) && (!skipCrc)) { - if (!source.isSplit()) { - compareCheckSums(sourceFS, source.getPath(), sourceChecksum, - targetFS, targetPath); - } + compareCheckSums(sourceFS, source.getPath(), sourceChecksum, + targetFS, targetPath); } // it's not append case, thus we first write to a temporary file, rename // it to the target path. 
@@ -253,26 +249,16 @@ long copyBytes(CopyListingFileStatus source2, long sourceOffset, ThrottledInputStream inStream = null; long totalBytesRead = 0; - long chunkLength = source2.getChunkLength(); - boolean finished = false; try { inStream = getInputStream(source, context.getConfiguration()); int bytesRead = readBytes(inStream, buf, sourceOffset); while (bytesRead >= 0) { - if (chunkLength > 0 && - (totalBytesRead + bytesRead) >= chunkLength) { - bytesRead = (int)(chunkLength - totalBytesRead); - finished = true; - } totalBytesRead += bytesRead; if (action == FileAction.APPEND) { sourceOffset += bytesRead; } outStream.write(buf, 0, bytesRead); updateContextStatus(totalBytesRead, context, source2); - if (finished) { - break; - } bytesRead = readBytes(inStream, buf, sourceOffset); } outStream.close(); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java index d1c18ea8d1..3e86d0931b 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java @@ -99,8 +99,7 @@ private List getSplits(Configuration configuration, int numSplits, while (reader.next(srcRelPath, srcFileStatus)) { // If adding the current file would cause the bytes per map to exceed // limit. 
Add the current file to new split - if (currentSplitSize + srcFileStatus.getChunkLength() > nBytesPerSplit - && lastPosition != 0) { + if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit && lastPosition != 0) { FileSplit split = new FileSplit(listingFilePath, lastSplitStart, lastPosition - lastSplitStart, null); if (LOG.isDebugEnabled()) { @@ -110,7 +109,7 @@ private List getSplits(Configuration configuration, int numSplits, lastSplitStart = lastPosition; currentSplitSize = 0; } - currentSplitSize += srcFileStatus.getChunkLength(); + currentSplitSize += srcFileStatus.getLen(); lastPosition = reader.getPosition(); } if (lastPosition > lastSplitStart) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index e315b848de..76bc4c5626 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -19,11 +19,9 @@ package org.apache.hadoop.tools.util; import com.google.common.collect.Maps; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -32,7 +30,6 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclUtil; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; @@ -47,7 +44,6 @@ import java.io.IOException; import java.text.DecimalFormat; import java.util.EnumSet; -import java.util.LinkedList; import java.util.List; import 
java.util.Map; import java.util.Map.Entry; @@ -120,7 +116,7 @@ public static long getLong(Configuration configuration, String label) { * @return Class implementing the strategy specified in options. */ public static Class getStrategy(Configuration conf, - DistCpOptions options) { + DistCpOptions options) { String confLabel = "distcp." + StringUtils.toLowerCase(options.getCopyStrategy()) + ".strategy" + ".impl"; @@ -301,86 +297,6 @@ public static Map getXAttrs(FileSystem fileSystem, return fileSystem.getXAttrs(path); } - /** - * Converts FileStatus to a list of CopyListingFileStatus. - * The resulted list contains either one CopyListingFileStatus per chunk of - * file-blocks (if file-size exceeds blockSize * blocksPerChunk, and there - * are more blocks in the file than blocksperChunk), or a single - * CopyListingFileStatus for the entire file (if file-size is too small to - * split). - * If preserving ACLs, populates the CopyListingFileStatus with the ACLs. - * If preserving XAttrs, populates the CopyListingFileStatus with the XAttrs. 
- * - * @param fileSystem FileSystem containing the file - * @param fileStatus FileStatus of file - * @param preserveAcls boolean true if preserving ACLs - * @param preserveXAttrs boolean true if preserving XAttrs - * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs - * @param blocksPerChunk size of chunks when copying chunks in parallel - * @return list of CopyListingFileStatus - * @throws IOException if there is an I/O error - */ - public static LinkedList toCopyListingFileStatus( - FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, - boolean preserveXAttrs, boolean preserveRawXAttrs, int blocksPerChunk) - throws IOException { - LinkedList copyListingFileStatus = - new LinkedList(); - - final CopyListingFileStatus clfs = toCopyListingFileStatusHelper( - fileSystem, fileStatus, preserveAcls, - preserveXAttrs, preserveRawXAttrs, - 0, fileStatus.getLen()); - final long blockSize = fileStatus.getBlockSize(); - if (LOG.isDebugEnabled()) { - LOG.debug("toCopyListing: " + fileStatus + " chunkSize: " - + blocksPerChunk + " isDFS: " + - (fileSystem instanceof DistributedFileSystem)); - } - if ((blocksPerChunk > 0) && - !fileStatus.isDirectory() && - (fileStatus.getLen() > blockSize * blocksPerChunk)) { - // split only when the file size is larger than the intended chunk size - final BlockLocation[] blockLocations; - blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, - fileStatus.getLen()); - - int numBlocks = blockLocations.length; - long curPos = 0; - if (numBlocks <= blocksPerChunk) { - if (LOG.isDebugEnabled()) { - LOG.debug(" add file " + clfs); - } - copyListingFileStatus.add(clfs); - } else { - int i = 0; - while (i < numBlocks) { - long curLength = 0; - for (int j = 0; j < blocksPerChunk && i < numBlocks; ++j, ++i) { - curLength += blockLocations[i].getLength(); - } - if (curLength > 0) { - CopyListingFileStatus clfs1 = new CopyListingFileStatus(clfs); - clfs1.setChunkOffset(curPos); - clfs1.setChunkLength(curLength); 
- if (LOG.isDebugEnabled()) { - LOG.debug(" add file chunk " + clfs1); - } - copyListingFileStatus.add(clfs1); - curPos += curLength; - } - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug(" add file/dir " + clfs); - } - copyListingFileStatus.add(clfs); - } - - return copyListingFileStatus; - } - /** * Converts a FileStatus to a CopyListingFileStatus. If preserving ACLs, * populates the CopyListingFileStatus with the ACLs. If preserving XAttrs, @@ -391,17 +307,13 @@ public static LinkedList toCopyListingFileStatus( * @param preserveAcls boolean true if preserving ACLs * @param preserveXAttrs boolean true if preserving XAttrs * @param preserveRawXAttrs boolean true if preserving raw.* XAttrs - * @param chunkOffset chunk offset in bytes - * @param chunkLength chunk length in bytes - * @return CopyListingFileStatus * @throws IOException if there is an I/O error */ - public static CopyListingFileStatus toCopyListingFileStatusHelper( + public static CopyListingFileStatus toCopyListingFileStatus( FileSystem fileSystem, FileStatus fileStatus, boolean preserveAcls, - boolean preserveXAttrs, boolean preserveRawXAttrs, - long chunkOffset, long chunkLength) throws IOException { + boolean preserveXAttrs, boolean preserveRawXAttrs) throws IOException { CopyListingFileStatus copyListingFileStatus = - new CopyListingFileStatus(fileStatus, chunkOffset, chunkLength); + new CopyListingFileStatus(fileStatus); if (preserveAcls) { FsPermission perm = fileStatus.getPermission(); if (perm.getAclBit()) { @@ -558,19 +470,4 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source, return (sourceChecksum == null || targetChecksum == null || sourceChecksum.equals(targetChecksum)); } - - /* - * Return the Path for a given chunk. - * Used when splitting large file into chunks to copy in parallel. 
- * @param targetFile path to target file - * @param srcFileStatus source file status in copy listing - * @return path to the chunk specified by the parameters to store - * in target cluster temporarily - */ - public static Path getSplitChunkPath(Path targetFile, - CopyListingFileStatus srcFileStatus) { - return new Path(targetFile.toString() - + ".____distcpSplit____" + srcFileStatus.getChunkOffset() - + "." + srcFileStatus.getChunkLength()); - } } diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm index a77deb2ffe..41a6e94aee 100644 --- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm +++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm @@ -237,7 +237,6 @@ Flag | Description | Notes `-rdiff ` | Use snapshot diff report between given two snapshots to identify what has been changed on the target since the snapshot `` was created on the target, and apply the diff reversely to the target, and copy modified files from the source's ``, to make the target the same as ``. | This option is valid only with `-update` option and the following conditions should be satisfied.
  1. Both the source and the target FileSystem must be DistributedFileSystem. The source and the target can be two different clusters/paths, or they can be exactly the same cluster/path. In the latter case, modified files are copied from target's `<oldSnapshot>` to target's current state.
  2. Two snapshots `<newSnapshot>` and `<oldSnapshot>` have been created on the target FS, and `<oldSnapshot>` is older than `<newSnapshot>`. No change has been made on target since `<newSnapshot>` was created on the target.
  3. The source has the same snapshot `<oldSnapshot>`, which has the same content as the `<oldSnapshot>` on the target. All the files/directories in the target's `<oldSnapshot>` are the same as the source's `<oldSnapshot>`.
| `-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads. `-skipcrccheck` | Whether to skip CRC checks between source and target paths. | -`-blocksperchunk ` | Number of blocks per chunk. When specified, split files into chunks to copy in parallel | If set to a positive value, files with more blocks than this value will be split into chunks of `` blocks to be transferred in parallel, and reassembled on the destination. By default, `` is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method. | Architecture of DistCp ---------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index b2266b3344..e3018a0773 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -23,27 +23,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.PrintStream; +import java.io.OutputStream; import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; import 
org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; import org.junit.Assert; @@ -57,15 +47,11 @@ */ public class TestDistCpSystem { - private static final Log LOG = - LogFactory.getLog(TestDistCpSystem.class); - @Rule public Timeout globalTimeout = new Timeout(30000); private static final String SRCDAT = "srcdat"; private static final String DSTDAT = "dstdat"; - private static final long BLOCK_SIZE = 1024; private static MiniDFSCluster cluster; private static Configuration conf; @@ -77,76 +63,27 @@ public FileEntry(String path, boolean isDir) { this.path = path; this.isDir = isDir; } - - String getPath() { - return path; - } - - boolean isDirectory() { - return isDir; - } - } - - @BeforeClass - public static void beforeClass() throws IOException { - conf = new Configuration(); - conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - cluster.waitActive(); - } - - @AfterClass - public static void afterClass() throws IOException { - if (cluster != null) { - cluster.shutdown(); - } - } - - static String execCmd(FsShell shell, String... 
args) throws Exception { - ByteArrayOutputStream baout = new ByteArrayOutputStream(); - PrintStream out = new PrintStream(baout, true); - PrintStream old = System.out; - System.setOut(out); - shell.run(args); - out.close(); - System.setOut(old); - return baout.toString(); + String getPath() { return path; } + boolean isDirectory() { return isDir; } } - private void createFiles(DistributedFileSystem fs, String topdir, - FileEntry[] entries, long chunkSize) throws IOException { - long seed = System.currentTimeMillis(); - Random rand = new Random(seed); - short replicationFactor = 2; + private void createFiles(FileSystem fs, String topdir, + FileEntry[] entries) throws IOException { for (FileEntry entry : entries) { - Path newPath = new Path(topdir + "/" + entry.getPath()); + Path newpath = new Path(topdir + "/" + entry.getPath()); if (entry.isDirectory()) { - fs.mkdirs(newPath); + fs.mkdirs(newpath); } else { - long fileSize = BLOCK_SIZE *100; - int bufSize = 128; - if (chunkSize == -1) { - DFSTestUtil.createFile(fs, newPath, bufSize, - fileSize, BLOCK_SIZE, replicationFactor, seed); - } else { - // Create a variable length block file, by creating - // one block of half block size at the chunk boundary - long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2; - long seg2 = fileSize - seg1; - DFSTestUtil.createFile(fs, newPath, bufSize, - seg1, BLOCK_SIZE, replicationFactor, seed); - DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2); + OutputStream out = fs.create(newpath); + try { + out.write((topdir + "/" + entry).getBytes()); + out.write("\n".getBytes()); + } finally { + out.close(); } } - seed = System.currentTimeMillis() + rand.nextLong(); } } - - private void createFiles(DistributedFileSystem fs, String topdir, - FileEntry[] entries) throws IOException { - createFiles(fs, topdir, entries, -1); - } private static FileStatus[] getFileStatus(FileSystem fs, String topdir, FileEntry[] files) throws IOException { @@ -167,19 +104,18 @@ private static void 
deldir(FileSystem fs, String topdir) throws IOException { } private void testPreserveUserHelper(String testRoot, - FileEntry[] srcEntries, - FileEntry[] dstEntries, - boolean createSrcDir, - boolean createTgtDir, - boolean update) throws Exception { + FileEntry[] srcEntries, + FileEntry[] dstEntries, + boolean createSrcDir, + boolean createTgtDir, + boolean update) throws Exception { final String testSrcRel = SRCDAT; final String testSrc = testRoot + "/" + testSrcRel; final String testDstRel = DSTDAT; final String testDst = testRoot + "/" + testDstRel; String nnUri = FileSystem.getDefaultUri(conf).toString(); - DistributedFileSystem fs = (DistributedFileSystem) - FileSystem.get(URI.create(nnUri), conf); + FileSystem fs = FileSystem.get(URI.create(nnUri), conf); fs.mkdirs(new Path(testRoot)); if (createSrcDir) { fs.mkdirs(new Path(testSrc)); @@ -193,8 +129,8 @@ private void testPreserveUserHelper(String testRoot, for(int i = 0; i < srcEntries.length; i++) { fs.setOwner(srcstats[i].getPath(), "u" + i, null); } - String[] args = update? new String[]{"-pub", "-update", nnUri+testSrc, - nnUri+testDst} : new String[]{"-pub", nnUri+testSrc, nnUri+testDst}; + String[] args = update? 
new String[]{"-pu", "-update", nnUri+testSrc, + nnUri+testDst} : new String[]{"-pu", nnUri+testSrc, nnUri+testDst}; ToolRunner.run(conf, new DistCp(), args); @@ -209,263 +145,20 @@ private void testPreserveUserHelper(String testRoot, deldir(fs, testRoot); } - private void compareFiles(FileSystem fs, FileStatus srcStat, - FileStatus dstStat) throws Exception { - LOG.info("Comparing " + srcStat + " and " + dstStat); - assertEquals(srcStat.isDirectory(), dstStat.isDirectory()); - assertEquals(srcStat.getReplication(), dstStat.getReplication()); - assertEquals("File POSIX permission should match", - srcStat.getPermission(), dstStat.getPermission()); - assertEquals("File user ownership should match", - srcStat.getOwner(), dstStat.getOwner()); - assertEquals("File group ownership should match", - srcStat.getGroup(), dstStat.getGroup()); - // TODO; check ACL attributes - - if (srcStat.isDirectory()) { - return; - } - - assertEquals("File length should match (" + srcStat.getPath() + ")", - srcStat.getLen(), dstStat.getLen()); - - FSDataInputStream srcIn = fs.open(srcStat.getPath()); - FSDataInputStream dstIn = fs.open(dstStat.getPath()); - try { - byte[] readSrc = new byte[(int) - HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT]; - byte[] readDst = new byte[(int) - HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT]; - - int srcBytesRead = 0, tgtBytesRead = 0; - int srcIdx = 0, tgtIdx = 0; - long totalComparedBytes = 0; - while (true) { - if (srcBytesRead == 0) { - srcBytesRead = srcIn.read(readSrc); - srcIdx = 0; - } - if (tgtBytesRead == 0) { - tgtBytesRead = dstIn.read(readDst); - tgtIdx = 0; - } - if (srcBytesRead == 0 || tgtBytesRead == 0) { - LOG.info("______ compared src and dst files for " - + totalComparedBytes + " bytes, content match."); - if (srcBytesRead != tgtBytesRead) { - Assert.fail("Read mismatching size, compared " - + totalComparedBytes + " bytes between src and dst file " - + srcStat + " and " + dstStat); - } - if (totalComparedBytes != srcStat.getLen()) { - 
Assert.fail("Only read/compared " + totalComparedBytes + - " bytes between src and dst file " + srcStat + - " and " + dstStat); - } else { - // success - break; - } - } - for (; srcIdx < srcBytesRead && tgtIdx < tgtBytesRead; - ++srcIdx, ++tgtIdx) { - if (readSrc[srcIdx] != readDst[tgtIdx]) { - Assert.fail("src and dst file does not match at " - + totalComparedBytes + " between " - + srcStat + " and " + dstStat); - } - ++totalComparedBytes; - } - LOG.info("______ compared src and dst files for " - + totalComparedBytes + " bytes, content match. FileLength: " - + srcStat.getLen()); - if (totalComparedBytes == srcStat.getLen()) { - LOG.info("______ Final:" + srcIdx + " " - + srcBytesRead + " " + tgtIdx + " " + tgtBytesRead); - break; - } - if (srcIdx == srcBytesRead) { - srcBytesRead = 0; - } - if (tgtIdx == tgtBytesRead) { - tgtBytesRead = 0; - } - } - } finally { - if (srcIn != null) { - srcIn.close(); - } - if (dstIn != null) { - dstIn.close(); - } - } + @BeforeClass + public static void beforeClass() throws IOException { + conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); } - // WC: needed because the current distcp does not create target dirs - private void createDestDir(FileSystem fs, String testDst, - FileStatus[] srcStats, FileEntry[] srcFiles) throws IOException { - fs.mkdirs(new Path(testDst)); - - for (int i=0; i=0; --i) { - if (!srcFiles[i].isDirectory()) { - LOG.info("Modifying " + srcStats[i].getPath()); - DFSTestUtil.appendFileNewBlock(fs, srcStats[i].getPath(), - (int)BLOCK_SIZE * 3); - break; - } - } - // get file status after modifying file - srcStats = getFileStatus(fs, testRoot, srcFiles); - - args = new String[] {"-pugp", "-update", "-blocksperchunk", - String.valueOf(chunkSize), - nnUri + testSrc, nnUri + testDst + "/" + testSrcRel}; - - copyAndVerify(fs, srcFiles, srcStats, testDst, args); - - deldir(fs, testRoot); - } - - @Test - public void testRecursiveChunkCopy() throws 
Exception { - FileEntry[] srcFiles = { - new FileEntry(SRCDAT, true), - new FileEntry(SRCDAT + "/file0", false), - new FileEntry(SRCDAT + "/dir1", true), - new FileEntry(SRCDAT + "/dir2", true), - new FileEntry(SRCDAT + "/dir1/file1", false) - }; - chunkCopy(srcFiles); - } - - @Test - public void testChunkCopyOneFile() throws Exception { - FileEntry[] srcFiles = { - new FileEntry(SRCDAT, true), - new FileEntry(SRCDAT + "/file0", false) - }; - chunkCopy(srcFiles); - } - - @Test - public void testDistcpLargeFile() throws Exception { - FileEntry[] srcfiles = { - new FileEntry(SRCDAT, true), - new FileEntry(SRCDAT + "/file", false) - }; - - final String testRoot = "/testdir"; - final String testSrcRel = SRCDAT; - final String testSrc = testRoot + "/" + testSrcRel; - final String testDstRel = DSTDAT; - final String testDst = testRoot + "/" + testDstRel; - - String nnUri = FileSystem.getDefaultUri(conf).toString(); - DistributedFileSystem fs = - (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf); - fs.mkdirs(new Path(testRoot)); - fs.mkdirs(new Path(testSrc)); - fs.mkdirs(new Path(testDst)); - long chunkSize = 6; - createFiles(fs, testRoot, srcfiles, chunkSize); - - String srcFileName = testRoot + Path.SEPARATOR + srcfiles[1].getPath(); - Path srcfile = new Path(srcFileName); - - if(!cluster.getFileSystem().exists(srcfile)){ - throw new Exception("src not exist"); - } - - final long srcLen = fs.getFileStatus(srcfile).getLen(); - - FileStatus[] srcstats = getFileStatus(fs, testRoot, srcfiles); - for (int i = 0; i < srcfiles.length; i++) { - fs.setOwner(srcstats[i].getPath(), "u" + i, null); - } - String[] args = new String[] { - "-blocksperchunk", - String.valueOf(chunkSize), - nnUri + testSrc, - nnUri + testDst - }; - - LOG.info("_____ running distcp: " + args[0] + " " + args[1]); - ToolRunner.run(conf, new DistCp(), args); - - String realTgtPath = testDst; - FileStatus[] dststat = getFileStatus(fs, realTgtPath, srcfiles); - assertEquals("File length should 
match", srcLen, - dststat[dststat.length - 1].getLen()); - - this.compareFiles(fs, srcstats[srcstats.length-1], - dststat[dststat.length-1]); - deldir(fs, testRoot); - } - @Test public void testPreserveUseNonEmptyDir() throws Exception { String testRoot = "/testdir." + getMethodName(); @@ -487,6 +180,7 @@ public void testPreserveUseNonEmptyDir() throws Exception { testPreserveUserHelper(testRoot, srcfiles, dstfiles, false, false, false); } + @Test public void testPreserveUserEmptyDir() throws Exception { String testRoot = "/testdir." + getMethodName(); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index f94ba97ec3..efe46272e3 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -394,7 +394,7 @@ public void testToString() { + "copyStrategy='uniformsize', preserveStatus=[], " + "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, " + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, " - + "targetPathExists=true, filtersFile='null', blocksPerChunk=0}"; + + "targetPathExists=true, filtersFile='null'}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 2452d6fee3..2e9a350b1c 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -81,10 +81,6 @@ public static void destroy() { @Before public void createMetaFolder() 
{ config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta"); - // Unset listing file path since the config is shared by - // multiple tests, and some test doesn't set it, such as - // testNoCommitAction, but the distcp code will check it. - config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""); Path meta = new Path("/meta"); try { cluster.getFileSystem().mkdirs(meta); @@ -330,6 +326,7 @@ public void testAtomicCommitMissingFinal() { committer.commitJob(jobContext); Assert.assertFalse(fs.exists(new Path(workPath))); Assert.assertTrue(fs.exists(new Path(finalPath))); + } catch (IOException e) { LOG.error("Exception encountered while testing for preserve status", e); Assert.fail("Atomic commit failure");