HADOOP-14395. Provide Builder pattern for DistributedFileSystem.append. Contributed by Lei (Eddy) Xu.

This commit is contained in:
Lei Xu 2017-06-16 17:24:00 -07:00
parent 82bbcbf37f
commit 6460df21a0
3 changed files with 82 additions and 12 deletions

View File

@ -4179,4 +4179,14 @@ protected FSDataOutputStreamBuilder createFile(Path path) {
return new FileSystemDataOutputStreamBuilder(this, path) return new FileSystemDataOutputStreamBuilder(this, path)
.create().overwrite(true); .create().overwrite(true);
} }
/**
* Create a Builder to append a file.
* @param path file path.
* @return a {@link FSDataOutputStreamBuilder} to build file append request.
*/
@InterfaceAudience.Private
protected FSDataOutputStreamBuilder appendFile(Path path) {
return new FileSystemDataOutputStreamBuilder(this, path).append();
}
} }

View File

@ -30,6 +30,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -2734,7 +2735,7 @@ DFSOpsCountStatistics getDFSOpsCountStatistics() {
*/ */
public static final class HdfsDataOutputStreamBuilder public static final class HdfsDataOutputStreamBuilder
extends FSDataOutputStreamBuilder< extends FSDataOutputStreamBuilder<
HdfsDataOutputStream, HdfsDataOutputStreamBuilder> { FSDataOutputStream, HdfsDataOutputStreamBuilder> {
private final DistributedFileSystem dfs; private final DistributedFileSystem dfs;
private InetSocketAddress[] favoredNodes = null; private InetSocketAddress[] favoredNodes = null;
private String ecPolicyName = null; private String ecPolicyName = null;
@ -2857,17 +2858,24 @@ protected EnumSet<CreateFlag> getFlags() {
* @throws IOException on I/O errors. * @throws IOException on I/O errors.
*/ */
@Override @Override
public HdfsDataOutputStream build() throws IOException { public FSDataOutputStream build() throws IOException {
if (isRecursive()) { if (getFlags().contains(CreateFlag.CREATE)) {
return dfs.create(getPath(), getPermission(), getFlags(), if (isRecursive()) {
getBufferSize(), getReplication(), getBlockSize(), return dfs.create(getPath(), getPermission(), getFlags(),
getProgress(), getChecksumOpt(), getFavoredNodes(), getBufferSize(), getReplication(), getBlockSize(),
getEcPolicyName()); getProgress(), getChecksumOpt(), getFavoredNodes(),
} else { getEcPolicyName());
return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), } else {
getBufferSize(), getReplication(), getBlockSize(), getProgress(), return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); getBufferSize(), getReplication(), getBlockSize(), getProgress(),
getChecksumOpt(), getFavoredNodes(), getEcPolicyName());
}
} else if (getFlags().contains(CreateFlag.APPEND)) {
return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(),
getFavoredNodes());
} }
throw new HadoopIllegalArgumentException(
"Must specify either create or append");
} }
} }
@ -2896,4 +2904,15 @@ public HdfsDataOutputStreamBuilder createFile(Path path) {
public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException { public RemoteIterator<OpenFileEntry> listOpenFiles() throws IOException {
return dfs.listOpenFiles(); return dfs.listOpenFiles();
} }
/**
* Create a {@link HdfsDataOutputStreamBuilder} to append a file on DFS.
*
* @param path file path.
* @return A {@link HdfsDataOutputStreamBuilder} for appending a file.
*/
@Override
public HdfsDataOutputStreamBuilder appendFile(Path path) {
return new HdfsDataOutputStreamBuilder(this, path).append();
}
} }

View File

@ -50,6 +50,7 @@
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -1445,7 +1446,7 @@ public void testHdfsDataOutputStreamBuilderSetParameters()
} }
@Test @Test
public void testDFSDataOutputStreamBuilder() throws Exception { public void testDFSDataOutputStreamBuilderForCreation() throws Exception {
Configuration conf = getTestConfiguration(); Configuration conf = getTestConfiguration();
String testFile = "/testDFSDataOutputStreamBuilder"; String testFile = "/testDFSDataOutputStreamBuilder";
Path testFilePath = new Path(testFile); Path testFilePath = new Path(testFile);
@ -1453,6 +1454,11 @@ public void testDFSDataOutputStreamBuilder() throws Exception {
.numDataNodes(1).build()) { .numDataNodes(1).build()) {
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
// Before calling build(), no change was made in the file system
HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath)
.blockSize(4096).replication((short)1);
assertFalse(fs.exists(testFilePath));
// Test create an empty file // Test create an empty file
try (FSDataOutputStream out = try (FSDataOutputStream out =
fs.createFile(testFilePath).build()) { fs.createFile(testFilePath).build()) {
@ -1497,4 +1503,39 @@ public void testDFSDataOutputStreamBuilder() throws Exception {
fs.exists(new Path("/parent"))); fs.exists(new Path("/parent")));
} }
} }
@Test
public void testDFSDataOutputStreamBuilderForAppend() throws IOException {
Configuration conf = getTestConfiguration();
String testFile = "/testDFSDataOutputStreamBuilderForAppend";
Path path = new Path(testFile);
Random random = new Random();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
DistributedFileSystem fs = cluster.getFileSystem();
byte[] buf = new byte[16];
random.nextBytes(buf);
try (FSDataOutputStream out = fs.appendFile(path).build()) {
out.write(buf);
fail("should fail on appending to non-existent file");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("non-existent", e);
}
random.nextBytes(buf);
try (FSDataOutputStream out = fs.createFile(path).build()) {
out.write(buf);
}
random.nextBytes(buf);
try (FSDataOutputStream out = fs.appendFile(path).build()) {
out.write(buf);
}
FileStatus status = fs.getFileStatus(path);
assertEquals(16 * 2, status.getLen());
}
}
} }