HADOOP-13169. Randomize file list in SimpleCopyListing. Contributed by Rajesh Balamohan.
This commit is contained in:
parent
7558dbbb48
commit
98bdb51397
@ -58,6 +58,10 @@ public class DistCpConstants {
|
||||
public static final String CONF_LABEL_APPEND = "distcp.copy.append";
|
||||
public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
|
||||
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
|
||||
public static final String CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE =
|
||||
"distcp.simplelisting.file.status.size";
|
||||
public static final String CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES =
|
||||
"distcp.simplelisting.randomize.files";
|
||||
public static final String CONF_LABEL_FILTERS_FILE =
|
||||
"distcp.filters.file";
|
||||
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -42,7 +43,10 @@
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hadoop.tools.DistCpConstants
|
||||
.HDFS_RESERVED_RAW_DIRECTORY_NAME;
|
||||
@ -56,13 +60,19 @@
|
||||
public class SimpleCopyListing extends CopyListing {
|
||||
private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
|
||||
|
||||
public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
|
||||
public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
|
||||
|
||||
private long totalPaths = 0;
|
||||
private long totalDirs = 0;
|
||||
private long totalBytesToCopy = 0;
|
||||
private int numListstatusThreads = 1;
|
||||
private final int fileStatusLimit;
|
||||
private final boolean randomizeFileListing;
|
||||
private final int maxRetries = 3;
|
||||
private CopyFilter copyFilter;
|
||||
private DistCpSync distCpSync;
|
||||
private final Random rnd = new Random();
|
||||
|
||||
/**
|
||||
* Protected constructor, to initialize configuration.
|
||||
@ -76,6 +86,17 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials
|
||||
numListstatusThreads = getConf().getInt(
|
||||
DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
|
||||
DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
|
||||
fileStatusLimit = Math.max(1, getConf()
|
||||
.getInt(DistCpConstants.CONF_LABEL_SIMPLE_LISTING_FILESTATUS_SIZE,
|
||||
DEFAULT_FILE_STATUS_SIZE));
|
||||
randomizeFileListing = getConf().getBoolean(
|
||||
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES,
|
||||
DEFAULT_RANDOMIZE_FILE_LISTING);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("numListstatusThreads=" + numListstatusThreads
|
||||
+ ", fileStatusLimit=" + fileStatusLimit
|
||||
+ ", randomizeFileListing=" + randomizeFileListing);
|
||||
}
|
||||
copyFilter = CopyFilter.getCopyFilter(getConf());
|
||||
copyFilter.initialize();
|
||||
}
|
||||
@ -83,9 +104,13 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials
|
||||
@VisibleForTesting
|
||||
protected SimpleCopyListing(Configuration configuration,
|
||||
Credentials credentials,
|
||||
int numListstatusThreads) {
|
||||
int numListstatusThreads,
|
||||
int fileStatusLimit,
|
||||
boolean randomizeFileListing) {
|
||||
super(configuration, credentials);
|
||||
this.numListstatusThreads = numListstatusThreads;
|
||||
this.fileStatusLimit = Math.max(1, fileStatusLimit);
|
||||
this.randomizeFileListing = randomizeFileListing;
|
||||
}
|
||||
|
||||
protected SimpleCopyListing(Configuration configuration,
|
||||
@ -236,6 +261,7 @@ protected void doBuildListingWithSnapshotDiff(
|
||||
FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
|
||||
|
||||
try {
|
||||
List<FileStatusInfo> fileStatuses = Lists.newArrayList();
|
||||
for (DiffInfo diff : diffList) {
|
||||
// add snapshot paths prefix
|
||||
diff.target = new Path(options.getSourcePaths().get(0), diff.target);
|
||||
@ -259,10 +285,13 @@ protected void doBuildListingWithSnapshotDiff(
|
||||
sourceDirs.add(sourceStatus);
|
||||
|
||||
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
|
||||
sourceRoot, options, excludeList);
|
||||
sourceRoot, options, excludeList, fileStatuses);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (randomizeFileListing) {
|
||||
writeToFileListing(fileStatuses, fileListWriter);
|
||||
}
|
||||
fileListWriter.close();
|
||||
fileListWriter = null;
|
||||
} finally {
|
||||
@ -296,6 +325,7 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
|
||||
try {
|
||||
List<FileStatusInfo> statusList = Lists.newArrayList();
|
||||
for (Path path: options.getSourcePaths()) {
|
||||
FileSystem sourceFS = path.getFileSystem(getConf());
|
||||
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
||||
@ -326,8 +356,14 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||
preserveAcls && sourceStatus.isDirectory(),
|
||||
preserveXAttrs && sourceStatus.isDirectory(),
|
||||
preserveRawXAttrs && sourceStatus.isDirectory());
|
||||
writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
||||
sourcePathRoot);
|
||||
if (randomizeFileListing) {
|
||||
addToFileListing(statusList,
|
||||
new FileStatusInfo(sourceCopyListingStatus, sourcePathRoot),
|
||||
fileListWriter);
|
||||
} else {
|
||||
writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
||||
sourcePathRoot);
|
||||
}
|
||||
|
||||
if (sourceStatus.isDirectory()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -337,9 +373,12 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
}
|
||||
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
|
||||
sourcePathRoot, options, null);
|
||||
sourcePathRoot, options, null, statusList);
|
||||
}
|
||||
}
|
||||
if (randomizeFileListing) {
|
||||
writeToFileListing(statusList, fileListWriter);
|
||||
}
|
||||
fileListWriter.close();
|
||||
printStats();
|
||||
LOG.info("Build file listing completed.");
|
||||
@ -349,6 +388,52 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
}
|
||||
|
||||
private void addToFileListing(List<FileStatusInfo> fileStatusInfoList,
|
||||
FileStatusInfo statusInfo, SequenceFile.Writer fileListWriter)
|
||||
throws IOException {
|
||||
fileStatusInfoList.add(statusInfo);
|
||||
if (fileStatusInfoList.size() > fileStatusLimit) {
|
||||
writeToFileListing(fileStatusInfoList, fileListWriter);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setSeedForRandomListing(long seed) {
|
||||
this.rnd.setSeed(seed);
|
||||
}
|
||||
|
||||
private void writeToFileListing(List<FileStatusInfo> fileStatusInfoList,
|
||||
SequenceFile.Writer fileListWriter) throws IOException {
|
||||
/**
|
||||
* In cloud storage systems, it is possible to get region hotspot.
|
||||
* Shuffling paths can avoid such cases and also ensure that
|
||||
* some mappers do not get lots of similar paths.
|
||||
*/
|
||||
Collections.shuffle(fileStatusInfoList, rnd);
|
||||
for (FileStatusInfo fileStatusInfo : fileStatusInfoList) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath());
|
||||
}
|
||||
writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
|
||||
fileStatusInfo.sourceRootPath);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Number of paths written to fileListing="
|
||||
+ fileStatusInfoList.size());
|
||||
}
|
||||
fileStatusInfoList.clear();
|
||||
}
|
||||
|
||||
private static class FileStatusInfo {
|
||||
private CopyListingFileStatus fileStatus;
|
||||
private Path sourceRootPath;
|
||||
|
||||
FileStatusInfo(CopyListingFileStatus fileStatus, Path sourceRootPath) {
|
||||
this.fileStatus = fileStatus;
|
||||
this.sourceRootPath = sourceRootPath;
|
||||
}
|
||||
}
|
||||
|
||||
private Path computeSourceRootPath(FileStatus sourceStatus,
|
||||
DistCpOptions options) throws IOException {
|
||||
|
||||
@ -516,15 +601,18 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter,
|
||||
ArrayList<FileStatus> sourceDirs,
|
||||
Path sourcePathRoot,
|
||||
DistCpOptions options,
|
||||
HashSet<String> excludeList)
|
||||
HashSet<String> excludeList,
|
||||
List<FileStatusInfo> fileStatuses)
|
||||
throws IOException {
|
||||
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
|
||||
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
|
||||
final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
|
||||
|
||||
assert numListstatusThreads > 0;
|
||||
LOG.debug("Starting thread pool of " + numListstatusThreads +
|
||||
" listStatus workers.");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Starting thread pool of " + numListstatusThreads +
|
||||
" listStatus workers.");
|
||||
}
|
||||
ProducerConsumer<FileStatus, FileStatus[]> workers =
|
||||
new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
|
||||
for (int i = 0; i < numListstatusThreads; i++) {
|
||||
@ -551,8 +639,14 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter,
|
||||
preserveAcls && child.isDirectory(),
|
||||
preserveXAttrs && child.isDirectory(),
|
||||
preserveRawXattrs && child.isDirectory());
|
||||
writeToFileListing(fileListWriter, childCopyListingStatus,
|
||||
sourcePathRoot);
|
||||
if (randomizeFileListing) {
|
||||
addToFileListing(fileStatuses,
|
||||
new FileStatusInfo(childCopyListingStatus, sourcePathRoot),
|
||||
fileListWriter);
|
||||
} else {
|
||||
writeToFileListing(fileListWriter, childCopyListingStatus,
|
||||
sourcePathRoot);
|
||||
}
|
||||
}
|
||||
if (retry < maxRetries) {
|
||||
if (child.isDirectory()) {
|
||||
|
@ -25,7 +25,6 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
|
||||
import org.apache.hadoop.tools.util.TestDistCpUtils;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
@ -46,7 +45,9 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class TestCopyListing extends SimpleCopyListing {
|
||||
@ -77,7 +78,7 @@ public static Collection<Object[]> data() {
|
||||
}
|
||||
|
||||
public TestCopyListing(int numListstatusThreads) {
|
||||
super(config, CREDENTIALS, numListstatusThreads);
|
||||
super(config, CREDENTIALS, numListstatusThreads, 0, false);
|
||||
}
|
||||
|
||||
protected TestCopyListing(Configuration configuration) {
|
||||
@ -221,6 +222,84 @@ public void testBuildListing() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testWithRandomFileListing() throws IOException {
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
fs = FileSystem.get(getConf());
|
||||
List<Path> srcPaths = new ArrayList<>();
|
||||
List<Path> srcFiles = new ArrayList<>();
|
||||
Path target = new Path("/tmp/out/1");
|
||||
final int pathCount = 25;
|
||||
for (int i = 0; i < pathCount; i++) {
|
||||
Path p = new Path("/tmp", String.valueOf(i));
|
||||
srcPaths.add(p);
|
||||
fs.mkdirs(p);
|
||||
|
||||
Path fileName = new Path(p, i + ".txt");
|
||||
srcFiles.add(fileName);
|
||||
try (OutputStream out = fs.create(fileName)) {
|
||||
out.write(i);
|
||||
}
|
||||
}
|
||||
|
||||
Path listingFile = new Path("/tmp/file");
|
||||
DistCpOptions options = new DistCpOptions(srcPaths, target);
|
||||
options.setSyncFolder(true);
|
||||
|
||||
// Check without randomizing files
|
||||
getConf().setBoolean(
|
||||
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, false);
|
||||
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
|
||||
listing.buildListing(listingFile, options);
|
||||
|
||||
Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
|
||||
validateFinalListing(listingFile, srcFiles);
|
||||
fs.delete(listingFile, true);
|
||||
|
||||
// Check with randomized file listing
|
||||
getConf().setBoolean(
|
||||
DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, true);
|
||||
listing = new SimpleCopyListing(getConf(), CREDENTIALS);
|
||||
|
||||
// Set the seed for randomness, so that it can be verified later
|
||||
long seed = System.nanoTime();
|
||||
listing.setSeedForRandomListing(seed);
|
||||
listing.buildListing(listingFile, options);
|
||||
Assert.assertEquals(listing.getNumberOfPaths(), pathCount);
|
||||
|
||||
// validate randomness
|
||||
Collections.shuffle(srcFiles, new Random(seed));
|
||||
validateFinalListing(listingFile, srcFiles);
|
||||
} finally {
|
||||
TestDistCpUtils.delete(fs, "/tmp");
|
||||
}
|
||||
}
|
||||
|
||||
private void validateFinalListing(Path pathToListFile, List<Path> srcFiles)
|
||||
throws IOException {
|
||||
FileSystem fs = pathToListFile.getFileSystem(config);
|
||||
|
||||
try (SequenceFile.Reader reader = new SequenceFile.Reader(
|
||||
config, SequenceFile.Reader.file(pathToListFile))) {
|
||||
CopyListingFileStatus currentVal = new CopyListingFileStatus();
|
||||
|
||||
Text currentKey = new Text();
|
||||
int idx = 0;
|
||||
while (reader.next(currentKey)) {
|
||||
reader.getCurrentValue(currentVal);
|
||||
Assert.assertEquals("srcFiles.size=" + srcFiles.size()
|
||||
+ ", idx=" + idx, fs.makeQualified(srcFiles.get(idx)),
|
||||
currentVal.getPath());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("val=" + fs.makeQualified(srcFiles.get(idx)));
|
||||
}
|
||||
idx++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testBuildListingForSingleFile() {
|
||||
FileSystem fs = null;
|
||||
|
Loading…
Reference in New Issue
Block a user