HDFS-9630. DistCp minor refactoring and clean up. Contributed by Kai Zheng.

Change-Id: I363c4ffcac32116ddcdc0a22fac3db92f14a0db0
This commit is contained in:
Zhe Zhang 2016-01-11 09:46:56 -08:00
parent 8c1adeaa26
commit 95f32015ad
12 changed files with 65 additions and 51 deletions

View File

@ -211,6 +211,8 @@ Trunk (Unreleased)
HDFS-9582. TestLeaseRecoveryStriped file missing Apache License header HDFS-9582. TestLeaseRecoveryStriped file missing Apache License header
and not well formatted. (umamahesh) and not well formatted. (umamahesh)
HDFS-9630. DistCp minor refactoring and clean up. (Kai Zheng via zhz)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -20,10 +20,16 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import java.io.*; import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

View File

@ -39,7 +39,8 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.*; import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@ -165,9 +166,9 @@ protected void validatePaths(DistCpOptions options)
} }
} }
/** {@inheritDoc} */
@Override @Override
public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException { protected void doBuildListing(Path pathToListingFile,
DistCpOptions options) throws IOException {
if(options.shouldUseDiff()) { if(options.shouldUseDiff()) {
doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options); doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options);
}else { }else {
@ -227,8 +228,9 @@ private void addToFileListing(SequenceFile.Writer fileListWriter,
* @throws IOException * @throws IOException
*/ */
@VisibleForTesting @VisibleForTesting
public void doBuildListingWithSnapshotDiff(SequenceFile.Writer fileListWriter, protected void doBuildListingWithSnapshotDiff(
DistCpOptions options) throws IOException { SequenceFile.Writer fileListWriter, DistCpOptions options)
throws IOException {
ArrayList<DiffInfo> diffList = distCpSync.prepareDiffList(); ArrayList<DiffInfo> diffList = distCpSync.prepareDiffList();
Path sourceRoot = options.getSourcePaths().get(0); Path sourceRoot = options.getSourcePaths().get(0);
FileSystem sourceFS = sourceRoot.getFileSystem(getConf()); FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
@ -287,7 +289,7 @@ public void doBuildListingWithSnapshotDiff(SequenceFile.Writer fileListWriter,
* @throws IOException * @throws IOException
*/ */
@VisibleForTesting @VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, protected void doBuildListing(SequenceFile.Writer fileListWriter,
DistCpOptions options) throws IOException { DistCpOptions options) throws IOException {
if (options.getNumListstatusThreads() > 0) { if (options.getNumListstatusThreads() > 0) {
numListstatusThreads = options.getNumListstatusThreads(); numListstatusThreads = options.getNumListstatusThreads();

View File

@ -27,10 +27,16 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.tools.*; import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.GlobbedCopyListing;
import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.DistCpUtils;
import java.io.IOException; import java.io.IOException;

View File

@ -20,7 +20,10 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpConstants;

View File

@ -201,11 +201,13 @@ private void compareCheckSums(FileSystem sourceFS, Path source,
targetFS, target)) { targetFS, target)) {
StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ") StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
.append(source).append(" and ").append(target).append("."); .append(source).append(" and ").append(target).append(".");
if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) { if (sourceFS.getFileStatus(source).getBlockSize() !=
targetFS.getFileStatus(target).getBlockSize()) {
errorMessage.append(" Source and target differ in block-size.") errorMessage.append(" Source and target differ in block-size.")
.append(" Use -pb to preserve block-sizes during copy.") .append(" Use -pb to preserve block-sizes during copy.")
.append(" Alternatively, skip checksum-checks altogether, using -skipCrc.") .append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
.append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)"); .append(" (NOTE: By skipping checksums, one runs the risk of " +
"masking data-corruption during file-transfer.)");
} }
throw new IOException(errorMessage.toString()); throw new IOException(errorMessage.toString());
} }

View File

@ -23,12 +23,16 @@
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader; import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;

View File

@ -18,17 +18,7 @@
package org.apache.hadoop.tools.util; package org.apache.hadoop.tools.util;
import java.io.IOException; import com.google.common.collect.Maps;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -49,10 +39,15 @@
import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat; import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
import com.google.common.collect.Maps;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/** /**
* Utility functions used in DistCp. * Utility functions used in DistCp.
*/ */

View File

@ -20,15 +20,11 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* ProducerConsumer class encapsulates input and output queues and a * ProducerConsumer class encapsulates input and output queues and a
@ -51,8 +47,8 @@ public class ProducerConsumer<T, R> {
* @param numThreads Size of thread-pool to execute Workers. * @param numThreads Size of thread-pool to execute Workers.
*/ */
public ProducerConsumer(int numThreads) { public ProducerConsumer(int numThreads) {
this.inputQueue = new LinkedBlockingQueue<WorkRequest<T>>(); this.inputQueue = new LinkedBlockingQueue<>();
this.outputQueue = new LinkedBlockingQueue<WorkReport<R>>(); this.outputQueue = new LinkedBlockingQueue<>();
executor = Executors.newFixedThreadPool(numThreads); executor = Executors.newFixedThreadPool(numThreads);
workCnt = new AtomicInteger(0); workCnt = new AtomicInteger(0);
} }

View File

@ -18,13 +18,10 @@
package org.apache.hadoop.tools.util; package org.apache.hadoop.tools.util;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.PositionedReadable;
import com.google.common.base.Preconditions; import java.io.IOException;
import java.io.InputStream;
/** /**
* The ThrottleInputStream provides bandwidth throttling on a specified * The ThrottleInputStream provides bandwidth throttling on a specified

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.tools.util; package org.apache.hadoop.tools.util;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
/** /**
* Interface for ProducerConsumer worker loop. * Interface for ProducerConsumer worker loop.
* *

View File

@ -378,7 +378,8 @@ public void testMakeDirFailure() {
workPath); workPath);
copyMapper.setup(context); copyMapper.setup(context);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))), copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
pathList.get(0))),
new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context); new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context);
Assert.assertTrue("There should have been an exception.", false); Assert.assertTrue("There should have been an exception.", false);
@ -525,7 +526,8 @@ public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
mkdirs(TARGET_PATH); mkdirs(TARGET_PATH);
cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"), cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ)); new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511)); cluster.getFileSystem().setPermission(new Path(TARGET_PATH),
new FsPermission((short)511));
final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() { final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
@Override @Override
@ -785,7 +787,8 @@ private void doTestIgnoreFailures(boolean ignoreFailures) {
} }
if (ignoreFailures) { if (ignoreFailures) {
for (Text value : stubContext.getWriter().values()) { for (Text value : stubContext.getWriter().values()) {
Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("FAIL:")); Assert.assertTrue(value.toString() + " is not skipped",
value.toString().startsWith("FAIL:"));
} }
} }
Assert.assertTrue("There should have been an exception.", ignoreFailures); Assert.assertTrue("There should have been an exception.", ignoreFailures);
@ -813,7 +816,6 @@ public void testPreserveBlockSizeAndReplication() {
@Test(timeout=40000) @Test(timeout=40000)
public void testCopyFailOnBlockSizeDifference() { public void testCopyFailOnBlockSizeDifference() {
try { try {
deleteState(); deleteState();
createSourceDataWithDifferentBlockSize(); createSourceDataWithDifferentBlockSize();
@ -833,16 +835,18 @@ public void testCopyFailOnBlockSizeDifference() {
for (Path path : pathList) { for (Path path : pathList) {
final FileStatus fileStatus = fs.getFileStatus(path); final FileStatus fileStatus = fs.getFileStatus(path);
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)), copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
new CopyListingFileStatus(fileStatus), context); path)), new CopyListingFileStatus(fileStatus), context);
} }
Assert.fail("Copy should have failed because of block-size difference."); Assert.fail("Copy should have failed because of block-size difference.");
} }
catch (Exception exception) { catch (Exception exception) {
// Check that the exception suggests the use of -pb/-skipCrc. // Check that the exception suggests the use of -pb/-skipCrc.
Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getCause().getMessage().contains("pb")); Assert.assertTrue("Failure exception should have suggested the use of -pb.",
Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getCause().getMessage().contains("skipCrc")); exception.getCause().getCause().getMessage().contains("pb"));
Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.",
exception.getCause().getCause().getMessage().contains("skipCrc"));
} }
} }