diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
index 56ef51f128..81a14e97be 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSBuilder.java
@@ -28,6 +28,34 @@
* The base interface which various FileSystem FileContext Builder
* interfaces can extend, and which underlying implementations
* will then implement.
+ *
+ * HADOOP-16202 expanded the opt() and must() arguments with
+ * operator overloading, but HADOOP-18724 identified mapping problems:
+ * passing a long value in to {@code opt()} could end up invoking
+ * {@code opt(string, double)}, which could then trigger parse failures.
+ *
+ * To fix this without forcing existing code to break/be recompiled.
+ *
+ * - A new method to explicitly set a long value is added:
+ * {@link #optLong(String, long)}
+ *
+ * - A new method to explicitly set a double value is added:
+ * {@link #optLong(String, long)}
+ *
+ * -
+ * All of {@link #opt(String, long)}, {@link #opt(String, float)} and
+ * {@link #opt(String, double)} invoke {@link #optLong(String, long)}.
+ *
+ * -
+ * The same changes have been applied to {@code must()} methods.
+ *
+ *
+ * The forwarding of existing double/float setters to the long setters ensure
+ * that existing code will link, but are guaranteed to always set a long value.
+ * If you need to write code which works correctly with all hadoop releases,
+ * covert the option to a string explicitly and then call {@link #opt(String, String)}
+ * or {@link #must(String, String)} as appropriate.
+ *
* @param Return type on the {@link #build()} call.
* @param type of builder itself.
*/
@@ -63,13 +91,17 @@ public interface FSBuilder> {
B opt(@Nonnull String key, int value);
/**
- * Set optional float parameter for the Builder.
+ * This parameter is converted to a long and passed
+ * to {@link #optLong(String, long)} -all
+ * decimal precision is lost.
*
* @param key key.
* @param value value.
* @return generic type B.
* @see #opt(String, String)
+ * @deprecated use {@link #optDouble(String, double)}
*/
+ @Deprecated
B opt(@Nonnull String key, float value);
/**
@@ -78,18 +110,22 @@ public interface FSBuilder> {
* @param key key.
* @param value value.
* @return generic type B.
- * @see #opt(String, String)
+ * @deprecated use {@link #optLong(String, long)} where possible.
*/
B opt(@Nonnull String key, long value);
/**
- * Set optional double parameter for the Builder.
- *
+ * Pass an optional double parameter for the Builder.
+ * This parameter is converted to a long and passed
+ * to {@link #optLong(String, long)} -all
+ * decimal precision is lost.
* @param key key.
* @param value value.
* @return generic type B.
* @see #opt(String, String)
+ * @deprecated use {@link #optDouble(String, double)}
*/
+ @Deprecated
B opt(@Nonnull String key, double value);
/**
@@ -102,6 +138,26 @@ public interface FSBuilder> {
*/
B opt(@Nonnull String key, @Nonnull String... values);
+ /**
+ * Set optional long parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+ * @see #opt(String, String)
+ */
+ B optLong(@Nonnull String key, long value);
+
+ /**
+ * Set optional double parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+ * @see #opt(String, String)
+ */
+ B optDouble(@Nonnull String key, double value);
+
/**
* Set mandatory option to the Builder.
*
@@ -135,13 +191,16 @@ public interface FSBuilder> {
B must(@Nonnull String key, int value);
/**
- * Set mandatory float option.
+ * This parameter is converted to a long and passed
+ * to {@link #mustLong(String, long)} -all
+ * decimal precision is lost.
*
* @param key key.
* @param value value.
* @return generic type B.
- * @see #must(String, String)
+ * @deprecated use {@link #mustDouble(String, double)} to set floating point.
*/
+ @Deprecated
B must(@Nonnull String key, float value);
/**
@@ -152,16 +211,19 @@ public interface FSBuilder> {
* @return generic type B.
* @see #must(String, String)
*/
+ @Deprecated
B must(@Nonnull String key, long value);
/**
- * Set mandatory double option.
+ * Set mandatory long option, despite passing in a floating
+ * point value.
*
* @param key key.
* @param value value.
* @return generic type B.
* @see #must(String, String)
*/
+ @Deprecated
B must(@Nonnull String key, double value);
/**
@@ -174,6 +236,26 @@ public interface FSBuilder> {
*/
B must(@Nonnull String key, @Nonnull String... values);
+ /**
+ * Set mandatory long parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+ * @see #opt(String, String)
+ */
+ B mustLong(@Nonnull String key, long value);
+
+ /**
+ * Set mandatory double parameter for the Builder.
+ *
+ * @param key key.
+ * @param value value.
+ * @return generic type B.
+ * @see #opt(String, String)
+ */
+ B mustDouble(@Nonnull String key, double value);
+
/**
* Instantiate the object which was being built.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index ed467a1890..5601f166ab 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -2237,7 +2237,7 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
InputStream in = awaitFuture(openFile(qSrc)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
- .opt(FS_OPTION_OPENFILE_LENGTH,
+ .optLong(FS_OPTION_OPENFILE_LENGTH,
fs.getLen()) // file length hint for object stores
.build());
try (OutputStream out = create(qDst, createFlag)) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
index 6767e0dfbe..933f569277 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java
@@ -483,7 +483,7 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
in = awaitFuture(srcFS.openFile(src)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
- .opt(FS_OPTION_OPENFILE_LENGTH,
+ .optLong(FS_OPTION_OPENFILE_LENGTH,
srcStatus.getLen()) // file length hint for object stores
.build());
out = dstFS.create(dst, overwrite);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
index 774dfeb5fa..47cf98f474 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
@@ -44,11 +44,13 @@
* with option support.
*
*
- * .opt("foofs:option.a", true)
- * .opt("foofs:option.b", "value")
+ * .opt("fs.s3a.open.option.caching", true)
+ * .opt("fs.option.openfile.read.policy", "random, adaptive")
* .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
- * .must("foofs:cache", true)
- * .must("barfs:cache-size", 256 * 1024 * 1024)
+ * .optLong("fs.option.openfile.length", 1_500_000_000_000)
+ * .must("fs.option.openfile.buffer.size", 256_000)
+ * .mustLong("fs.option.openfile.split.start", 256_000_000)
+ * .mustLong("fs.option.openfile.split.end", 512_000_000)
* .build();
*
*
@@ -64,6 +66,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
+@SuppressWarnings({"deprecation", "unused"})
public abstract class
AbstractFSBuilderImpl>
implements FSBuilder {
@@ -178,10 +181,7 @@ public B opt(@Nonnull final String key, @Nonnull final String value) {
*/
@Override
public B opt(@Nonnull final String key, boolean value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setBoolean(key, value);
- return getThisBuilder();
+ return opt(key, Boolean.toString(value));
}
/**
@@ -190,19 +190,18 @@ public B opt(@Nonnull final String key, boolean value) {
* @see #opt(String, String)
*/
@Override
- public B opt(@Nonnull final String key, int value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setInt(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, int value) {
+ return optLong(key, value);
}
@Override
- public B opt(@Nonnull final String key, final long value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setLong(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, final long value) {
+ return optLong(key, value);
+ }
+
+ @Override
+ public B optLong(@Nonnull final String key, final long value) {
+ return opt(key, Long.toString(value));
}
/**
@@ -211,11 +210,8 @@ public B opt(@Nonnull final String key, final long value) {
* @see #opt(String, String)
*/
@Override
- public B opt(@Nonnull final String key, float value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setFloat(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, float value) {
+ return optLong(key, (long) value);
}
/**
@@ -224,11 +220,18 @@ public B opt(@Nonnull final String key, float value) {
* @see #opt(String, String)
*/
@Override
- public B opt(@Nonnull final String key, double value) {
- mandatoryKeys.remove(key);
- optionalKeys.add(key);
- options.setDouble(key, value);
- return getThisBuilder();
+ public final B opt(@Nonnull final String key, double value) {
+ return optLong(key, (long) value);
+ }
+
+ /**
+ * Set optional double parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ @Override
+ public B optDouble(@Nonnull final String key, double value) {
+ return opt(key, Double.toString(value));
}
/**
@@ -264,10 +267,22 @@ public B must(@Nonnull final String key, @Nonnull final String value) {
*/
@Override
public B must(@Nonnull final String key, boolean value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setBoolean(key, value);
- return getThisBuilder();
+ return must(key, Boolean.toString(value));
+ }
+
+ @Override
+ public B mustLong(@Nonnull final String key, final long value) {
+ return must(key, Long.toString(value));
+ }
+
+ /**
+ * Set optional double parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ @Override
+ public B mustDouble(@Nonnull final String key, double value) {
+ return must(key, Double.toString(value));
}
/**
@@ -276,45 +291,23 @@ public B must(@Nonnull final String key, boolean value) {
* @see #must(String, String)
*/
@Override
- public B must(@Nonnull final String key, int value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setInt(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, int value) {
+ return mustLong(key, value);
}
@Override
- public B must(@Nonnull final String key, final long value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setLong(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, final long value) {
+ return mustLong(key, value);
}
- /**
- * Set mandatory float option.
- *
- * @see #must(String, String)
- */
@Override
- public B must(@Nonnull final String key, float value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setFloat(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, final float value) {
+ return mustLong(key, (long) value);
}
- /**
- * Set mandatory double option.
- *
- * @see #must(String, String)
- */
@Override
- public B must(@Nonnull final String key, double value) {
- mandatoryKeys.add(key);
- optionalKeys.remove(key);
- options.setDouble(key, value);
- return getThisBuilder();
+ public final B must(@Nonnull final String key, double value) {
+ return mustLong(key, (long) value);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java
new file mode 100644
index 0000000000..dc4a18eb2b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FSBuilderSupport.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+
+/**
+ * Class to help with use of FSBuilder.
+ */
+public class FSBuilderSupport {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FSBuilderSupport.class);
+
+ public static final LogExactlyOnce LOG_PARSE_ERROR = new LogExactlyOnce(LOG);
+
+ /**
+ * Options which are parsed.
+ */
+ private final Configuration options;
+
+ /**
+ * Constructor.
+ * @param options the configuration options from the builder.
+ */
+ public FSBuilderSupport(final Configuration options) {
+ this.options = options;
+ }
+
+ public Configuration getOptions() {
+ return options;
+ }
+
+ /**
+ * Get a long value with resilience to unparseable values.
+ * Negative values are replaced with the default.
+ * @param key key to log
+ * @param defVal default value
+ * @return long value
+ */
+ public long getPositiveLong(String key, long defVal) {
+ long l = getLong(key, defVal);
+ if (l < 0) {
+ LOG.debug("The option {} has a negative value {}, replacing with the default {}",
+ key, l, defVal);
+ l = defVal;
+ }
+ return l;
+ }
+
+ /**
+ * Get a long value with resilience to unparseable values.
+ * @param key key to log
+ * @param defVal default value
+ * @return long value
+ */
+ public long getLong(String key, long defVal) {
+ final String v = options.getTrimmed(key, "");
+ if (v.isEmpty()) {
+ return defVal;
+ }
+ try {
+ return options.getLong(key, defVal);
+ } catch (NumberFormatException e) {
+ final String msg = String.format(
+ "The option %s value \"%s\" is not a long integer; using the default value %s",
+ key, v, defVal);
+ // not a long,
+ LOG_PARSE_ERROR.warn(msg);
+ LOG.debug("{}", msg, e);
+ return defVal;
+ }
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index da99ac2125..5e945ed835 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -633,7 +633,7 @@ protected FSDataInputStream openFile(final String policy) throws IOException {
return awaitFuture(fs.openFile(path)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
policy)
- .opt(FS_OPTION_OPENFILE_LENGTH,
+ .optLong(FS_OPTION_OPENFILE_LENGTH,
stat.getLen()) // file length hint for object stores
.build());
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 56dbcee92b..3807868e7b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -2017,7 +2017,7 @@ protected FSDataInputStream openFile(FileSystem fs, Path file,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
if (length >= 0) {
- builder.opt(FS_OPTION_OPENFILE_LENGTH, length);
+ builder.optLong(FS_OPTION_OPENFILE_LENGTH, length);
}
return awaitFuture(builder.build());
}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
index db630e05c2..22bec19e5b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md
@@ -25,6 +25,55 @@ references to `FSDataInputStream` and its subclasses.
It is used to initate a (potentially asynchronous) operation to open an existing
file for reading.
+
+## History
+
+### Hadoop 3.3.0: API introduced
+
+[HADOOP-15229](https://issues.apache.org/jira/browse/HADOOP-15229)
+_Add FileSystem builder-based openFile() API to match createFile()_
+
+* No `opt(String key, long value)` method was available.
+* the `withFileStatus(status)` call required a non-null parameter.
+* Sole Filesystem to process options and file status was S3A;
+* Only the s3a specific options were the S3 select and `fs.s3a.experimental.input.fadvise`
+* S3A Filesystem raised `IllegalArgumentException` if a file status was passed in
+ and the path of the filestatus did not match the path of the `openFile(path)` call.
+
+This is the baseline implementation. To write code guaranteed to compile against this version,
+use the `opt(String, String)` and `must(String, String)` methods, converting numbers to
+string explicitly.
+
+```java
+fs.open("s3a://bucket/file")
+ .opt("fs.option.openfile.length", Long.toString(length))
+ .build().get()
+```
+
+### Hadoop 3.3.5: standardization and expansion
+
+[HADOOP-16202](https://issues.apache.org/jira/browse/HADOOP-16202)
+_Enhance openFile() for better read performance against object stores_
+
+* `withFileStatus(null)` required to be accepted (and ignored)
+* only the filename part of any supplied FileStatus path must match the
+ filename passed in on `openFile(path)`.
+* An `opt(String key, long value)` option was added. *This is now deprecated as it
+caused regression
+* Standard `fs.option.openfile` options defined.
+* S3A FS to use openfile length option, seek start/end options not _yet_ used.
+* Azure ABFS connector takes a supplied `VersionedFileStatus` and omits any
+ HEAD probe for the object.
+
+### Hadoop 3.3.6: API change to address operator overload bugs.
+
+new `optLong()`, `optDouble()`, `mustLong()` and `mustDouble()` builder methods.
+
+* See [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724) _Open file fails with NumberFormatException for S3AFileSystem_,
+ which was somehow caused by the overloaded `opt(long)`.
+* Specification updated to declare that unparseable numbers MUST be treated as "unset" and the default
+ value used instead.
+
## Invariants
The `FutureDataInputStreamBuilder` interface does not require parameters or
@@ -36,7 +85,7 @@ Some aspects of the state of the filesystem, MAY be checked in the initial
change between `openFile()` and the `build().get()` sequence. For example,
path validation.
-## Implementation-agnostic parameters.
+## `Implementation-agnostic parameters.
### `FutureDataInputStreamBuilder bufferSize(int bufSize)`
@@ -89,10 +138,20 @@ operations. This is to support wrapper filesystems and serialization/deserializa
of the status.
-### Set optional or mandatory parameters
+### Set optional or mandatory parameters
- FutureDataInputStreamBuilder opt(String key, ...)
- FutureDataInputStreamBuilder must(String key, ...)
+```java
+FutureDataInputStreamBuilder opt(String key, String value)
+FutureDataInputStreamBuilder opt(String key, int value)
+FutureDataInputStreamBuilder opt(String key, boolean value)
+FutureDataInputStreamBuilder optLong(String key, long value)
+FutureDataInputStreamBuilder optDouble(String key, double value)
+FutureDataInputStreamBuilder must(String key, String value)
+FutureDataInputStreamBuilder must(String key, int value)
+FutureDataInputStreamBuilder must(String key, boolean value)
+FutureDataInputStreamBuilder mustLong(String key, long value)
+FutureDataInputStreamBuilder mustDouble(String key, double value)
+```
Set optional or mandatory parameters to the builder. Using `opt()` or `must()`,
client can specify FS-specific parameters without inspecting the concrete type
@@ -103,7 +162,7 @@ Example:
```java
out = fs.openFile(path)
.must("fs.option.openfile.read.policy", "random")
- .opt("fs.http.connection.timeout", 30_000L)
+ .optLong("fs.http.connection.timeout", 30_000L)
.withFileStatus(statusFromListing)
.build()
.get();
@@ -115,9 +174,9 @@ An http-specific option has been supplied which may be interpreted by any store;
If the filesystem opening the file does not recognize the option, it can safely be
ignored.
-### When to use `opt()` versus `must()`
+### When to use `opt` versus `must`
-The difference between `opt()` versus `must()` is how the FileSystem opening
+The difference between `opt` versus `must` is how the FileSystem opening
the file must react to an option which it does not recognize.
```python
@@ -144,7 +203,7 @@ irrespective of how the (key, value) pair was declared.
defined in this filesystem specification, validated through contract
tests.
-#### Implementation Notes
+## Implementation Notes
Checking for supported options must be performed in the `build()` operation.
@@ -155,6 +214,13 @@ Checking for supported options must be performed in the `build()` operation.
a feature which is recognized but not supported in the specific
`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
+Parsing of numeric values SHOULD trim any string and if the value
+cannot be parsed as a number, downgrade to any default value supplied.
+This is to address [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724)
+_Open file fails with NumberFormatException for S3AFileSystem_, which was cause by the overloaded `opt()`
+builder parameter binding to `opt(String, double)` rather than `opt(String, long)` when a long
+value was passed in.
+
The behavior of resolving the conflicts between the parameters set by
builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
@@ -181,7 +247,7 @@ Even if not values of the status are used, the presence of the argument
can be interpreted as the caller declaring that they believe the file
to be present and of the given size.
-## Builder interface
+## Builder interface
### `CompletableFuture build()`
@@ -339,7 +405,7 @@ _Futher reading_
* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
-#### Read Policy `adaptive`
+#### Read Policy `adaptive`
Try to adapt the seek policy to the read pattern of the application.
@@ -429,7 +495,7 @@ If this option is used by the FileSystem implementation
*Implementor's Notes*
-* A value of `fs.option.openfile.length` < 0 MUST be rejected.
+* A value of `fs.option.openfile.length` < 0 MUST be ignored.
* If a file status is supplied along with a value in `fs.opt.openfile.length`;
the file status values take precedence.
@@ -466,11 +532,11 @@ than that value.
The S3A Connector supports custom options for readahead and seek policy.
-| Name | Type | Meaning |
-|--------------------------------------|----------|-------------------------------------------------------------|
-| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
-| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream |
-| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
+| Name | Type | Meaning |
+|--------------------------------------|----------|---------------------------------------------------------------------------|
+| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
+| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
+| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream. (Since 3.3.5) |
If the option set contains a SQL statement in the `fs.s3a.select.sql` statement,
then the file is opened as an S3 Select query.
@@ -510,8 +576,8 @@ protected SeekableInputStream newStream(Path path, FileStatus stat,
.opt("fs.option.openfile.read.policy", "vector, random")
.withFileStatus(stat);
- builder.opt("fs.option.openfile.split.start", splitStart);
- builder.opt("fs.option.openfile.split.end", splitEnd);
+ builder.optLong("fs.option.openfile.split.start", splitStart);
+ builder.optLong("fs.option.openfile.split.end", splitEnd);
CompletableFuture streamF = builder.build();
return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
}
@@ -618,8 +684,8 @@ An example of a record reader passing in options to the file it opens.
file.getFileSystem(job).openFile(file);
// the start and end of the split may be used to build
// an input strategy.
- builder.opt("fs.option.openfile.split.start", start);
- builder.opt("fs.option.openfile.split.end", end);
+ builder.optLong("fs.option.openfile.split.start", start);
+ builder.optLong("fs.option.openfile.split.end", end);
FutureIO.propagateOptions(builder, job,
"mapreduce.job.input.file.option",
"mapreduce.job.input.file.must");
@@ -633,7 +699,7 @@ An example of a record reader passing in options to the file it opens.
### `FileContext.openFile`
From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
-Because the file length has already been probed for, the length is passd down
+Because the file length has already been probed for, the length is passed down
```java
public AvroFSInput(FileContext fc, Path p) throws IOException {
@@ -642,7 +708,7 @@ Because the file length has already been probed for, the length is passd down
this.stream = awaitFuture(fc.openFile(p)
.opt("fs.option.openfile.read.policy",
"sequential")
- .opt("fs.option.openfile.length",
+ .optLong("fs.option.openfile.length",
Long.toString(status.getLen()))
.build());
fc.open(p);
@@ -682,8 +748,3 @@ public T load(FileSystem fs,
}
}
```
-
-*Note:* : in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call
-required a non-null parameter; this has since been relaxed.
-For maximum compatibility across versions, only invoke the method
-when the file status is known to be non-null.
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
index 25bfe082b0..3598d33680 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java
@@ -43,6 +43,7 @@
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
/**
@@ -186,7 +187,7 @@ public void testSequentialRead() throws Throwable {
@Test
public void testOpenFileReadZeroByte() throws Throwable {
- describe("create & read a 0 byte file through the builders");
+ describe("create & read a 0 byte file through the builders; use a negative length");
Path path = path("zero.txt");
FileSystem fs = getFileSystem();
fs.createFile(path).overwrite(true).build().close();
@@ -194,6 +195,7 @@ public void testOpenFileReadZeroByte() throws Throwable {
.opt("fs.test.something", true)
.opt("fs.test.something2", 3)
.opt("fs.test.something3", "3")
+ .optLong(FS_OPTION_OPENFILE_LENGTH, -1L)
.build().get()) {
assertMinusOne("initial byte read", is.read());
}
@@ -210,6 +212,17 @@ public void testOpenFileUnknownOption() throws Throwable {
() -> builder.build());
}
+ @Test
+ public void testOpenFileUnknownOptionLong() throws Throwable {
+ describe("calling openFile fails when a 'must()' option is unknown");
+ FutureDataInputStreamBuilder builder =
+ getFileSystem().openFile(path("testOpenFileUnknownOption"))
+ .optLong("fs.test.something", 1L)
+ .mustLong("fs.test.something2", 1L);
+ intercept(IllegalArgumentException.class,
+ () -> builder.build());
+ }
+
@Test
public void testOpenFileLazyFail() throws Throwable {
describe("openFile fails on a missing file in the get() and not before");
@@ -320,16 +333,22 @@ public void testOpenFileApplyAsyncRead() throws Throwable {
describe("verify that async accept callbacks are evaluated");
Path path = path("testOpenFileApplyAsyncRead");
FileSystem fs = getFileSystem();
+ final int len = 512;
createFile(fs, path, true,
- dataset(4, 0x40, 0x80));
- CompletableFuture future = fs.openFile(path).build();
+ dataset(len, 0x40, 0x80));
+ CompletableFuture future = fs.openFile(path)
+ .mustDouble(FS_OPTION_OPENFILE_LENGTH, 43.2e60) // pass in a double
+ .build();
AtomicBoolean accepted = new AtomicBoolean(false);
- future.thenApply(stream -> {
+ final Long bytes = future.thenApply(stream -> {
accepted.set(true);
- return stream;
- }).get().close();
+ return ContractTestUtils.readStream(stream);
+ }).get();
assertTrue("async accept operation not invoked",
accepted.get());
+ Assertions.assertThat(bytes)
+ .describedAs("bytes read from stream")
+ .isEqualTo(len);
}
/**
@@ -357,8 +376,8 @@ public void testOpenFileNullStatusButFileLength() throws Throwable {
.withFileStatus(null)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
"unknown, sequential, random")
- .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
- .opt(FS_OPTION_OPENFILE_LENGTH, len)
+ .optLong(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
+ .optLong(FS_OPTION_OPENFILE_LENGTH, len)
.build();
try (FSDataInputStream in = future.get()) {
@@ -367,4 +386,26 @@ public void testOpenFileNullStatusButFileLength() throws Throwable {
compareByteArrays(dataset, result, len);
}
+ /**
+ * open a file with a length set as a double; verifies resilience
+ * of the parser.
+ */
+ @Test
+ public void testFloatingPointLength() throws Throwable {
+ describe("Open file with a length");
+ Path path = methodPath();
+ FileSystem fs = getFileSystem();
+ int len = 4096;
+ createFile(fs, path, true,
+ dataset(len, 0x40, 0x80));
+ final Long l = fs.openFile(path)
+ .mustDouble(FS_OPTION_OPENFILE_LENGTH, len)
+ .build()
+ .thenApply(ContractTestUtils::readStream)
+ .get();
+ Assertions.assertThat(l)
+ .describedAs("bytes read from file %s", path)
+ .isEqualTo(len);
+ }
+
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java
new file mode 100644
index 0000000000..20172ccfe1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestFSBuilderSupport.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.store;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.FSBuilderSupport;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test builder support, forwarding of opt double/float to long,
+ * resilience.
+ */
+@SuppressWarnings("deprecation")
+public class TestFSBuilderSupport extends AbstractHadoopTestBase {
+
+ @Test
+ public void testOptFloatDoubleForwardsToLong() throws Throwable {
+ FSBuilderSupport c = builder()
+ .opt("f", 1.8f)
+ .opt("d", 2.0e3)
+ .build();
+ assertThat(c.getLong("f", 2))
+ .isEqualTo(1);
+ assertThat(c.getLong("d", 2))
+ .isEqualTo(2000);
+ }
+
+ @Test
+ public void testMustFloatDoubleForwardsToLong() throws Throwable {
+ FSBuilderSupport c = builder()
+ .must("f", 1.8f)
+ .must("d", 2.0e3)
+ .build();
+ assertThat(c.getLong("f", 2))
+ .isEqualTo(1);
+ assertThat(c.getLong("d", 2))
+ .isEqualTo(2000);
+ }
+
+ @Test
+ public void testLongOptStillWorks() throws Throwable {
+ FSBuilderSupport c = builder()
+ .opt("o", 1L)
+ .must("m", 1L)
+ .build();
+ assertThat(c.getLong("o", 2))
+ .isEqualTo(1L);
+ assertThat(c.getLong("m", 2))
+ .isEqualTo(1L);
+ }
+
+ @Test
+ public void testFloatParseFallback() throws Throwable {
+ FSBuilderSupport c = builder()
+ .opt("f", "1.8f")
+ .opt("d", "1.8e20")
+ .build();
+
+ assertThat(c.getLong("f", 2))
+ .isEqualTo(2);
+ assertThat(c.getLong("d", 2))
+ .isEqualTo(2);
+ }
+
+ @Test
+ public void testNegatives() throws Throwable {
+ FSBuilderSupport c = builder()
+ .optLong("-1", -1)
+ .mustLong("-2", -2)
+ .build();
+
+ // getLong gets the long value
+ assertThat(c.getLong("-1", 2))
+ .isEqualTo(-1);
+
+
+ // but getPositiveLong returns the positive default
+ assertThat(c.getPositiveLong("-1", 2))
+ .isEqualTo(2);
+ }
+
+ @Test
+ public void testBoolean() throws Throwable {
+ final FSBuilderSupport c = builder()
+ .opt("f", false)
+ .opt("t", true)
+ .opt("o", "other")
+ .build();
+ assertThat(c.getOptions().getBoolean("f", true))
+ .isEqualTo(false);
+ assertThat(c.getOptions().getBoolean("t", false))
+ .isEqualTo(true);
+ // this is handled in Configuration itself.
+ assertThat(c.getOptions().getBoolean("o", true))
+ .isEqualTo(true);
+ }
+
+ private SimpleBuilder builder() {
+ return new BuilderImpl();
+ }
+
+ private interface SimpleBuilder
+ extends FSBuilder {
+ }
+
+ private static final class BuilderImpl
+ extends AbstractFSBuilderImpl
+ implements SimpleBuilder {
+
+ private BuilderImpl() {
+ super(new Path("/"));
+ }
+
+ @Override
+ public FSBuilderSupport build()
+ throws IOException {
+ return new FSBuilderSupport(getOptions());
+ }
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 5724e72931..ab63c199f2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -114,8 +114,8 @@ public LineRecordReader(Configuration job, FileSplit split,
file.getFileSystem(job).openFile(file);
// the start and end of the split may be used to build
// an input strategy.
- builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
- .opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+ builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
+ .optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
FutureIO.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 617abaacae..089208841f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -92,8 +92,8 @@ public void initialize(InputSplit genericSplit,
file.getFileSystem(job).openFile(file);
// the start and end of the split may be used to build
// an input strategy.
- builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
- builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
+ builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start);
+ builder.optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
FutureIO.propagateOptions(builder, job,
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
index f284a9c380..3ce6936c3d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraInputFormat.java
@@ -236,8 +236,8 @@ public void initialize(InputSplit split, TaskAttemptContext context)
offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
length = ((FileSplit)split).getLength();
final FutureDataInputStreamBuilder builder = fs.openFile(p)
- .opt(FS_OPTION_OPENFILE_SPLIT_START, start)
- .opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
+ .optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
+ .optLong(FS_OPTION_OPENFILE_SPLIT_END, start + length)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
in = FutureIO.awaitFuture(builder.build());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
index 70a99d5318..4703d63567 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FSBuilderSupport;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
@@ -246,12 +247,14 @@ public OpenFileInformation prepareToOpenFile(
// set the end of the read to the file length
fileLength = fileStatus.getLen();
}
+ FSBuilderSupport builderSupport = new FSBuilderSupport(options);
// determine start and end of file.
- long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
+ long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
// split end
- long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END,
- LENGTH_UNKNOWN);
+ long splitEnd = builderSupport.getLong(
+ FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN);
+
if (splitStart > 0 && splitStart > splitEnd) {
LOG.warn("Split start {} is greater than split end {}, resetting",
splitStart, splitEnd);
@@ -259,7 +262,7 @@ public OpenFileInformation prepareToOpenFile(
}
// read end is the open file value
- fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
+ fileLength = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
// if the read end has come from options, use that
// in creating a file status
@@ -281,16 +284,17 @@ public OpenFileInformation prepareToOpenFile(
.withS3Select(isSelect)
.withSql(sql)
.withAsyncDrainThreshold(
- options.getLong(ASYNC_DRAIN_THRESHOLD,
+ builderSupport.getPositiveLong(ASYNC_DRAIN_THRESHOLD,
defaultReadAhead))
.withBufferSize(
- options.getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
+ (int)builderSupport.getPositiveLong(
+ FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
.withChangePolicy(changePolicy)
.withFileLength(fileLength)
.withInputPolicy(
S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
.withReadAheadRange(
- options.getLong(READAHEAD_RANGE, defaultReadAhead))
+ builderSupport.getPositiveLong(READAHEAD_RANGE, defaultReadAhead))
.withSplitStart(splitStart)
.withSplitEnd(splitEnd)
.withStatus(fileStatus)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index b0ee531112..4aae84dca8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -141,7 +141,7 @@ public void testOpenFileShorterLength() throws Throwable {
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
- .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
+ .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
.build()
.get(),
always(NO_HEAD_OR_LIST),
@@ -183,7 +183,7 @@ public void testOpenFileLongerLength() throws Throwable {
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
- .must(FS_OPTION_OPENFILE_LENGTH, longLen)
+ .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
.build()
.get(),
always(NO_HEAD_OR_LIST));
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index a03f181cb3..fb75560a80 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -160,7 +160,7 @@ public void testUnbufferDraining() throws Throwable {
int offset = FILE_SIZE - READAHEAD + 1;
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
.withFileStatus(st)
- .must(ASYNC_DRAIN_THRESHOLD, 1)
+ .mustLong(ASYNC_DRAIN_THRESHOLD, 1)
.build().get()) {
describe("Initiating unbuffer with async drain\n");
for (int i = 0; i < ATTEMPTS; i++) {
@@ -235,9 +235,11 @@ public void testUnbufferAborting() throws Throwable {
// open the file at the beginning with a whole file read policy,
// so even with s3a switching to random on unbuffer,
// this always does a full GET
+ // also provide a floating point string for the threshold, to
+ // verify it is safely parsed
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
.withFileStatus(st)
- .must(ASYNC_DRAIN_THRESHOLD, 1)
+ .must(ASYNC_DRAIN_THRESHOLD, "1.0")
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.build().get()) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index eea70ced13..fb9988b29a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -219,11 +219,10 @@ private FSDataInputStream openDataFile(S3AFileSystem fs,
final FutureDataInputStreamBuilder builder = fs.openFile(path)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
inputPolicy.toString())
- .opt(FS_OPTION_OPENFILE_LENGTH, length)
- .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
- if (readahead > 0) {
- builder.opt(READAHEAD_RANGE, readahead);
- }
+ .optLong(FS_OPTION_OPENFILE_LENGTH, length)
+ .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize)
+ .optLong(READAHEAD_RANGE, readahead);
+
FSDataInputStream stream = awaitFuture(builder.build());
streamStatistics = getInputStreamStatistics(stream);
return stream;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index e9f0e784fc..b7b859cb9f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -38,7 +38,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
@@ -93,6 +92,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -3149,7 +3149,7 @@ protected CompletableFuture openFileWithOptions(Path path,
OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
- Collections.emptySet(),
+ FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index bb9ecdd51a..5fb2c6e170 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -31,7 +31,6 @@
import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
@@ -112,6 +111,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
@@ -296,7 +296,7 @@ protected CompletableFuture openFileWithOptions(
LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
- Collections.emptySet(),
+ FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(), () ->
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 0122b873aa..7c32ddcfb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -587,7 +587,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile)
fileContext.openFile(remoteAppLogFile)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
- .opt(FS_OPTION_OPENFILE_LENGTH,
+ .optLong(FS_OPTION_OPENFILE_LENGTH,
status.getLen()) // file length hint for object stores
.build());
reader = new TFile.Reader(this.fsDataIStream,