HADOOP-12384. Add '-direct' flag option for fs copy so that user can choose not to create '._COPYING_' file (Contributed by J.Andreina)
This commit is contained in:
parent
435f935ba7
commit
090d26652c
@ -768,6 +768,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HADOOP-12358. Add -safely flag to rm to prompt when deleting many files.
|
HADOOP-12358. Add -safely flag to rm to prompt when deleting many files.
|
||||||
(xyao via wang)
|
(xyao via wang)
|
||||||
|
|
||||||
|
HADOOP-12384. Add "-direct" flag option for fs copy so that user can choose
|
||||||
|
not to create "._COPYING_" file (J.Andreina via vinayakumarb)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||||
|
@ -61,6 +61,7 @@ abstract class CommandWithDestination extends FsCommand {
|
|||||||
private boolean verifyChecksum = true;
|
private boolean verifyChecksum = true;
|
||||||
private boolean writeChecksum = true;
|
private boolean writeChecksum = true;
|
||||||
private boolean lazyPersist = false;
|
private boolean lazyPersist = false;
|
||||||
|
private boolean direct = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The name of the raw xattr namespace. It would be nice to use
|
* The name of the raw xattr namespace. It would be nice to use
|
||||||
@ -95,6 +96,10 @@ protected void setWriteChecksum(boolean flag) {
|
|||||||
writeChecksum = flag;
|
writeChecksum = flag;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setDirectWrite(boolean flag) {
|
||||||
|
direct = flag;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If true, the last modified time, last access time,
|
* If true, the last modified time, last access time,
|
||||||
* owner, group and permission information of the source
|
* owner, group and permission information of the source
|
||||||
@ -372,9 +377,11 @@ private boolean checkPathsForReservedRaw(Path src, Path target)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Copies the stream contents to a temporary file. If the copy is
|
* If direct write is disabled, copies the stream contents to a temporary
|
||||||
|
* file "<target>._COPYING_". If the copy is
|
||||||
* successful, the temporary file will be renamed to the real path,
|
* successful, the temporary file will be renamed to the real path,
|
||||||
* else the temporary file will be deleted.
|
* else the temporary file will be deleted.
|
||||||
|
* If direct write is enabled, then creation of the temporary file is skipped.
|
||||||
* @param in the input stream for the copy
|
* @param in the input stream for the copy
|
||||||
* @param target where to store the contents of the stream
|
* @param target where to store the contents of the stream
|
||||||
* @throws IOException if copy fails
|
* @throws IOException if copy fails
|
||||||
@ -386,10 +393,12 @@ protected void copyStreamToTarget(InputStream in, PathData target)
|
|||||||
}
|
}
|
||||||
TargetFileSystem targetFs = new TargetFileSystem(target.fs);
|
TargetFileSystem targetFs = new TargetFileSystem(target.fs);
|
||||||
try {
|
try {
|
||||||
PathData tempTarget = target.suffix("._COPYING_");
|
PathData tempTarget = direct ? target : target.suffix("._COPYING_");
|
||||||
targetFs.setWriteChecksum(writeChecksum);
|
targetFs.setWriteChecksum(writeChecksum);
|
||||||
targetFs.writeStreamToFile(in, tempTarget, lazyPersist);
|
targetFs.writeStreamToFile(in, tempTarget, lazyPersist, direct);
|
||||||
|
if (!direct) {
|
||||||
targetFs.rename(tempTarget, target);
|
targetFs.rename(tempTarget, target);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
targetFs.close(); // last ditch effort to ensure temp file is removed
|
targetFs.close(); // last ditch effort to ensure temp file is removed
|
||||||
}
|
}
|
||||||
@ -459,10 +468,11 @@ private static class TargetFileSystem extends FilterFileSystem {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void writeStreamToFile(InputStream in, PathData target,
|
void writeStreamToFile(InputStream in, PathData target,
|
||||||
boolean lazyPersist) throws IOException {
|
boolean lazyPersist, boolean direct)
|
||||||
|
throws IOException {
|
||||||
FSDataOutputStream out = null;
|
FSDataOutputStream out = null;
|
||||||
try {
|
try {
|
||||||
out = create(target, lazyPersist);
|
out = create(target, lazyPersist, direct);
|
||||||
IOUtils.copyBytes(in, out, getConf(), true);
|
IOUtils.copyBytes(in, out, getConf(), true);
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(out); // just in case copyBytes didn't
|
IOUtils.closeStream(out); // just in case copyBytes didn't
|
||||||
@ -470,7 +480,8 @@ void writeStreamToFile(InputStream in, PathData target,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// tag created files as temp files
|
// tag created files as temp files
|
||||||
FSDataOutputStream create(PathData item, boolean lazyPersist)
|
FSDataOutputStream create(PathData item, boolean lazyPersist,
|
||||||
|
boolean direct)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
if (lazyPersist) {
|
if (lazyPersist) {
|
||||||
@ -488,9 +499,11 @@ FSDataOutputStream create(PathData item, boolean lazyPersist)
|
|||||||
return create(item.path, true);
|
return create(item.path, true);
|
||||||
}
|
}
|
||||||
} finally { // might have been created but stream was interrupted
|
} finally { // might have been created but stream was interrupted
|
||||||
|
if (!direct) {
|
||||||
deleteOnExit(item.path);
|
deleteOnExit(item.path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void rename(PathData src, PathData target) throws IOException {
|
void rename(PathData src, PathData target) throws IOException {
|
||||||
// the rename method with an option to delete the target is deprecated
|
// the rename method with an option to delete the target is deprecated
|
||||||
|
@ -133,7 +133,8 @@ protected void processPath(PathData src) throws IOException {
|
|||||||
|
|
||||||
static class Cp extends CommandWithDestination {
|
static class Cp extends CommandWithDestination {
|
||||||
public static final String NAME = "cp";
|
public static final String NAME = "cp";
|
||||||
public static final String USAGE = "[-f] [-p | -p[topax]] <src> ... <dst>";
|
public static final String USAGE =
|
||||||
|
"[-f] [-p | -p[topax]] [-d] <src> ... <dst>";
|
||||||
public static final String DESCRIPTION =
|
public static final String DESCRIPTION =
|
||||||
"Copy files that match the file pattern <src> to a " +
|
"Copy files that match the file pattern <src> to a " +
|
||||||
"destination. When copying multiple files, the destination " +
|
"destination. When copying multiple files, the destination " +
|
||||||
@ -147,13 +148,15 @@ static class Cp extends CommandWithDestination {
|
|||||||
"if (1) they are supported (HDFS only) and, (2) all of the source and " +
|
"if (1) they are supported (HDFS only) and, (2) all of the source and " +
|
||||||
"target pathnames are in the /.reserved/raw hierarchy. raw namespace " +
|
"target pathnames are in the /.reserved/raw hierarchy. raw namespace " +
|
||||||
"xattr preservation is determined solely by the presence (or absence) " +
|
"xattr preservation is determined solely by the presence (or absence) " +
|
||||||
"of the /.reserved/raw prefix and not by the -p option.\n";
|
"of the /.reserved/raw prefix and not by the -p option. Passing -d "+
|
||||||
|
"will skip creation of temporary file(<dst>._COPYING_).\n";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
popPreserveOption(args);
|
popPreserveOption(args);
|
||||||
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "f");
|
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "f", "d");
|
||||||
cf.parse(args);
|
cf.parse(args);
|
||||||
|
setDirectWrite(cf.getOpt("d"));
|
||||||
setOverwrite(cf.getOpt("f"));
|
setOverwrite(cf.getOpt("f"));
|
||||||
// should have a -r option
|
// should have a -r option
|
||||||
setRecursive(true);
|
setRecursive(true);
|
||||||
@ -215,7 +218,8 @@ protected void processOptions(LinkedList<String> args)
|
|||||||
*/
|
*/
|
||||||
public static class Put extends CommandWithDestination {
|
public static class Put extends CommandWithDestination {
|
||||||
public static final String NAME = "put";
|
public static final String NAME = "put";
|
||||||
public static final String USAGE = "[-f] [-p] [-l] <localsrc> ... <dst>";
|
public static final String USAGE =
|
||||||
|
"[-f] [-p] [-l] [-d] <localsrc> ... <dst>";
|
||||||
public static final String DESCRIPTION =
|
public static final String DESCRIPTION =
|
||||||
"Copy files from the local file system " +
|
"Copy files from the local file system " +
|
||||||
"into fs. Copying fails if the file already " +
|
"into fs. Copying fails if the file already " +
|
||||||
@ -225,15 +229,18 @@ public static class Put extends CommandWithDestination {
|
|||||||
" -f : Overwrites the destination if it already exists.\n" +
|
" -f : Overwrites the destination if it already exists.\n" +
|
||||||
" -l : Allow DataNode to lazily persist the file to disk. Forces\n" +
|
" -l : Allow DataNode to lazily persist the file to disk. Forces\n" +
|
||||||
" replication factor of 1. This flag will result in reduced\n" +
|
" replication factor of 1. This flag will result in reduced\n" +
|
||||||
" durability. Use with care.\n";
|
" durability. Use with care.\n" +
|
||||||
|
" -d : Skip creation of temporary file(<dst>._COPYING_).\n";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l");
|
CommandFormat cf =
|
||||||
|
new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
|
||||||
cf.parse(args);
|
cf.parse(args);
|
||||||
setOverwrite(cf.getOpt("f"));
|
setOverwrite(cf.getOpt("f"));
|
||||||
setPreserve(cf.getOpt("p"));
|
setPreserve(cf.getOpt("p"));
|
||||||
setLazyPersist(cf.getOpt("l"));
|
setLazyPersist(cf.getOpt("l"));
|
||||||
|
setDirectWrite(cf.getOpt("d"));
|
||||||
getRemoteDestination(args);
|
getRemoteDestination(args);
|
||||||
// should have a -r option
|
// should have a -r option
|
||||||
setRecursive(true);
|
setRecursive(true);
|
||||||
|
@ -485,6 +485,52 @@ public void testGetWindowsLocalPath() throws Exception {
|
|||||||
checkPath(dstPath, false);
|
checkPath(dstPath, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDirectCopy() throws Exception {
|
||||||
|
Path testRoot = new Path(testRootDir, "testPutFile");
|
||||||
|
lfs.delete(testRoot, true);
|
||||||
|
lfs.mkdirs(testRoot);
|
||||||
|
|
||||||
|
Path target_COPYING_File = new Path(testRoot, "target._COPYING_");
|
||||||
|
Path target_File = new Path(testRoot, "target");
|
||||||
|
Path srcFile = new Path(testRoot, new Path("srcFile"));
|
||||||
|
lfs.createNewFile(srcFile);
|
||||||
|
|
||||||
|
// If direct write is false, then creating "target" will delete the
|
||||||
|
// temporary file (target._COPYING_) if it already exists.
|
||||||
|
checkDirectCopy(srcFile, target_File, target_COPYING_File, false);
|
||||||
|
shell.run(new String[] { "-rm", target_File.toString() });
|
||||||
|
|
||||||
|
// If direct write is true, then creating "target" will not create a
|
||||||
|
// temporary file and will not delete (target._COPYING_) if it already exists.
|
||||||
|
checkDirectCopy(srcFile, target_File, target_COPYING_File, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkDirectCopy(Path srcFile, Path target_File,
|
||||||
|
Path target_COPYING_File,boolean direct) throws Exception {
|
||||||
|
int directWriteExitCode = direct ? 0 : 1;
|
||||||
|
shell
|
||||||
|
.run(new String[] { "-copyFromLocal", srcFile.toString(),
|
||||||
|
target_COPYING_File.toString() });
|
||||||
|
int srcFileexist = shell
|
||||||
|
.run(new String[] { "-cat", target_COPYING_File.toString() });
|
||||||
|
assertEquals(0, srcFileexist);
|
||||||
|
|
||||||
|
if (!direct) {
|
||||||
|
shell.run(new String[] { "-copyFromLocal", srcFile.toString(),
|
||||||
|
target_File.toString() });
|
||||||
|
} else {
|
||||||
|
shell.run(new String[] { "-copyFromLocal", "-d", srcFile.toString(),
|
||||||
|
target_File.toString() });
|
||||||
|
}
|
||||||
|
// cat of "target._COPYING_" will return exitcode :
|
||||||
|
// as 1(file does not exist), if direct write is false.
|
||||||
|
// as 0, if direct write is true.
|
||||||
|
srcFileexist = shell.run(new String[] { "-cat",
|
||||||
|
target_COPYING_File.toString() });
|
||||||
|
assertEquals(directWriteExitCode, srcFileexist);
|
||||||
|
}
|
||||||
|
|
||||||
private void createFile(Path ... paths) throws IOException {
|
private void createFile(Path ... paths) throws IOException {
|
||||||
for (Path path : paths) {
|
for (Path path : paths) {
|
||||||
FSDataOutputStream out = lfs.create(path);
|
FSDataOutputStream out = lfs.create(path);
|
||||||
|
@ -336,7 +336,7 @@
|
|||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-cp \[-f\] \[-p \| -p\[topax\]\] <src> \.\.\. <dst> :\s*</expected-output>
|
<expected-output>^-cp \[-f\] \[-p \| -p\[topax\]\] \[-d\] <src> \.\.\. <dst> :\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
@ -376,7 +376,11 @@
|
|||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*\(or absence\) of the \/\.reserved\/raw prefix and not by the -p option.( )*</expected-output>
|
<expected-output>^\s*\(or absence\) of the \/\.reserved\/raw prefix and not by the -p option\. Passing -d( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*will skip creation of temporary file\(<dst>\._COPYING_\)\.( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
@ -472,7 +476,7 @@
|
|||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-put \[-f\] \[-p\] \[-l\] <localsrc> \.\.\. <dst> :( )*</expected-output>
|
<expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
@ -506,6 +510,10 @@
|
|||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^\s*durability. Use with care.( )*</expected-output>
|
<expected-output>^\s*durability. Use with care.( )*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
|
<comparator>
|
||||||
|
<type>RegexpComparator</type>
|
||||||
|
<expected-output>^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )*</expected-output>
|
||||||
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
|
||||||
@ -519,7 +527,7 @@
|
|||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
<expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :\s*</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>RegexpComparator</type>
|
<type>RegexpComparator</type>
|
||||||
|
Loading…
Reference in New Issue
Block a user