From 6460df21a09a7fcc29eceb8dc3859d6298da6882 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 16 Jun 2017 17:24:00 -0700 Subject: [PATCH] HADOOP-14395. Provide Builder pattern for DistributedFileSystem.append. Contributed by Lei (Eddy) Xu. --- .../java/org/apache/hadoop/fs/FileSystem.java | 10 +++++ .../hadoop/hdfs/DistributedFileSystem.java | 41 +++++++++++++----- .../hdfs/TestDistributedFileSystem.java | 43 ++++++++++++++++++- 3 files changed, 82 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index cc92f31b5a..d7cd7dd883 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4179,4 +4179,14 @@ protected FSDataOutputStreamBuilder createFile(Path path) { return new FileSystemDataOutputStreamBuilder(this, path) .create().overwrite(true); } + + /** + * Create a Builder to append a file. + * @param path file path. + * @return a {@link FSDataOutputStreamBuilder} to build file append request. + */ + @InterfaceAudience.Private + protected FSDataOutputStreamBuilder appendFile(Path path) { + return new FileSystemDataOutputStreamBuilder(this, path).append(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 1fd8f79439..1a9ae48763 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -2734,7 +2735,7 @@ DFSOpsCountStatistics getDFSOpsCountStatistics() { */ public static final class HdfsDataOutputStreamBuilder extends FSDataOutputStreamBuilder< - HdfsDataOutputStream, HdfsDataOutputStreamBuilder> { + FSDataOutputStream, HdfsDataOutputStreamBuilder> { private final DistributedFileSystem dfs; private InetSocketAddress[] favoredNodes = null; private String ecPolicyName = null; @@ -2857,17 +2858,24 @@ protected EnumSet getFlags() { * @throws IOException on I/O errors. */ @Override - public HdfsDataOutputStream build() throws IOException { - if (isRecursive()) { - return dfs.create(getPath(), getPermission(), getFlags(), - getBufferSize(), getReplication(), getBlockSize(), - getProgress(), getChecksumOpt(), getFavoredNodes(), - getEcPolicyName()); - } else { - return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), - getBufferSize(), getReplication(), getBlockSize(), getProgress(), - getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); + public FSDataOutputStream build() throws IOException { + if (getFlags().contains(CreateFlag.CREATE)) { + if (isRecursive()) { + return dfs.create(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), + getProgress(), getChecksumOpt(), getFavoredNodes(), + getEcPolicyName()); + } else { + return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), getProgress(), + getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); + } + } else if (getFlags().contains(CreateFlag.APPEND)) { + return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(), + getFavoredNodes()); } + throw new HadoopIllegalArgumentException( + "Must specify either create or append"); } } @@ -2896,4 +2904,15 @@ public HdfsDataOutputStreamBuilder createFile(Path path) { public RemoteIterator listOpenFiles() throws IOException { return dfs.listOpenFiles(); } + + /** + * Create a {@link HdfsDataOutputStreamBuilder} to append a file on DFS. + * + * @param path file path. + * @return A {@link HdfsDataOutputStreamBuilder} for appending a file. + */ + @Override + public HdfsDataOutputStreamBuilder appendFile(Path path) { + return new HdfsDataOutputStreamBuilder(this, path).append(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 9857735d5c..447941990c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -50,6 +50,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -1445,7 +1446,7 @@ public void testHdfsDataOutputStreamBuilderSetParameters() } @Test - public void testDFSDataOutputStreamBuilder() throws Exception { + public void testDFSDataOutputStreamBuilderForCreation() throws Exception { Configuration conf = getTestConfiguration(); String testFile = "/testDFSDataOutputStreamBuilder"; Path testFilePath = new Path(testFile); @@ -1453,6 +1454,11 @@ public void testDFSDataOutputStreamBuilder() throws Exception { .numDataNodes(1).build()) { DistributedFileSystem fs = cluster.getFileSystem(); + // Before calling build(), no change was made in the file system + HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath) + .blockSize(4096).replication((short)1); + assertFalse(fs.exists(testFilePath)); + // Test create an empty file try (FSDataOutputStream out = fs.createFile(testFilePath).build()) { @@ -1497,4 +1503,39 @@ public void testDFSDataOutputStreamBuilder() throws Exception { fs.exists(new Path("/parent"))); } } + + @Test + public void testDFSDataOutputStreamBuilderForAppend() throws IOException { + Configuration conf = getTestConfiguration(); + String testFile = "/testDFSDataOutputStreamBuilderForAppend"; + Path path = new Path(testFile); + Random random = new Random(); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { + DistributedFileSystem fs = cluster.getFileSystem(); + + byte[] buf = new byte[16]; + random.nextBytes(buf); + + try (FSDataOutputStream out = fs.appendFile(path).build()) { + out.write(buf); + fail("should fail on appending to non-existent file"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("non-existent", e); + } + + random.nextBytes(buf); + try (FSDataOutputStream out = fs.createFile(path).build()) { + out.write(buf); + } + + random.nextBytes(buf); + try (FSDataOutputStream out = fs.appendFile(path).build()) { + out.write(buf); + } + + FileStatus status = fs.getFileStatus(path); + assertEquals(16 * 2, status.getLen()); + } + } }