HADOOP-11785. Reduce the number of listStatus operation in distcp buildListing (Zoran Dimitrijevic via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2015-04-03 14:08:25 -07:00
parent db80e42891
commit 932730df7d
2 changed files with 21 additions and 23 deletions

View File

@@ -483,6 +483,9 @@ Release 2.8.0 - UNRELEASED
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
buildListing (Zoran Dimitrijevic via Colin P. McCabe)
BUG FIXES
HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of

View File

@@ -193,12 +193,12 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
writeToFileListing(fileListWriter, sourceCopyListingStatus, writeToFileListing(fileListWriter, sourceCopyListingStatus,
sourcePathRoot, options); sourcePathRoot, options);
if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) { if (sourceStatus.isDirectory()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Traversing non-empty source dir: " + sourceStatus.getPath()); LOG.debug("Traversing source dir: " + sourceStatus.getPath());
} }
traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, traverseDirectory(fileListWriter, sourceFS, sourceStatus,
options); sourcePathRoot, options);
} }
} }
} }
@@ -275,22 +275,17 @@ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)); SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
} }
private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem,
FileStatus fileStatus) throws IOException {
return fileStatus.isDirectory() && getChildren(fileSystem, fileStatus).length > 0;
}
private static FileStatus[] getChildren(FileSystem fileSystem, private static FileStatus[] getChildren(FileSystem fileSystem,
FileStatus parent) throws IOException { FileStatus parent) throws IOException {
return fileSystem.listStatus(parent.getPath()); return fileSystem.listStatus(parent.getPath());
} }
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter, private void traverseDirectory(SequenceFile.Writer fileListWriter,
FileStatus sourceStatus, FileSystem sourceFS,
Path sourcePathRoot, FileStatus sourceStatus,
DistCpOptions options) Path sourcePathRoot,
throws IOException { DistCpOptions options)
FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf()); throws IOException {
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXattrs = options.shouldPreserveRawXattrs(); final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
@@ -299,9 +294,9 @@ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
while (!pathStack.isEmpty()) { while (!pathStack.isEmpty()) {
for (FileStatus child: getChildren(sourceFS, pathStack.pop())) { for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
+ sourceStatus.getPath() + " for copy."); }
CopyListingFileStatus childCopyListingStatus = CopyListingFileStatus childCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, child, DistCpUtils.toCopyListingFileStatus(sourceFS, child,
preserveAcls && child.isDirectory(), preserveAcls && child.isDirectory(),
@@ -309,16 +304,16 @@ private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
preserveRawXattrs && child.isDirectory()); preserveRawXattrs && child.isDirectory());
writeToFileListing(fileListWriter, childCopyListingStatus, writeToFileListing(fileListWriter, childCopyListingStatus,
sourcePathRoot, options); sourcePathRoot, options);
if (isDirectoryAndNotEmpty(sourceFS, child)) { if (child.isDirectory()) {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled()) {
LOG.debug("Traversing non-empty source dir: " LOG.debug("Traversing into source dir: " + child.getPath());
+ sourceStatus.getPath()); }
pathStack.push(child); pathStack.push(child);
} }
} }
} }
} }
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
CopyListingFileStatus fileStatus, Path sourcePathRoot, CopyListingFileStatus fileStatus, Path sourcePathRoot,
DistCpOptions options) throws IOException { DistCpOptions options) throws IOException {