HADOOP-14495. Add set options interface to FSDataOutputStreamBuilder. (Lei (Eddy) Xu)
This commit is contained in:
parent
a11c230236
commit
02cd71ba9d
@ -17,17 +17,21 @@
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||
@ -43,6 +47,29 @@
|
||||
* Progressable)}.
|
||||
*
|
||||
* To create missing parent directory, use {@link #recursive()}.
|
||||
*
|
||||
* To be more generic, {@link #opt(String, int)} and {@link #must(String, int)}
|
||||
* variants provide implementation-agnostic way to customize the builder.
|
||||
* Each FS-specific builder implementation can interpret the FS-specific
|
||||
* options accordingly, for example:
|
||||
*
|
||||
* <code>
|
||||
* FSDataOutputStreamBuilder builder = fs.createFile(path);
|
||||
* builder.permission(perm)
|
||||
* .bufferSize(bufSize)
|
||||
* .opt("dfs.outputstream.builder.lazy-persist", true)
|
||||
* .opt("dfs.outputstream.builder.ec.policy-name", "rs-3-2-64k")
|
||||
* .opt("fs.local.o-direct", true)
|
||||
* .must("fs.s3a.fast-upload", true)
|
||||
* .must("fs.azure.buffer-size", 256 * 1024 * 1024);
|
||||
* FSDataOutputStream out = builder.build();
|
||||
* ...
|
||||
* </code>
|
||||
*
|
||||
* If the option is not related to the file system, the option will be ignored.
|
||||
* If the option is must, but not supported by the file system, a
|
||||
* {@link IllegalArgumentException} will be thrown.
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
@ -60,6 +87,16 @@ public abstract class FSDataOutputStreamBuilder
|
||||
private Progressable progress = null;
|
||||
private ChecksumOpt checksumOpt = null;
|
||||
|
||||
/**
|
||||
* Contains optional and mandatory parameters.
|
||||
*
|
||||
* It does not load default configurations from default files.
|
||||
*/
|
||||
private final Configuration options = new Configuration(false);
|
||||
|
||||
/** Keep track of the keys for mandatory options. */
|
||||
private final Set<String> mandatoryKeys = new HashSet<>();
|
||||
|
||||
/**
|
||||
* Return the concrete implementation of the builder instance.
|
||||
*/
|
||||
@ -215,11 +252,125 @@ public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set optional Builder parameter.
|
||||
*/
|
||||
public B opt(@Nonnull final String key, @Nonnull final String value) {
|
||||
mandatoryKeys.remove(key);
|
||||
options.set(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set optional boolean parameter for the Builder.
|
||||
*/
|
||||
public B opt(@Nonnull final String key, boolean value) {
|
||||
mandatoryKeys.remove(key);
|
||||
options.setBoolean(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set optional int parameter for the Builder.
|
||||
*/
|
||||
public B opt(@Nonnull final String key, int value) {
|
||||
mandatoryKeys.remove(key);
|
||||
options.setInt(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set optional float parameter for the Builder.
|
||||
*/
|
||||
public B opt(@Nonnull final String key, float value) {
|
||||
mandatoryKeys.remove(key);
|
||||
options.setFloat(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set optional double parameter for the Builder.
|
||||
*/
|
||||
public B opt(@Nonnull final String key, double value) {
|
||||
mandatoryKeys.remove(key);
|
||||
options.setDouble(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set an array of string values as optional parameter for the Builder.
|
||||
*/
|
||||
public B opt(@Nonnull final String key, @Nonnull final String... values) {
|
||||
mandatoryKeys.remove(key);
|
||||
options.setStrings(key, values);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set mandatory option to the Builder.
|
||||
*
|
||||
* If the option is not supported or unavailable on the {@link FileSystem},
|
||||
* the client should expect {@link #build()} throws
|
||||
* {@link IllegalArgumentException}.
|
||||
*/
|
||||
public B must(@Nonnull final String key, @Nonnull final String value) {
|
||||
mandatoryKeys.add(key);
|
||||
options.set(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/** Set mandatory boolean option. */
|
||||
public B must(@Nonnull final String key, boolean value) {
|
||||
mandatoryKeys.add(key);
|
||||
options.setBoolean(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/** Set mandatory int option. */
|
||||
public B must(@Nonnull final String key, int value) {
|
||||
mandatoryKeys.add(key);
|
||||
options.setInt(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/** Set mandatory float option. */
|
||||
public B must(@Nonnull final String key, float value) {
|
||||
mandatoryKeys.add(key);
|
||||
options.setFloat(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/** Set mandatory double option. */
|
||||
public B must(@Nonnull final String key, double value) {
|
||||
mandatoryKeys.add(key);
|
||||
options.setDouble(key, value);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/** Set a string array as mandatory option. */
|
||||
public B must(@Nonnull final String key, @Nonnull final String... values) {
|
||||
mandatoryKeys.add(key);
|
||||
options.setStrings(key, values);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
protected Configuration getOptions() {
|
||||
return options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the keys that are set as mandatory keys.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected Set<String> getMandatoryKeys() {
|
||||
return Collections.unmodifiableSet(mandatoryKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the FSDataOutputStream to write on the file system.
|
||||
*
|
||||
* @throws HadoopIllegalArgumentException if the parameters are not valid.
|
||||
* @throws IllegalArgumentException if the parameters are not valid.
|
||||
* @throws IOException on errors when file system creates or appends the file.
|
||||
*/
|
||||
public abstract S build() throws IOException;
|
||||
public abstract S build() throws IllegalArgumentException, IOException;
|
||||
}
|
||||
|
@ -17,11 +17,13 @@
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||
@ -31,7 +33,11 @@
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
|
||||
import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
|
||||
@ -46,6 +52,8 @@
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
|
||||
/**
|
||||
* This class tests the local file system via the FileSystem abstraction.
|
||||
@ -703,4 +711,66 @@ public void testFSOutputStreamBuilder() throws Exception {
|
||||
Assert.assertEquals("Buffer size should be 0",
|
||||
builder.getBufferSize(), 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* A builder to verify configuration keys are supported.
|
||||
*/
|
||||
private static class BuilderWithSupportedKeys
|
||||
extends FSDataOutputStreamBuilder<FSDataOutputStream,
|
||||
BuilderWithSupportedKeys> {
|
||||
|
||||
private final Set<String> supportedKeys = new HashSet<>();
|
||||
|
||||
BuilderWithSupportedKeys(@Nonnull final Collection<String> supportedKeys,
|
||||
@Nonnull FileSystem fileSystem, @Nonnull Path p) {
|
||||
super(fileSystem, p);
|
||||
this.supportedKeys.addAll(supportedKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BuilderWithSupportedKeys getThisBuilder() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream build()
|
||||
throws IllegalArgumentException, IOException {
|
||||
Set<String> unsupported = new HashSet<>(getMandatoryKeys());
|
||||
unsupported.removeAll(supportedKeys);
|
||||
Preconditions.checkArgument(unsupported.isEmpty(),
|
||||
"unsupported key found: " + supportedKeys);
|
||||
return getFS().create(
|
||||
getPath(), getPermission(), getFlags(), getBufferSize(),
|
||||
getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFSOutputStreamBuilderOptions() throws Exception {
|
||||
Path path = new Path(TEST_ROOT_DIR, "testBuilderOpt");
|
||||
final List<String> supportedKeys = Arrays.asList("strM");
|
||||
|
||||
FSDataOutputStreamBuilder<?, ?> builder =
|
||||
new BuilderWithSupportedKeys(supportedKeys, fileSys, path);
|
||||
builder.opt("strKey", "value");
|
||||
builder.opt("intKey", 123);
|
||||
builder.opt("strM", "ignored");
|
||||
// Over-write an optional value with a mandatory value.
|
||||
builder.must("strM", "value");
|
||||
builder.must("unsupported", 12.34);
|
||||
|
||||
assertEquals("Optional value should be overwrite by a mandatory value",
|
||||
"value", builder.getOptions().get("strM"));
|
||||
|
||||
Set<String> mandatoryKeys = builder.getMandatoryKeys();
|
||||
Set<String> expectedKeys = new HashSet<>();
|
||||
expectedKeys.add("strM");
|
||||
expectedKeys.add("unsupported");
|
||||
assertEquals(expectedKeys, mandatoryKeys);
|
||||
assertEquals(2, mandatoryKeys.size());
|
||||
|
||||
LambdaTestUtils.intercept(IllegalArgumentException.class,
|
||||
"unsupported key found", builder::build
|
||||
);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user