From cbb3ba135cfa3a9bc9f444e8b4a7875758721a3c Mon Sep 17 00:00:00 2001 From: smarthan <1139557635@qq.com> Date: Mon, 22 Nov 2021 19:37:05 +0800 Subject: [PATCH] HADOOP-17998. Allow get command to run with multi threads. (#3645) (cherry picked from commit 63018dc73f4d29632e93be08d035ab9a7e73531c) Conflicts: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java --- .../fs/shell/CommandWithDestination.java | 8 +- .../fs/shell/CopyCommandWithMultiThread.java | 155 +++++++++++++ .../apache/hadoop/fs/shell/CopyCommands.java | 147 +++--------- .../src/site/markdown/FileSystemShell.md | 36 +-- .../hadoop/fs/shell/TestCopyFromLocal.java | 101 ++++----- .../hadoop/fs/shell/TestCopyPreserveFlag.java | 43 +++- .../hadoop/fs/shell/TestCopyToLocal.java | 210 ++++++++++++++++++ .../src/test/resources/testConf.xml | 90 ++++---- 8 files changed, 548 insertions(+), 242 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommandWithMultiThread.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyToLocal.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java index 90a709dffc..f6f4247489 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java @@ -397,11 +397,11 @@ private boolean checkPathsForReservedRaw(Path src, Path target) /** * If direct write is disabled ,copies the stream contents to a temporary - * file "._COPYING_". If the copy is - * successful, the temporary file will be renamed to the real path, - * else the temporary file will be deleted. + * file "target._COPYING_". If the copy is successful, the temporary file + * will be renamed to the real path, else the temporary file will be deleted. * if direct write is enabled , then creation temporary file is skipped. - * @param in the input stream for the copy + * + * @param in the input stream for the copy * @param target where to store the contents of the stream * @throws IOException if copy fails */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommandWithMultiThread.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommandWithMultiThread.java new file mode 100644 index 0000000000..aed4030540 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommandWithMultiThread.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.shell; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * Abstract command to enable sub copy commands run with multi-thread. + */ +public abstract class CopyCommandWithMultiThread + extends CommandWithDestination { + + private int threadCount = 1; + private ThreadPoolExecutor executor = null; + private int threadPoolQueueSize = DEFAULT_QUEUE_SIZE; + + public static final int DEFAULT_QUEUE_SIZE = 1024; + + /** + * set thread count by option value, if the value less than 1, + * use 1 instead. + * + * @param optValue option value + */ + protected void setThreadCount(String optValue) { + if (optValue != null) { + threadCount = Math.max(Integer.parseInt(optValue), 1); + } + } + + /** + * set thread pool queue size by option value, if the value less than 1, + * use DEFAULT_QUEUE_SIZE instead. + * + * @param optValue option value + */ + protected void setThreadPoolQueueSize(String optValue) { + if (optValue != null) { + int size = Integer.parseInt(optValue); + threadPoolQueueSize = size < 1 ? DEFAULT_QUEUE_SIZE : size; + } + } + + @VisibleForTesting + protected int getThreadCount() { + return this.threadCount; + } + + @VisibleForTesting + protected int getThreadPoolQueueSize() { + return this.threadPoolQueueSize; + } + + @VisibleForTesting + protected ThreadPoolExecutor getExecutor() { + return this.executor; + } + + @Override + protected void processArguments(LinkedList args) + throws IOException { + + if (isMultiThreadNecessary(args)) { + initThreadPoolExecutor(); + } + + super.processArguments(args); + + if (executor != null) { + waitForCompletion(); + } + } + + // if thread count is 1 or the source is only one single file, + // don't init executor to avoid threading overhead. + @VisibleForTesting + protected boolean isMultiThreadNecessary(LinkedList args) + throws IOException { + return this.threadCount > 1 && hasMoreThanOneSourcePaths(args); + } + + // check if source is only one single file. + private boolean hasMoreThanOneSourcePaths(LinkedList args) + throws IOException { + if (args.size() > 1) { + return true; + } + if (args.size() == 1) { + PathData src = args.get(0); + if (src.stat == null) { + src.refreshStatus(); + } + return isPathRecursable(src); + } + return false; + } + + private void initThreadPoolExecutor() { + executor = + new ThreadPoolExecutor(threadCount, threadCount, 1, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(threadPoolQueueSize), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + private void waitForCompletion() { + if (executor != null) { + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); + } catch (InterruptedException e) { + executor.shutdownNow(); + displayError(e); + Thread.currentThread().interrupt(); + } + } + } + + @Override + protected void copyFileToTarget(PathData src, PathData target) + throws IOException { + if (executor == null) { + super.copyFileToTarget(src, target); + } else { + executor.submit(() -> { + try { + super.copyFileToTarget(src, target); + } catch (IOException e) { + displayError(e); + } + }); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index 06809ec68d..5a0652b247 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -26,11 +26,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataInputStream; @@ -38,8 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIsDirectoryException; import org.apache.hadoop.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Various commands for copy files */ @InterfaceAudience.Private @@ -210,28 +204,37 @@ private void popPreserveOption(List args) { /** * Copy local files to a remote filesystem */ - public static class Get extends CommandWithDestination { + public static class Get extends CopyCommandWithMultiThread { public static final String NAME = "get"; public static final String USAGE = - "[-f] [-p] [-ignoreCrc] [-crc] ... "; + "[-f] [-p] [-crc] [-ignoreCrc] [-t ]" + + " [-q ] ... "; public static final String DESCRIPTION = - "Copy files that match the file pattern " + - "to the local name. is kept. When copying multiple " + - "files, the destination must be a directory. Passing " + - "-f overwrites the destination if it already exists and " + - "-p preserves access and modification times, " + - "ownership and the mode.\n"; + "Copy files that match the file pattern to the local name. " + + " is kept.\nWhen copying multiple files, the destination" + + " must be a directory.\nFlags:\n" + + " -p : Preserves timestamps, ownership and the mode.\n" + + " -f : Overwrites the destination if it already exists.\n" + + " -crc : write CRC checksums for the files downloaded.\n" + + " -ignoreCrc : Skip CRC checks on the file(s) downloaded.\n" + + " -t : Number of threads to be used," + + " default is 1.\n" + + " -q : Thread pool queue size to be" + + " used, default is 1024.\n"; @Override - protected void processOptions(LinkedList args) - throws IOException { - CommandFormat cf = new CommandFormat( - 1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f"); + protected void processOptions(LinkedList args) throws IOException { + CommandFormat cf = + new CommandFormat(1, Integer.MAX_VALUE, "crc", "ignoreCrc", "p", "f"); + cf.addOptionWithValue("t"); + cf.addOptionWithValue("q"); cf.parse(args); setWriteChecksum(cf.getOpt("crc")); setVerifyChecksum(!cf.getOpt("ignoreCrc")); setPreserve(cf.getOpt("p")); setOverwrite(cf.getOpt("f")); + setThreadCount(cf.getOptValue("t")); + setThreadPoolQueueSize(cf.getOptValue("q")); setRecursive(true); getLocalDestination(args); } @@ -240,21 +243,12 @@ protected void processOptions(LinkedList args) /** * Copy local files to a remote filesystem */ - public static class Put extends CommandWithDestination { - - public static final Logger LOG = LoggerFactory.getLogger(Put.class); - - private ThreadPoolExecutor executor = null; - private int threadPoolQueueSize = 1024; - private int numThreads = 1; - - private static final int MAX_THREADS = - Runtime.getRuntime().availableProcessors() * 2; + public static class Put extends CopyCommandWithMultiThread { public static final String NAME = "put"; public static final String USAGE = - "[-f] [-p] [-l] [-d] [-t ] [-q ] " + - " ... "; + "[-f] [-p] [-l] [-d] [-t ] [-q ]" + + " ... "; public static final String DESCRIPTION = "Copy files from the local file system " + "into fs. Copying fails if the file already " + @@ -263,11 +257,11 @@ public static class Put extends CommandWithDestination { " -p : Preserves timestamps, ownership and the mode.\n" + " -f : Overwrites the destination if it already exists.\n" + " -t : Number of threads to be used, default is 1.\n" + - " -q : ThreadPool queue size to be used, " + + " -q : Thread pool queue size to be used, " + "default is 1024.\n" + - " -l : Allow DataNode to lazily persist the file to disk. Forces" + - " replication factor of 1. This flag will result in reduced" + - " durability. Use with care.\n" + + " -l : Allow DataNode to lazily persist the file to disk. Forces " + + "replication factor of 1. This flag will result in reduced " + + "durability. Use with care.\n" + " -d : Skip creation of temporary file(._COPYING_).\n"; @Override @@ -277,7 +271,7 @@ protected void processOptions(LinkedList args) throws IOException { cf.addOptionWithValue("t"); cf.addOptionWithValue("q"); cf.parse(args); - setNumberThreads(cf.getOptValue("t")); + setThreadCount(cf.getOptValue("t")); setThreadPoolQueueSize(cf.getOptValue("q")); setOverwrite(cf.getOpt("f")); setPreserve(cf.getOpt("p")); @@ -308,92 +302,9 @@ protected void processArguments(LinkedList args) copyStreamToTarget(System.in, getTargetPath(args.get(0))); return; } - - executor = new ThreadPoolExecutor(numThreads, numThreads, 1, - TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize), - new ThreadPoolExecutor.CallerRunsPolicy()); super.processArguments(args); - - // issue the command and then wait for it to finish - executor.shutdown(); - try { - executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); - } catch (InterruptedException e) { - executor.shutdownNow(); - displayError(e); - Thread.currentThread().interrupt(); - } } - private void setNumberThreads(String numberThreadsString) { - if (numberThreadsString == null) { - numThreads = 1; - } else { - int parsedValue = Integer.parseInt(numberThreadsString); - if (parsedValue <= 1) { - numThreads = 1; - } else if (parsedValue > MAX_THREADS) { - numThreads = MAX_THREADS; - } else { - numThreads = parsedValue; - } - } - } - - private void setThreadPoolQueueSize(String numThreadPoolQueueSize) { - if (numThreadPoolQueueSize != null) { - int parsedValue = Integer.parseInt(numThreadPoolQueueSize); - if (parsedValue < 1) { - LOG.warn("The value of the thread pool queue size cannot be " + - "less than 1, and the default value is used here. " + - "The default size is 1024."); - threadPoolQueueSize = 1024; - } else { - threadPoolQueueSize = parsedValue; - } - } - } - - @VisibleForTesting - protected int getThreadPoolQueueSize() { - return threadPoolQueueSize; - } - - private void copyFile(PathData src, PathData target) throws IOException { - if (isPathRecursable(src)) { - throw new PathIsDirectoryException(src.toString()); - } - super.copyFileToTarget(src, target); - } - - @Override - protected void copyFileToTarget(PathData src, PathData target) - throws IOException { - // if number of thread is 1, mimic put and avoid threading overhead - if (numThreads == 1) { - copyFile(src, target); - return; - } - - Runnable task = () -> { - try { - copyFile(src, target); - } catch (IOException e) { - displayError(e); - } - }; - executor.submit(task); - } - - @VisibleForTesting - public int getNumThreads() { - return numThreads; - } - - @VisibleForTesting - public ThreadPoolExecutor getExecutor() { - return executor; - } } public static class CopyFromLocal extends Put { diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md index feed0a4905..46ae4058d2 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md @@ -323,27 +323,33 @@ Returns 0 on success and -1 on error. get --- -Usage: `hadoop fs -get [-ignorecrc] [-crc] [-p] [-f] ` +Usage: `hadoop fs -get [-ignorecrc] [-crc] [-p] [-f] [-t ] [-q ] ... ` Copy files to the local file system. Files that fail the CRC check may be copied with the -ignorecrc option. Files and CRCs may be copied using the -crc option. +Options: + +* `-p` : Preserves access and modification times, ownership and the permissions. + (assuming the permissions can be propagated across filesystems) +* `-f` : Overwrites the destination if it already exists. +* `-ignorecrc` : Skip CRC checks on the file(s) downloaded. +* `-crc`: write CRC checksums for the files downloaded. +* `-t ` : Number of threads to be used, default is 1. + Useful when downloading directories containing more than 1 file. +* `-q ` : Thread pool queue size to be used, default is 1024. + It takes effect only when thread count greater than 1. + Example: * `hadoop fs -get /user/hadoop/file localfile` * `hadoop fs -get hdfs://nn.example.com/user/hadoop/file localfile` +* `hadoop fs -get -t 10 hdfs://nn.example.com/user/hadoop/dir1 localdir` +* `hadoop fs -get -t 10 -q 2048 hdfs://nn.example.com/user/hadoop/dir* localdir` Exit Code: Returns 0 on success and -1 on error. -Options: - -* `-p` : Preserves access and modification times, ownership and the permissions. -(assuming the permissions can be propagated across filesystems) -* `-f` : Overwrites the destination if it already exists. -* `-ignorecrc` : Skip CRC checks on the file(s) downloaded. -* `-crc`: write CRC checksums for the files downloaded. - getfacl ------- @@ -525,7 +531,7 @@ Returns 0 on success and -1 on error. put --- -Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t ] [-q ] [ - | .. ]. ` +Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t ] [-q ] [ - | ...] ` Copy single src, or multiple srcs from local file system to the destination file system. Also reads input from stdin and writes to destination file system if the source is set to "-" @@ -537,12 +543,13 @@ Options: * `-p` : Preserves access and modification times, ownership and the permissions. (assuming the permissions can be propagated across filesystems) * `-f` : Overwrites the destination if it already exists. -* `-t ` : Number of threads to be used, default is 1. Useful - when uploading a directory containing more than 1 file. * `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication factor of 1. This flag will result in reduced durability. Use with care. * `-d` : Skip creation of temporary file with the suffix `._COPYING_`. -* `-q ` : ThreadPool queue size to be used, default is 1024. +* `-t ` : Number of threads to be used, default is 1. + Useful when uploading directories containing more than 1 file. +* `-q ` : Thread pool queue size to be used, default is 1024. + It takes effect only when thread count greater than 1. Examples: @@ -551,7 +558,8 @@ Examples: * `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir` * `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile` * `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin. -* `hadoop fs -put -q 500 localfile3 hdfs://nn.example.com/hadoop/hadoopfile3` +* `hadoop fs -put -t 5 localdir hdfs://nn.example.com/hadoop/hadoopdir` +* `hadoop fs -put -t 10 -q 2048 localdir1 localdir2 hdfs://nn.example.com/hadoop/hadoopdir` Exit Code: diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java index e7f36fc850..757c588104 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java @@ -17,24 +17,26 @@ */ package org.apache.hadoop.fs.shell; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.FileSystemTestHelper; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; -import org.junit.BeforeClass; -import org.junit.AfterClass; -import org.junit.Test; -import org.junit.Assert; - import java.io.IOException; import java.util.LinkedList; import java.util.concurrent.ThreadPoolExecutor; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; + import static org.junit.Assert.assertEquals; /** @@ -48,6 +50,9 @@ public class TestCopyFromLocal { private static Path testDir; private static Configuration conf; + private Path dir = null; + private int numFiles = 0; + public static int initialize(Path dir) throws Exception { fs.mkdirs(dir); Path fromDirPath = new Path(dir, FROM_DIR_NAME); @@ -66,7 +71,7 @@ public static int initialize(Path dir) throws Exception { Path subFile = new Path(subDirPath, "file" + fileCount); fs.createNewFile(subFile); FSDataOutputStream output = fs.create(subFile, true); - for(int i = 0; i < 100; ++i) { + for (int i = 0; i < 100; ++i) { output.writeInt(i); output.writeChar('\n'); } @@ -96,48 +101,36 @@ public static void cleanup() throws Exception { fs.close(); } + @Before + public void initDirectory() throws Exception { + dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); + numFiles = initialize(dir); + } + + private void run(CommandWithDestination cmd, String... args) { cmd.setConf(conf); assertEquals(0, cmd.run(args)); } @Test(timeout = 10000) - public void testCopyFromLocal() throws Exception { - Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); - TestCopyFromLocal.initialize(dir); + public void testCopyFromLocal() { run(new TestMultiThreadedCopy(1, 0), new Path(dir, FROM_DIR_NAME).toString(), new Path(dir, TO_DIR_NAME).toString()); } @Test(timeout = 10000) - public void testCopyFromLocalWithThreads() throws Exception { - Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); - int numFiles = TestCopyFromLocal.initialize(dir); - int maxThreads = Runtime.getRuntime().availableProcessors() * 2; - int randThreads = RandomUtils.nextInt(0, maxThreads - 1) + 1; - String numThreads = Integer.toString(randThreads); - run(new TestMultiThreadedCopy(randThreads, - randThreads == 1 ? 0 : numFiles), "-t", numThreads, + public void testCopyFromLocalWithThreads(){ + int threads = Runtime.getRuntime().availableProcessors() * 2 + 1; + run(new TestMultiThreadedCopy(threads, numFiles), + "-t", Integer.toString(threads), new Path(dir, FROM_DIR_NAME).toString(), new Path(dir, TO_DIR_NAME).toString()); } @Test(timeout = 10000) - public void testCopyFromLocalWithThreadWrong() throws Exception { - Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); - int numFiles = TestCopyFromLocal.initialize(dir); - int maxThreads = Runtime.getRuntime().availableProcessors() * 2; - String numThreads = Integer.toString(maxThreads * 2); - run(new TestMultiThreadedCopy(maxThreads, numFiles), "-t", numThreads, - new Path(dir, FROM_DIR_NAME).toString(), - new Path(dir, TO_DIR_NAME).toString()); - } - - @Test(timeout = 10000) - public void testCopyFromLocalWithZeroThreads() throws Exception { - Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); - TestCopyFromLocal.initialize(dir); + public void testCopyFromLocalWithThreadWrong(){ run(new TestMultiThreadedCopy(1, 0), "-t", "0", new Path(dir, FROM_DIR_NAME).toString(), new Path(dir, TO_DIR_NAME).toString()); @@ -148,8 +141,7 @@ private class TestMultiThreadedCopy extends CopyFromLocal { private int expectedThreads; private int expectedCompletedTaskCount; - TestMultiThreadedCopy(int expectedThreads, - int expectedCompletedTaskCount) { + TestMultiThreadedCopy(int expectedThreads, int expectedCompletedTaskCount) { this.expectedThreads = expectedThreads; this.expectedCompletedTaskCount = expectedCompletedTaskCount; } @@ -158,17 +150,22 @@ private class TestMultiThreadedCopy extends CopyFromLocal { protected void processArguments(LinkedList args) throws IOException { // Check if the correct number of threads are spawned - Assert.assertEquals(expectedThreads, getNumThreads()); + Assert.assertEquals(expectedThreads, getThreadCount()); super.processArguments(args); - // Once the copy is complete, check following - // 1) number of completed tasks are same as expected - // 2) There are no active tasks in the executor - // 3) Executor has shutdown correctly - ThreadPoolExecutor executor = getExecutor(); - Assert.assertEquals(expectedCompletedTaskCount, - executor.getCompletedTaskCount()); - Assert.assertEquals(0, executor.getActiveCount()); - Assert.assertTrue(executor.isTerminated()); + + if (isMultiThreadNecessary(args)) { + // Once the copy is complete, check following + // 1) number of completed tasks are same as expected + // 2) There are no active tasks in the executor + // 3) Executor has shutdown correctly + ThreadPoolExecutor executor = getExecutor(); + Assert.assertEquals(expectedCompletedTaskCount, + executor.getCompletedTaskCount()); + Assert.assertEquals(0, executor.getActiveCount()); + Assert.assertTrue(executor.isTerminated()); + } else { + assert getExecutor() == null; + } } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java index 0f0ddcc4ee..b68be243c9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java @@ -17,11 +17,12 @@ */ package org.apache.hadoop.fs.shell; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -31,13 +32,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; import org.apache.hadoop.fs.shell.CopyCommands.Cp; import org.apache.hadoop.fs.shell.CopyCommands.Get; import org.apache.hadoop.fs.shell.CopyCommands.Put; -import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; public class TestCopyPreserveFlag { private static final int MODIFICATION_TIME = 12345000; @@ -176,6 +177,34 @@ public void testGetWithoutP() throws Exception { assertAttributesChanged(TO); } + @Test(timeout = 10000) + public void testGetWithPQ() throws Exception { + Get get = new Get(); + run(get, "-p", "-q", "100", FROM.toString(), TO.toString()); + assertEquals(get.getThreadPoolQueueSize(), 100); + assertAttributesPreserved(TO); + } + + @Test(timeout = 10000) + public void testGetWithQ() throws Exception { + Get get = new Get(); + run(get, "-q", "100", FROM.toString(), TO.toString()); + assertEquals(get.getThreadPoolQueueSize(), 100); + assertAttributesChanged(TO); + } + + @Test(timeout = 10000) + public void testGetWithThreads() throws Exception { + run(new Get(), "-t", "10", FROM.toString(), TO.toString()); + assertAttributesChanged(TO); + } + + @Test(timeout = 10000) + public void testGetWithThreadsPreserve() throws Exception { + run(new Get(), "-p", "-t", "10", FROM.toString(), TO.toString()); + assertAttributesPreserved(TO); + } + @Test(timeout = 10000) public void testCpWithP() throws Exception { run(new Cp(), "-p", FROM.toString(), TO.toString()); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyToLocal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyToLocal.java new file mode 100644 index 0000000000..202b81912c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyToLocal.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.shell; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ThreadPoolExecutor; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.shell.CopyCommands.CopyToLocal; + +import static org.apache.hadoop.fs.shell.CopyCommandWithMultiThread.DEFAULT_QUEUE_SIZE; +import static org.junit.Assert.assertEquals; + +public class TestCopyToLocal { + + private static final String FROM_DIR_NAME = "fromDir"; + private static final String TO_DIR_NAME = "toDir"; + + private static FileSystem fs; + private static Path testDir; + private static Configuration conf; + + private Path dir = null; + private int numFiles = 0; + + private static int initialize(Path dir) throws Exception { + fs.mkdirs(dir); + Path fromDirPath = new Path(dir, FROM_DIR_NAME); + fs.mkdirs(fromDirPath); + Path toDirPath = new Path(dir, TO_DIR_NAME); + fs.mkdirs(toDirPath); + + int numTotalFiles = 0; + int numDirs = RandomUtils.nextInt(0, 5); + for (int dirCount = 0; dirCount < numDirs; ++dirCount) { + Path subDirPath = new Path(fromDirPath, "subdir" + dirCount); + fs.mkdirs(subDirPath); + int numFiles = RandomUtils.nextInt(0, 10); + for (int fileCount = 0; fileCount < numFiles; ++fileCount) { + numTotalFiles++; + Path subFile = new Path(subDirPath, "file" + fileCount); + fs.createNewFile(subFile); + FSDataOutputStream output = fs.create(subFile, true); + for (int i = 0; i < 100; ++i) { + output.writeInt(i); + output.writeChar('\n'); + } + output.close(); + } + } + + return numTotalFiles; + } + + @BeforeClass + public static void init() throws Exception { + conf = new Configuration(false); + conf.set("fs.file.impl", LocalFileSystem.class.getName()); + fs = FileSystem.getLocal(conf); + testDir = new FileSystemTestHelper().getTestRootPath(fs); + // don't want scheme on the path, just an absolute path + testDir = new Path(fs.makeQualified(testDir).toUri().getPath()); + + FileSystem.setDefaultUri(conf, fs.getUri()); + fs.setWorkingDirectory(testDir); + } + + @AfterClass + public static void cleanup() throws Exception { + fs.delete(testDir, true); + fs.close(); + } + + private void run(CopyCommandWithMultiThread cmd, String... args) { + cmd.setConf(conf); + assertEquals(0, cmd.run(args)); + } + + @Before + public void initDirectory() throws Exception { + dir = new Path("dir" + RandomStringUtils.randomNumeric(4)); + numFiles = initialize(dir); + } + + @Test(timeout = 10000) + public void testCopy() throws Exception { + MultiThreadedCopy copy = new MultiThreadedCopy(1, DEFAULT_QUEUE_SIZE, 0); + run(copy, new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + assert copy.getExecutor() == null; + } + + @Test(timeout = 10000) + public void testCopyWithThreads() { + run(new MultiThreadedCopy(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5", + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCopyWithThreadWrong() { + run(new MultiThreadedCopy(1, DEFAULT_QUEUE_SIZE, 0), "-t", "0", + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCopyWithThreadsAndQueueSize() { + int queueSize = 256; + run(new MultiThreadedCopy(5, queueSize, numFiles), "-t", "5", "-q", + Integer.toString(queueSize), + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCopyWithThreadsAndQueueSizeWrong() { + int queueSize = 0; + run(new MultiThreadedCopy(5, DEFAULT_QUEUE_SIZE, numFiles), "-t", "5", "-q", + Integer.toString(queueSize), + new Path(dir, FROM_DIR_NAME).toString(), + new Path(dir, TO_DIR_NAME).toString()); + } + + @Test(timeout = 10000) + public void testCopySingleFile() throws Exception { + Path fromDirPath = new Path(dir, FROM_DIR_NAME); + Path subFile = new Path(fromDirPath, "file0"); + fs.createNewFile(subFile); + FSDataOutputStream output = fs.create(subFile, true); + for (int i = 0; i < 100; ++i) { + output.writeInt(i); + output.writeChar('\n'); + } + output.close(); + + MultiThreadedCopy copy = new MultiThreadedCopy(5, DEFAULT_QUEUE_SIZE, 0); + run(copy, "-t", "5", subFile.toString(), + new Path(dir, TO_DIR_NAME).toString()); + assert copy.getExecutor() == null; + } + + private static class MultiThreadedCopy extends CopyToLocal { + public static final String NAME = "multiThreadCopy"; + private final int expectedThreads; + private final int expectedQueuePoolSize; + private final int expectedCompletedTaskCount; + + MultiThreadedCopy(int expectedThreads, int expectedQueuePoolSize, + int expectedCompletedTaskCount) { + this.expectedThreads = expectedThreads; + this.expectedQueuePoolSize = expectedQueuePoolSize; + this.expectedCompletedTaskCount = expectedCompletedTaskCount; + } + + @Override + protected void processArguments(LinkedList args) + throws IOException { + // Check if the number of threads are same as expected + Assert.assertEquals(expectedThreads, getThreadCount()); + // Check if the queue pool size of executor is same as expected + Assert.assertEquals(expectedQueuePoolSize, getThreadPoolQueueSize()); + + super.processArguments(args); + + if (isMultiThreadNecessary(args)) { + // Once the copy is complete, check following + // 1) number of completed tasks are same as expected + // 2) There are no active tasks in the executor + // 3) Executor has shutdown correctly + ThreadPoolExecutor executor = getExecutor(); + Assert.assertEquals(expectedCompletedTaskCount, + executor.getCompletedTaskCount()); + Assert.assertEquals(0, executor.getActiveCount()); + Assert.assertTrue(executor.isTerminated()); + } else { + assert getExecutor() == null; + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml index fd6ee110c7..6dd1a8b81b 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml @@ -162,38 +162,6 @@ - - help: help for get - - -help get - - - - - - - RegexpComparator - ^-get( )*\[-f\]( )*\[-p\]( )*\[-ignoreCrc\]( )*\[-crc\]( )*<src> \.\.\. <localdst> :\s* - - - RegexpComparator - \s*Copy files that match the file pattern <src> to the local name. <src> is kept.\s* - - - RegexpComparator - ^( |\t)*When copying multiple files, the destination must be a directory. Passing -f( )* - - - RegexpComparator - ^( |\t)*overwrites the destination if it already exists and -p preserves access and( )* - - - RegexpComparator - ^( |\t)*modification times, ownership and the mode.* - - - - help: help for du @@ -498,7 +466,7 @@ RegexpComparator RegexpComparator - ^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <threadPool queue size>\] <localsrc> \.\.\. <dst> :\s* + ^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <thread pool queue size>\] <localsrc> \.\.\. <dst> :\s* @@ -515,35 +483,39 @@ RegexpComparator - ^\s*-p Preserves timestamps, ownership and the mode.( )* + ^\s*-p\s+Preserves timestamps, ownership and the mode.( )* RegexpComparator - ^\s*-f Overwrites the destination if it already exists.( )* + ^\s*-f\s+Overwrites the destination if it already exists.( )* RegexpComparator - ^\s*-t <thread count> Number of threads to be used, default is 1.( )* + ^\s*-t <thread count>\s+Number of threads to be used, default is 1.( )* RegexpComparator - ^\s*-q <threadPool size> ThreadPool queue size to be used, default is 1024.( )* + ^\s*-q <thread pool queue size>\s+Thread pool queue size to be used, default is( )* RegexpComparator - ^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )* + ^( |\t)*1024.\s* RegexpComparator - ^\s*replication factor of 1. This flag will result in reduced( )* + ^\s*-l\s+Allow DataNode to lazily persist the file to disk.( )* RegexpComparator - ^\s*durability. Use with care.( )* + ^\s*Forces replication factor of 1. This flag will( )* RegexpComparator - ^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )* + ^\s*result in reduced durability. Use with care.( )* + + + RegexpComparator + ^\s*-d\s+Skip creation of temporary file\(<dst>\._COPYING_\).( )* @@ -558,7 +530,7 @@ RegexpComparator - ^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <threadPool queue size>\] <localsrc> \.\.\. <dst> :\s* + ^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] \[-q <thread pool queue size>\] <localsrc> \.\.\. <dst> :\s* RegexpComparator @@ -600,7 +572,7 @@ RegexpComparator - ^-get( )*\[-f\]( )*\[-p\]( )*\[-ignoreCrc\]( )*\[-crc\]( )*<src> \.\.\. <localdst> :\s* + ^-get \[-f\] \[-p\] \[-crc\] \[-ignoreCrc\] \[-t <thread count>\] \[-q <thread pool queue size>\] <src> \.\.\. <localdst> :\s* RegexpComparator @@ -608,15 +580,39 @@ RegexpComparator - ^( |\t)*When copying multiple files, the destination must be a directory. Passing -f( )* + ^( |\t)*When copying multiple files, the destination must be a directory.( )* RegexpComparator - ^( |\t)*overwrites the destination if it already exists and -p preserves access and( )* + ^( |\t)*Flags:\s* RegexpComparator - ^( |\t)*modification times, ownership and the mode.* + ^( |\t)*-p\s+Preserves timestamps, ownership and the mode.\s* + + + RegexpComparator + ^( |\t)*-f\s+Overwrites the destination if it already exists.\s* + + + RegexpComparator + ^( |\t)*-crc\s+ write CRC checksums for the files downloaded.\s* + + + RegexpComparator + ^( |\t)*-ignoreCrc\s+ Skip CRC checks on the file\(s\) downloaded.\s* + + + RegexpComparator + ^( |\t)*-t <thread count>\s+Number of threads to be used, default is 1.\s* + + + RegexpComparator + ^( |\t)*-q <thread pool queue size>\s+Thread pool queue size to be used, default is\s* + + + RegexpComparator + ^( |\t)*1024.\s* @@ -723,7 +719,7 @@ RegexpComparator - ^-copyToLocal \[-f\] \[-p\] \[-ignoreCrc\] \[-crc\] <src> \.\.\. <localdst> :\s* + ^-copyToLocal \[-f\] \[-p\] \[-crc\] \[-ignoreCrc\] \[-t <thread count>\] \[-q <thread pool queue size>\] <src> \.\.\. <localdst> :\s* RegexpComparator