From 95f32015ad9273420299130a9f10acdbafe63556 Mon Sep 17 00:00:00 2001
From: Zhe Zhang
Date: Mon, 11 Jan 2016 09:46:56 -0800
Subject: [PATCH] HDFS-9630. DistCp minor refactoring and clean up.
 Contributed by Kai Zheng.

Change-Id: I363c4ffcac32116ddcdc0a22fac3db92f14a0db0
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  2 ++
 .../apache/hadoop/tools/RegexCopyFilter.java  | 10 +++++++--
 .../hadoop/tools/SimpleCopyListing.java       | 14 +++++++-----
 .../hadoop/tools/mapred/CopyCommitter.java    | 10 +++++++--
 .../hadoop/tools/mapred/CopyOutputFormat.java |  5 ++++-
 .../mapred/RetriableFileCopyCommand.java      |  6 +++--
 .../tools/mapred/UniformSizeInputFormat.java  |  6 ++++-
 .../apache/hadoop/tools/util/DistCpUtils.java | 21 +++++++-----------
 .../hadoop/tools/util/ProducerConsumer.java   | 10 +++-------
 .../tools/util/ThrottledInputStream.java      |  7 ++----
 .../tools/util/WorkRequestProcessor.java      |  3 ---
 .../hadoop/tools/mapred/TestCopyMapper.java   | 22 +++++++++++--------
 12 files changed, 65 insertions(+), 51 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4963b19889..b8f8c63ed7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -211,6 +211,8 @@ Trunk (Unreleased)
     HDFS-9582. TestLeaseRecoveryStriped file missing Apache License header
     and not well formatted. (umamahesh)
 
+    HDFS-9630. DistCp minor refactoring and clean up. (Kai Zheng via zhz)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java
index 1c2b324dd3..f6c496e76c 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java
@@ -20,10 +20,16 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index 77743ebe1c..d2598a42d0 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -39,7 +39,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 
-import java.io.*;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 
@@ -165,9 +166,9 @@ protected void validatePaths(DistCpOptions options)
     }
   }
 
-  /** {@inheritDoc} */
   @Override
-  public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
+  protected void doBuildListing(Path pathToListingFile,
+      DistCpOptions options) throws IOException {
     if(options.shouldUseDiff()) {
       doBuildListingWithSnapshotDiff(getWriter(pathToListingFile), options);
     }else {
@@ -227,8 +228,9 @@ private void addToFileListing(SequenceFile.Writer fileListWriter,
    * @throws IOException
    */
   @VisibleForTesting
-  public void doBuildListingWithSnapshotDiff(SequenceFile.Writer fileListWriter,
-      DistCpOptions options) throws IOException {
+  protected void doBuildListingWithSnapshotDiff(
+      SequenceFile.Writer fileListWriter, DistCpOptions options)
+      throws IOException {
     ArrayList<DiffInfo> diffList = distCpSync.prepareDiffList();
     Path sourceRoot = options.getSourcePaths().get(0);
     FileSystem sourceFS = sourceRoot.getFileSystem(getConf());
@@ -287,7 +289,7 @@ public void doBuildListingWithSnapshotDiff(SequenceFile.Writer fileListWriter,
    * @throws IOException
    */
   @VisibleForTesting
-  public void doBuildListing(SequenceFile.Writer fileListWriter,
+  protected void doBuildListing(SequenceFile.Writer fileListWriter,
       DistCpOptions options) throws IOException {
     if (options.getNumListstatusThreads() > 0) {
       numListstatusThreads = options.getNumListstatusThreads();
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 2b1e5104c2..6d2fef5f90 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -27,10 +27,16 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.tools.*;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.GlobbedCopyListing;
 import org.apache.hadoop.tools.util.DistCpUtils;
 
 import java.io.IOException;
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java
index a5bd605a8e..7b41caa029 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyOutputFormat.java
@@ -20,7 +20,10 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.tools.DistCpConstants;
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
index 6b5078c29b..071e500508 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
@@ -201,11 +201,13 @@ private void compareCheckSums(FileSystem sourceFS, Path source,
         targetFS, target)) {
       StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
           .append(source).append(" and ").append(target).append(".");
-      if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
+      if (sourceFS.getFileStatus(source).getBlockSize() !=
+          targetFS.getFileStatus(target).getBlockSize()) {
         errorMessage.append(" Source and target differ in block-size.")
             .append(" Use -pb to preserve block-sizes during copy.")
             .append(" Alternatively, skip checksum-checks altogether, using -skipCrc.")
-            .append(" (NOTE: By skipping checksums, one runs the risk of masking data-corruption during file-transfer.)");
+            .append(" (NOTE: By skipping checksums, one runs the risk of " +
+                "masking data-corruption during file-transfer.)");
       }
       throw new IOException(errorMessage.toString());
     }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
index 8f31234d05..3e86d0931b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformSizeInputFormat.java
@@ -23,12 +23,16 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.conf.Configuration;
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index 70d947e5ba..d3d7677ecf 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -18,17 +18,7 @@
 
 package org.apache.hadoop.tools.util;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.text.DecimalFormat;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -49,10 +39,15 @@
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
-
-import com.google.common.collect.Maps;
 import org.apache.hadoop.util.StringUtils;
 
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 /**
  * Utility functions used in DistCp.
  */
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
index bf72bb88aa..16bf2541c2 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
@@ -20,15 +20,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.tools.util.WorkReport;
-import org.apache.hadoop.tools.util.WorkRequest;
-import org.apache.hadoop.tools.util.WorkRequestProcessor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.ArrayList;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * ProducerConsumer class encapsulates input and output queues and a
@@ -51,8 +47,8 @@ public class ProducerConsumer<T, R> {
    * @param numThreads Size of thread-pool to execute Workers.
    */
   public ProducerConsumer(int numThreads) {
-    this.inputQueue = new LinkedBlockingQueue<WorkRequest<T>>();
-    this.outputQueue = new LinkedBlockingQueue<WorkReport<R>>();
+    this.inputQueue = new LinkedBlockingQueue<>();
+    this.outputQueue = new LinkedBlockingQueue<>();
     executor = Executors.newFixedThreadPool(numThreads);
     workCnt = new AtomicInteger(0);
   }
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
index 2be8ef043c..70355274ad 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java
@@ -18,13 +18,10 @@
 
 package org.apache.hadoop.tools.util;
 
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.PositionedReadable;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.InputStream;
 
 /**
  * The ThrottleInputStream provides bandwidth throttling on a specified
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
index b6d8a097c0..91f738e4ef 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.tools.util;
 
-import org.apache.hadoop.tools.util.WorkReport;
-import org.apache.hadoop.tools.util.WorkRequest;
-
 /**
  * Interface for ProducerConsumer worker loop.
  *
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
index ec60fa800f..4d0752fef1 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
@@ -378,8 +378,9 @@ public void testMakeDirFailure() {
           workPath);
       copyMapper.setup(context);
 
-      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), pathList.get(0))),
-          new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context);
+      copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
+          pathList.get(0))),
+          new CopyListingFileStatus(fs.getFileStatus(pathList.get(0))), context);
 
       Assert.assertTrue("There should have been an exception.", false);
     }
@@ -525,7 +526,8 @@ public Mapper<Text, CopyListingFileStatus, Text, Text>.Context run() {
       mkdirs(TARGET_PATH);
       cluster.getFileSystem().setPermission(new Path(SOURCE_PATH + "/src/file"),
          new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
-      cluster.getFileSystem().setPermission(new Path(TARGET_PATH), new FsPermission((short)511));
+      cluster.getFileSystem().setPermission(new Path(TARGET_PATH),
+          new FsPermission((short)511));
 
       final FileSystem tmpFS = tmpUser.doAs(new PrivilegedAction<FileSystem>() {
         @Override
@@ -785,7 +787,8 @@ private void doTestIgnoreFailures(boolean ignoreFailures) {
       }
       if (ignoreFailures) {
         for (Text value : stubContext.getWriter().values()) {
-          Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("FAIL:"));
+          Assert.assertTrue(value.toString() + " is not skipped",
+              value.toString().startsWith("FAIL:"));
         }
       }
       Assert.assertTrue("There should have been an exception.", ignoreFailures);
@@ -813,7 +816,6 @@ public void testPreserveBlockSizeAndReplication() {
   @Test(timeout=40000)
   public void testCopyFailOnBlockSizeDifference() {
     try {
-
       deleteState();
       createSourceDataWithDifferentBlockSize();
 
@@ -833,16 +835,18 @@ public void testCopyFailOnBlockSizeDifference() {
 
       for (Path path : pathList) {
         final FileStatus fileStatus = fs.getFileStatus(path);
-        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
-            new CopyListingFileStatus(fileStatus), context);
+        copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH),
+            path)), new CopyListingFileStatus(fileStatus), context);
       }
 
       Assert.fail("Copy should have failed because of block-size difference.");
     }
     catch (Exception exception) {
       // Check that the exception suggests the use of -pb/-skipCrc.
-      Assert.assertTrue("Failure exception should have suggested the use of -pb.", exception.getCause().getCause().getMessage().contains("pb"));
-      Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.", exception.getCause().getCause().getMessage().contains("skipCrc"));
+      Assert.assertTrue("Failure exception should have suggested the use of -pb.",
+          exception.getCause().getCause().getMessage().contains("pb"));
+      Assert.assertTrue("Failure exception should have suggested the use of -skipCrc.",
+          exception.getCause().getCause().getMessage().contains("skipCrc"));
     }
   }
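Note on the ProducerConsumer hunk above (not part of the patch itself): it replaces
explicit constructor type arguments with the Java 7 diamond operator, letting the
compiler infer the nested generic arguments from the declared field type. The
following is a minimal, self-contained sketch of that pattern; the Request and
Report classes here are hypothetical stand-ins for WorkRequest<T> and WorkReport<R>.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DiamondSketch<T, R> {
  // Hypothetical stand-ins for WorkRequest<T> and WorkReport<R>.
  static class Request<A> {
    final A item;
    Request(A item) { this.item = item; }
  }

  static class Report<B> {
    final B result;
    Report(B result) { this.result = result; }
  }

  private final BlockingQueue<Request<T>> inputQueue;
  private final BlockingQueue<Report<R>> outputQueue;

  DiamondSketch() {
    // Before Java 7, the nested type arguments had to be repeated:
    //   this.inputQueue = new LinkedBlockingQueue<Request<T>>();
    // With the diamond operator, the compiler infers them from the
    // declared field type, as in the ProducerConsumer constructor above.
    this.inputQueue = new LinkedBlockingQueue<>();
    this.outputQueue = new LinkedBlockingQueue<>();
  }

  public static void main(String[] args) throws InterruptedException {
    DiamondSketch<String, Integer> pc = new DiamondSketch<>();
    pc.inputQueue.put(new Request<>("work item"));  // infers Request<String>
    pc.outputQueue.put(new Report<>(42));           // infers Report<Integer>
    System.out.println(pc.inputQueue.take().item
        + " -> " + pc.outputQueue.take().result);
  }
}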