HADOOP-14698. Make copyFromLocals -t option available for put as well. Contributed by Andras Bokor.

This commit is contained in:
S O'Donnell 2020-05-29 10:32:37 +01:00
parent d9838f2d42
commit d9e8046a1a
5 changed files with 94 additions and 135 deletions

View File

@ -239,26 +239,35 @@ class CopyCommands {
* Copy local files to a remote filesystem * Copy local files to a remote filesystem
*/ */
public static class Put extends CommandWithDestination { public static class Put extends CommandWithDestination {
private ThreadPoolExecutor executor = null;
private int numThreads = 1;
private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
public static final String NAME = "put"; public static final String NAME = "put";
public static final String USAGE = public static final String USAGE =
"[-f] [-p] [-l] [-d] <localsrc> ... <dst>"; "[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
public static final String DESCRIPTION = public static final String DESCRIPTION =
"Copy files from the local file system " + "Copy files from the local file system " +
"into fs. Copying fails if the file already " + "into fs. Copying fails if the file already " +
"exists, unless the -f flag is given.\n" + "exists, unless the -f flag is given.\n" +
"Flags:\n" + "Flags:\n" +
" -p : Preserves access and modification times, ownership and the mode.\n" + " -p : Preserves timestamps, ownership and the mode.\n" +
" -f : Overwrites the destination if it already exists.\n" + " -f : Overwrites the destination if it already exists.\n" +
" -l : Allow DataNode to lazily persist the file to disk. Forces\n" + " -t <thread count> : Number of threads to be used, default is 1.\n" +
" replication factor of 1. This flag will result in reduced\n" + " -l : Allow DataNode to lazily persist the file to disk. Forces" +
" durability. Use with care.\n" + " replication factor of 1. This flag will result in reduced" +
" durability. Use with care.\n" +
" -d : Skip creation of temporary file(<dst>._COPYING_).\n"; " -d : Skip creation of temporary file(<dst>._COPYING_).\n";
@Override @Override
protected void processOptions(LinkedList<String> args) throws IOException { protected void processOptions(LinkedList<String> args) throws IOException {
CommandFormat cf = CommandFormat cf =
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d"); new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
cf.addOptionWithValue("t");
cf.parse(args); cf.parse(args);
setNumberThreads(cf.getOptValue("t"));
setOverwrite(cf.getOpt("f")); setOverwrite(cf.getOpt("f"));
setPreserve(cf.getOpt("p")); setPreserve(cf.getOpt("p"));
setLazyPersist(cf.getOpt("l")); setLazyPersist(cf.getOpt("l"));
@ -288,32 +297,22 @@ class CopyCommands {
copyStreamToTarget(System.in, getTargetPath(args.get(0))); copyStreamToTarget(System.in, getTargetPath(args.get(0)));
return; return;
} }
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
super.processArguments(args); super.processArguments(args);
// issue the command and then wait for it to finish
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException e) {
executor.shutdownNow();
displayError(e);
Thread.currentThread().interrupt();
}
} }
}
public static class CopyFromLocal extends Put {
private ThreadPoolExecutor executor = null;
private int numThreads = 1;
private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
public static final String NAME = "copyFromLocal";
public static final String USAGE =
"[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
public static final String DESCRIPTION =
"Copy files from the local file system " +
"into fs. Copying fails if the file already " +
"exists, unless the -f flag is given.\n" +
"Flags:\n" +
" -p : Preserves access and modification times, ownership and the" +
" mode.\n" +
" -f : Overwrites the destination if it already exists.\n" +
" -t <thread count> : Number of threads to be used, default is 1.\n" +
" -l : Allow DataNode to lazily persist the file to disk. Forces" +
" replication factor of 1. This flag will result in reduced" +
" durability. Use with care.\n" +
" -d : Skip creation of temporary file(<dst>._COPYING_).\n";
private void setNumberThreads(String numberThreadsString) { private void setNumberThreads(String numberThreadsString) {
if (numberThreadsString == null) { if (numberThreadsString == null) {
@ -330,22 +329,6 @@ class CopyCommands {
} }
} }
@Override
protected void processOptions(LinkedList<String> args) throws IOException {
CommandFormat cf =
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
cf.addOptionWithValue("t");
cf.parse(args);
setNumberThreads(cf.getOptValue("t"));
setOverwrite(cf.getOpt("f"));
setPreserve(cf.getOpt("p"));
setLazyPersist(cf.getOpt("l"));
setDirectWrite(cf.getOpt("d"));
getRemoteDestination(args);
// should have a -r option
setRecursive(true);
}
private void copyFile(PathData src, PathData target) throws IOException { private void copyFile(PathData src, PathData target) throws IOException {
if (isPathRecursable(src)) { if (isPathRecursable(src)) {
throw new PathIsDirectoryException(src.toString()); throw new PathIsDirectoryException(src.toString());
@ -372,25 +355,6 @@ class CopyCommands {
executor.submit(task); executor.submit(task);
} }
@Override
protected void processArguments(LinkedList<PathData> args)
throws IOException {
executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
super.processArguments(args);
// issue the command and then wait for it to finish
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
} catch (InterruptedException e) {
executor.shutdownNow();
displayError(e);
Thread.currentThread().interrupt();
}
}
@VisibleForTesting @VisibleForTesting
public int getNumThreads() { public int getNumThreads() {
return numThreads; return numThreads;
@ -401,6 +365,12 @@ class CopyCommands {
return executor; return executor;
} }
} }
public static class CopyFromLocal extends Put {
public static final String NAME = "copyFromLocal";
public static final String USAGE = Put.USAGE;
public static final String DESCRIPTION = "Identical to the -put command.";
}
public static class CopyToLocal extends Get { public static class CopyToLocal extends Get {
public static final String NAME = "copyToLocal"; public static final String NAME = "copyToLocal";

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathExistsException; import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.shell.CopyCommands.Put; import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
/** Various commands for moving files */ /** Various commands for moving files */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -41,12 +41,22 @@ class MoveCommands {
/** /**
* Move local files to a remote filesystem * Move local files to a remote filesystem
*/ */
public static class MoveFromLocal extends Put { public static class MoveFromLocal extends CopyFromLocal {
public static final String NAME = "moveFromLocal"; public static final String NAME = "moveFromLocal";
public static final String USAGE = "<localsrc> ... <dst>"; public static final String USAGE =
"[-f] [-p] [-l] [-d] <localsrc> ... <dst>";
public static final String DESCRIPTION = public static final String DESCRIPTION =
"Same as -put, except that the source is " + "Same as -put, except that the source is " +
"deleted after it's copied."; "deleted after it's copied\n" +
"and -t option has not yet implemented.";
@Override
protected void processOptions(LinkedList<String> args) throws IOException {
if(args.contains("-t")) {
throw new CommandFormat.UnknownOptionException("-t");
}
super.processOptions(args);
}
@Override @Override
protected void processPath(PathData src, PathData target) throws IOException { protected void processPath(PathData src, PathData target) throws IOException {

View File

@ -538,7 +538,7 @@ Returns 0 on success and -1 on error.
put put
--- ---
Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [ - | <localsrc1> .. ]. <dst>` Usage: `hadoop fs -put [-f] [-p] [-l] [-d] [-t <thread count>] [ - | <localsrc1> .. ]. <dst>`
Copy single src, or multiple srcs from local file system to the destination file system. Copy single src, or multiple srcs from local file system to the destination file system.
Also reads input from stdin and writes to destination file system if the source is set to "-" Also reads input from stdin and writes to destination file system if the source is set to "-"
@ -550,6 +550,8 @@ Options:
* `-p` : Preserves access and modification times, ownership and the permissions. * `-p` : Preserves access and modification times, ownership and the permissions.
(assuming the permissions can be propagated across filesystems) (assuming the permissions can be propagated across filesystems)
* `-f` : Overwrites the destination if it already exists. * `-f` : Overwrites the destination if it already exists.
* `-t <thread count>` : Number of threads to be used, default is 1. Useful
when uploading a directory containing more than 1 file.
* `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication * `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication
factor of 1. This flag will result in reduced durability. Use with care. factor of 1. This flag will result in reduced durability. Use with care.
* `-d` : Skip creation of temporary file with the suffix `._COPYING_`. * `-d` : Skip creation of temporary file with the suffix `._COPYING_`.

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.PathExistsException; import org.apache.hadoop.fs.PathExistsException;
import org.apache.hadoop.fs.shell.CommandFormat.UnknownOptionException;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -93,6 +94,12 @@ public class TestMove {
assertTrue("Rename should have failed with path exists exception", assertTrue("Rename should have failed with path exists exception",
cmd.error instanceof PathExistsException); cmd.error instanceof PathExistsException);
} }
@Test(expected = UnknownOptionException.class)
public void testMoveFromLocalDoesNotAllowTOption() {
new MoveCommands.MoveFromLocal().run("-t", "2",
null, null);
}
static class MockFileSystem extends FilterFileSystem { static class MockFileSystem extends FilterFileSystem {
Configuration conf; Configuration conf;

View File

@ -496,7 +496,10 @@
<comparators> <comparators>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :( )*</expected-output> <comparator>
<type>RegexpComparator</type>
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator>
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
@ -512,66 +515,11 @@
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^\s*-p Preserves access and modification times, ownership and the mode.( )*</expected-output> <expected-output>^\s*-p Preserves timestamps, ownership and the mode.( )*</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output> <expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*durability. Use with care.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-d Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
</comparator>
</comparators>
</test>
<test> <!-- TESTED -->
<description>help: help for copyFromLocal</description>
<test-commands>
<command>-help copyFromLocal</command>
</test-commands>
<cleanup-commands>
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*Copy files from the local file system into fs.( )*Copying fails if the file already( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*Flags:( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-p Preserves access and modification times, ownership and the( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*mode.( )*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
@ -596,6 +544,25 @@
</comparators> </comparators>
</test> </test>
<test> <!-- TESTED -->
<description>help: help for copyFromLocal</description>
<test-commands>
<command>-help copyFromLocal</command>
</test-commands>
<cleanup-commands>
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
<expected-output>^\s*Identical to the -put command\.\s*</expected-output>
</comparator>
</comparators>
</test>
<test> <!-- TESTED --> <test> <!-- TESTED -->
<description>help: help for moveFromLocal</description> <description>help: help for moveFromLocal</description>
<test-commands> <test-commands>
@ -606,11 +573,14 @@
<comparators> <comparators>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^-moveFromLocal &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output> <expected-output>^-moveFromLocal \[-f\] \[-p\] \[-l\] \[-d\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>RegexpComparator</type> <type>RegexpComparator</type>
<expected-output>^( |\t)*Same as -put, except that the source is deleted after it's copied.</expected-output> <expected-output>^( |\t)*Same as -put, except that the source is deleted after it's copied</expected-output>
</comparator><comparator>
<type>RegexpComparator</type>
<expected-output>^\s* and -t option has not yet implemented.</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>