HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697)
Reviewed-by: xuzq <15040255127@163.com> Signed-off-by: Tao Li <tomscut@apache.org>
This commit is contained in:
parent
53143409a8
commit
cbac2c4875
@ -1545,6 +1545,39 @@ public abstract class FileSystem extends Configured
|
|||||||
public abstract FSDataOutputStream append(Path f, int bufferSize,
|
public abstract FSDataOutputStream append(Path f, int bufferSize,
|
||||||
Progressable progress) throws IOException;
|
Progressable progress) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append to an existing file (optional operation).
|
||||||
|
* @param f the existing file to be appended.
|
||||||
|
* @param appendToNewBlock whether to append data to a new block
|
||||||
|
* instead of the end of the last partial block
|
||||||
|
* @throws IOException IO failure
|
||||||
|
* @throws UnsupportedOperationException if the operation is unsupported
|
||||||
|
* (default).
|
||||||
|
* @return output stream.
|
||||||
|
*/
|
||||||
|
public FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException {
|
||||||
|
return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
|
||||||
|
IO_FILE_BUFFER_SIZE_DEFAULT), null, appendToNewBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append to an existing file (optional operation).
|
||||||
|
* This function is used for being overridden by some FileSystem like DistributedFileSystem
|
||||||
|
* @param f the existing file to be appended.
|
||||||
|
* @param bufferSize the size of the buffer to be used.
|
||||||
|
* @param progress for reporting progress if it is not null.
|
||||||
|
* @param appendToNewBlock whether to append data to a new block
|
||||||
|
* instead of the end of the last partial block
|
||||||
|
* @throws IOException IO failure
|
||||||
|
* @throws UnsupportedOperationException if the operation is unsupported
|
||||||
|
* (default).
|
||||||
|
* @return output stream.
|
||||||
|
*/
|
||||||
|
public FSDataOutputStream append(Path f, int bufferSize,
|
||||||
|
Progressable progress, boolean appendToNewBlock) throws IOException {
|
||||||
|
return append(f, bufferSize, progress);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Concat existing files together.
|
* Concat existing files together.
|
||||||
* @param trg the path to the target destination.
|
* @param trg the path to the target destination.
|
||||||
|
@ -333,15 +333,24 @@ class CopyCommands {
|
|||||||
*/
|
*/
|
||||||
public static class AppendToFile extends CommandWithDestination {
|
public static class AppendToFile extends CommandWithDestination {
|
||||||
public static final String NAME = "appendToFile";
|
public static final String NAME = "appendToFile";
|
||||||
public static final String USAGE = "<localsrc> ... <dst>";
|
public static final String USAGE = "[-n] <localsrc> ... <dst>";
|
||||||
public static final String DESCRIPTION =
|
public static final String DESCRIPTION =
|
||||||
"Appends the contents of all the given local files to the " +
|
"Appends the contents of all the given local files to the " +
|
||||||
"given dst file. The dst file will be created if it does " +
|
"given dst file. The dst file will be created if it does " +
|
||||||
"not exist. If <localSrc> is -, then the input is read " +
|
"not exist. If <localSrc> is -, then the input is read " +
|
||||||
"from stdin.";
|
"from stdin. Option -n represents that use NEW_BLOCK create flag to append file.";
|
||||||
|
|
||||||
private static final int DEFAULT_IO_LENGTH = 1024 * 1024;
|
private static final int DEFAULT_IO_LENGTH = 1024 * 1024;
|
||||||
boolean readStdin = false;
|
boolean readStdin = false;
|
||||||
|
private boolean appendToNewBlock = false;
|
||||||
|
|
||||||
|
public boolean isAppendToNewBlock() {
|
||||||
|
return appendToNewBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAppendToNewBlock(boolean appendToNewBlock) {
|
||||||
|
this.appendToNewBlock = appendToNewBlock;
|
||||||
|
}
|
||||||
|
|
||||||
// commands operating on local paths have no need for glob expansion
|
// commands operating on local paths have no need for glob expansion
|
||||||
@Override
|
@Override
|
||||||
@ -372,6 +381,9 @@ class CopyCommands {
|
|||||||
throw new IOException("missing destination argument");
|
throw new IOException("missing destination argument");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "n");
|
||||||
|
cf.parse(args);
|
||||||
|
appendToNewBlock = cf.getOpt("n");
|
||||||
getRemoteDestination(args);
|
getRemoteDestination(args);
|
||||||
super.processOptions(args);
|
super.processOptions(args);
|
||||||
}
|
}
|
||||||
@ -385,7 +397,8 @@ class CopyCommands {
|
|||||||
}
|
}
|
||||||
|
|
||||||
InputStream is = null;
|
InputStream is = null;
|
||||||
try (FSDataOutputStream fos = dst.fs.append(dst.path)) {
|
try (FSDataOutputStream fos = appendToNewBlock ?
|
||||||
|
dst.fs.append(dst.path, true) : dst.fs.append(dst.path)) {
|
||||||
if (readStdin) {
|
if (readStdin) {
|
||||||
if (args.size() == 0) {
|
if (args.size() == 0) {
|
||||||
IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH);
|
IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH);
|
||||||
|
@ -143,6 +143,11 @@ public class TestFilterFileSystem {
|
|||||||
of the filter such as checksums.
|
of the filter such as checksums.
|
||||||
*/
|
*/
|
||||||
MultipartUploaderBuilder createMultipartUploader(Path basePath);
|
MultipartUploaderBuilder createMultipartUploader(Path basePath);
|
||||||
|
|
||||||
|
FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException;
|
||||||
|
|
||||||
|
FSDataOutputStream append(Path f, int bufferSize,
|
||||||
|
Progressable progress, boolean appendToNewBlock) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -250,6 +250,11 @@ public class TestHarFileSystem {
|
|||||||
|
|
||||||
MultipartUploaderBuilder createMultipartUploader(Path basePath)
|
MultipartUploaderBuilder createMultipartUploader(Path basePath)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException;
|
||||||
|
|
||||||
|
FSDataOutputStream append(Path f, int bufferSize,
|
||||||
|
Progressable progress, boolean appendToNewBlock) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -423,6 +423,16 @@ public class DistributedFileSystem extends FileSystem
|
|||||||
return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
|
return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream append(Path f, final int bufferSize,
|
||||||
|
final Progressable progress, boolean appendToNewBlock) throws IOException {
|
||||||
|
EnumSet<CreateFlag> flag = EnumSet.of(CreateFlag.APPEND);
|
||||||
|
if (appendToNewBlock) {
|
||||||
|
flag.add(CreateFlag.NEW_BLOCK);
|
||||||
|
}
|
||||||
|
return append(f, flag, bufferSize, progress);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Append to an existing file (optional operation).
|
* Append to an existing file (optional operation).
|
||||||
*
|
*
|
||||||
|
@ -38,6 +38,7 @@ import java.util.function.Supplier;
|
|||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.XAttrNotFoundException;
|
import org.apache.hadoop.hdfs.protocol.XAttrNotFoundException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -3040,6 +3041,96 @@ public class TestDFSShell {
|
|||||||
assertThat(res, not(0));
|
assertThat(res, not(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 300000)
|
||||||
|
public void testAppendToFileWithOptionN() throws Exception {
|
||||||
|
final int inputFileLength = 1024 * 1024;
|
||||||
|
File testRoot = new File(TEST_ROOT_DIR, "testAppendToFileWithOptionN");
|
||||||
|
testRoot.mkdirs();
|
||||||
|
|
||||||
|
File file1 = new File(testRoot, "file1");
|
||||||
|
createLocalFileWithRandomData(inputFileLength, file1);
|
||||||
|
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).build()) {
|
||||||
|
cluster.waitActive();
|
||||||
|
FileSystem hdfs = cluster.getFileSystem();
|
||||||
|
assertTrue("Not a HDFS: " + hdfs.getUri(),
|
||||||
|
hdfs instanceof DistributedFileSystem);
|
||||||
|
|
||||||
|
// Run appendToFile with option n by replica policy once, make sure that the target file is
|
||||||
|
// created and is of the right size and block number is correct.
|
||||||
|
String dir = "/replica";
|
||||||
|
boolean mkdirs = hdfs.mkdirs(new Path(dir));
|
||||||
|
assertTrue("Mkdir fail", mkdirs);
|
||||||
|
Path remoteFile = new Path(dir + "/remoteFile");
|
||||||
|
FsShell shell = new FsShell();
|
||||||
|
shell.setConf(conf);
|
||||||
|
String[] argv = new String[] {
|
||||||
|
"-appendToFile", "-n", file1.toString(), remoteFile.toString() };
|
||||||
|
int res = ToolRunner.run(shell, argv);
|
||||||
|
assertEquals("Run appendToFile command fail", 0, res);
|
||||||
|
FileStatus fileStatus = hdfs.getFileStatus(remoteFile);
|
||||||
|
assertEquals("File size should be " + inputFileLength,
|
||||||
|
inputFileLength, fileStatus.getLen());
|
||||||
|
BlockLocation[] fileBlockLocations =
|
||||||
|
hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
|
||||||
|
assertEquals("Block Num should be 1", 1, fileBlockLocations.length);
|
||||||
|
|
||||||
|
// Run appendToFile with option n by replica policy again and
|
||||||
|
// make sure that the target file size has been doubled and block number has been doubled.
|
||||||
|
res = ToolRunner.run(shell, argv);
|
||||||
|
assertEquals("Run appendToFile command fail", 0, res);
|
||||||
|
fileStatus = hdfs.getFileStatus(remoteFile);
|
||||||
|
assertEquals("File size should be " + inputFileLength * 2,
|
||||||
|
inputFileLength * 2, fileStatus.getLen());
|
||||||
|
fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
|
||||||
|
assertEquals("Block Num should be 2", 2, fileBlockLocations.length);
|
||||||
|
|
||||||
|
// Before run appendToFile with option n by ec policy, set ec policy for the dir.
|
||||||
|
dir = "/ecPolicy";
|
||||||
|
final String ecPolicyName = "RS-6-3-1024k";
|
||||||
|
mkdirs = hdfs.mkdirs(new Path(dir));
|
||||||
|
assertTrue("Mkdir fail", mkdirs);
|
||||||
|
((DistributedFileSystem) hdfs).setErasureCodingPolicy(new Path(dir), ecPolicyName);
|
||||||
|
ErasureCodingPolicy erasureCodingPolicy =
|
||||||
|
((DistributedFileSystem) hdfs).getErasureCodingPolicy(new Path(dir));
|
||||||
|
assertEquals("Set ec policy fail", ecPolicyName, erasureCodingPolicy.getName());
|
||||||
|
|
||||||
|
// Run appendToFile with option n by ec policy once, make sure that the target file is
|
||||||
|
// created and is of the right size and block group number is correct.
|
||||||
|
remoteFile = new Path(dir + "/remoteFile");
|
||||||
|
argv = new String[] {
|
||||||
|
"-appendToFile", "-n", file1.toString(), remoteFile.toString() };
|
||||||
|
res = ToolRunner.run(shell, argv);
|
||||||
|
assertEquals("Run appendToFile command fail", 0, res);
|
||||||
|
fileStatus = hdfs.getFileStatus(remoteFile);
|
||||||
|
assertEquals("File size should be " + inputFileLength,
|
||||||
|
inputFileLength, fileStatus.getLen());
|
||||||
|
fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
|
||||||
|
assertEquals("Block Group Num should be 1", 1, fileBlockLocations.length);
|
||||||
|
|
||||||
|
// Run appendToFile without option n by ec policy again and make sure that
|
||||||
|
// append on EC file without new block must fail.
|
||||||
|
argv = new String[] {
|
||||||
|
"-appendToFile", file1.toString(), remoteFile.toString() };
|
||||||
|
res = ToolRunner.run(shell, argv);
|
||||||
|
assertTrue("Run appendToFile command must fail", res != 0);
|
||||||
|
|
||||||
|
// Run appendToFile with option n by ec policy again and
|
||||||
|
// make sure that the target file size has been doubled
|
||||||
|
// and block group number has been doubled.
|
||||||
|
argv = new String[] {
|
||||||
|
"-appendToFile", "-n", file1.toString(), remoteFile.toString() };
|
||||||
|
res = ToolRunner.run(shell, argv);
|
||||||
|
assertEquals("Run appendToFile command fail", 0, res);
|
||||||
|
fileStatus = hdfs.getFileStatus(remoteFile);
|
||||||
|
assertEquals("File size should be " + inputFileLength * 2,
|
||||||
|
inputFileLength * 2, fileStatus.getLen());
|
||||||
|
fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
|
||||||
|
assertEquals("Block Group Num should be 2", 2, fileBlockLocations.length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
public void testSetXAttrPermission() throws Exception {
|
public void testSetXAttrPermission() throws Exception {
|
||||||
UserGroupInformation user = UserGroupInformation.
|
UserGroupInformation user = UserGroupInformation.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user