HADOOP-18724. Open file fails with NumberFormatException for S3AFileSystem (#5611)
This: 1. Adds optLong, optDouble, mustLong and mustDouble methods to the FSBuilder interface to let callers explicitly passin long and double arguments. 2. The opt() and must() builder calls which take float/double values now only set long values instead, so as to avoid problems related to overloaded methods resulting in a ".0" being appended to a long value. 3. All of the relevant opt/must calls in the hadoop codebase move to the new methods 4. And the s3a code is resilient to parse errors in is numeric options -it will downgrade to the default. This is nominally incompatible, but the floating-point builder methods were never used: nothing currently expects floating point numbers. For anyone who wants to safely set numeric builder options across all compatible releases, convert the number to a string and then use the opt(String, String) and must(String, String) methods. Contributed by Steve Loughran
This commit is contained in:
parent
949d5ca20b
commit
ab594ec77e
@ -28,6 +28,34 @@
|
||||
* The base interface which various FileSystem FileContext Builder
|
||||
* interfaces can extend, and which underlying implementations
|
||||
* will then implement.
|
||||
* <p>
|
||||
* HADOOP-16202 expanded the opt() and must() arguments with
|
||||
* operator overloading, but HADOOP-18724 identified mapping problems:
|
||||
* passing a long value in to {@code opt()} could end up invoking
|
||||
* {@code opt(string, double)}, which could then trigger parse failures.
|
||||
* <p>
|
||||
* To fix this without forcing existing code to break/be recompiled.
|
||||
* <ol>
|
||||
* <li>A new method to explicitly set a long value is added:
|
||||
* {@link #optLong(String, long)}
|
||||
* </li>
|
||||
* <li>A new method to explicitly set a double value is added:
|
||||
* {@link #optLong(String, long)}
|
||||
* </li>
|
||||
* <li>
|
||||
* All of {@link #opt(String, long)}, {@link #opt(String, float)} and
|
||||
* {@link #opt(String, double)} invoke {@link #optLong(String, long)}.
|
||||
* </li>
|
||||
* <li>
|
||||
* The same changes have been applied to {@code must()} methods.
|
||||
* </li>
|
||||
* </ol>
|
||||
* The forwarding of existing double/float setters to the long setters ensure
|
||||
* that existing code will link, but are guaranteed to always set a long value.
|
||||
* If you need to write code which works correctly with all hadoop releases,
|
||||
* covert the option to a string explicitly and then call {@link #opt(String, String)}
|
||||
* or {@link #must(String, String)} as appropriate.
|
||||
*
|
||||
* @param <S> Return type on the {@link #build()} call.
|
||||
* @param <B> type of builder itself.
|
||||
*/
|
||||
@ -63,13 +91,17 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
||||
B opt(@Nonnull String key, int value);
|
||||
|
||||
/**
|
||||
* Set optional float parameter for the Builder.
|
||||
* This parameter is converted to a long and passed
|
||||
* to {@link #optLong(String, long)} -all
|
||||
* decimal precision is lost.
|
||||
*
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #opt(String, String)
|
||||
* @deprecated use {@link #optDouble(String, double)}
|
||||
*/
|
||||
@Deprecated
|
||||
B opt(@Nonnull String key, float value);
|
||||
|
||||
/**
|
||||
@ -78,18 +110,22 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #opt(String, String)
|
||||
* @deprecated use {@link #optLong(String, long)} where possible.
|
||||
*/
|
||||
B opt(@Nonnull String key, long value);
|
||||
|
||||
/**
|
||||
* Set optional double parameter for the Builder.
|
||||
*
|
||||
* Pass an optional double parameter for the Builder.
|
||||
* This parameter is converted to a long and passed
|
||||
* to {@link #optLong(String, long)} -all
|
||||
* decimal precision is lost.
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #opt(String, String)
|
||||
* @deprecated use {@link #optDouble(String, double)}
|
||||
*/
|
||||
@Deprecated
|
||||
B opt(@Nonnull String key, double value);
|
||||
|
||||
/**
|
||||
@ -102,6 +138,26 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
||||
*/
|
||||
B opt(@Nonnull String key, @Nonnull String... values);
|
||||
|
||||
/**
|
||||
* Set optional long parameter for the Builder.
|
||||
*
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
B optLong(@Nonnull String key, long value);
|
||||
|
||||
/**
|
||||
* Set optional double parameter for the Builder.
|
||||
*
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
B optDouble(@Nonnull String key, double value);
|
||||
|
||||
/**
|
||||
* Set mandatory option to the Builder.
|
||||
*
|
||||
@ -135,13 +191,16 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
||||
B must(@Nonnull String key, int value);
|
||||
|
||||
/**
|
||||
* Set mandatory float option.
|
||||
* This parameter is converted to a long and passed
|
||||
* to {@link #mustLong(String, long)} -all
|
||||
* decimal precision is lost.
|
||||
*
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #must(String, String)
|
||||
* @deprecated use {@link #mustDouble(String, double)} to set floating point.
|
||||
*/
|
||||
@Deprecated
|
||||
B must(@Nonnull String key, float value);
|
||||
|
||||
/**
|
||||
@ -152,16 +211,19 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
||||
* @return generic type B.
|
||||
* @see #must(String, String)
|
||||
*/
|
||||
@Deprecated
|
||||
B must(@Nonnull String key, long value);
|
||||
|
||||
/**
|
||||
* Set mandatory double option.
|
||||
* Set mandatory long option, despite passing in a floating
|
||||
* point value.
|
||||
*
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #must(String, String)
|
||||
*/
|
||||
@Deprecated
|
||||
B must(@Nonnull String key, double value);
|
||||
|
||||
/**
|
||||
@ -174,6 +236,26 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
||||
*/
|
||||
B must(@Nonnull String key, @Nonnull String... values);
|
||||
|
||||
/**
|
||||
* Set mandatory long parameter for the Builder.
|
||||
*
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
B mustLong(@Nonnull String key, long value);
|
||||
|
||||
/**
|
||||
* Set mandatory double parameter for the Builder.
|
||||
*
|
||||
* @param key key.
|
||||
* @param value value.
|
||||
* @return generic type B.
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
B mustDouble(@Nonnull String key, double value);
|
||||
|
||||
/**
|
||||
* Instantiate the object which was being built.
|
||||
*
|
||||
|
@ -2237,7 +2237,7 @@ public boolean copy(final Path src, final Path dst, boolean deleteSource,
|
||||
InputStream in = awaitFuture(openFile(qSrc)
|
||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
|
||||
.opt(FS_OPTION_OPENFILE_LENGTH,
|
||||
.optLong(FS_OPTION_OPENFILE_LENGTH,
|
||||
fs.getLen()) // file length hint for object stores
|
||||
.build());
|
||||
try (OutputStream out = create(qDst, createFlag)) {
|
||||
|
@ -483,7 +483,7 @@ public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
|
||||
in = awaitFuture(srcFS.openFile(src)
|
||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
|
||||
.opt(FS_OPTION_OPENFILE_LENGTH,
|
||||
.optLong(FS_OPTION_OPENFILE_LENGTH,
|
||||
srcStatus.getLen()) // file length hint for object stores
|
||||
.build());
|
||||
out = dstFS.create(dst, overwrite);
|
||||
|
@ -44,11 +44,13 @@
|
||||
* with option support.
|
||||
*
|
||||
* <code>
|
||||
* .opt("foofs:option.a", true)
|
||||
* .opt("foofs:option.b", "value")
|
||||
* .opt("fs.s3a.open.option.caching", true)
|
||||
* .opt("fs.option.openfile.read.policy", "random, adaptive")
|
||||
* .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
|
||||
* .must("foofs:cache", true)
|
||||
* .must("barfs:cache-size", 256 * 1024 * 1024)
|
||||
* .optLong("fs.option.openfile.length", 1_500_000_000_000)
|
||||
* .must("fs.option.openfile.buffer.size", 256_000)
|
||||
* .mustLong("fs.option.openfile.split.start", 256_000_000)
|
||||
* .mustLong("fs.option.openfile.split.end", 512_000_000)
|
||||
* .build();
|
||||
* </code>
|
||||
*
|
||||
@ -64,6 +66,7 @@
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
@SuppressWarnings({"deprecation", "unused"})
|
||||
public abstract class
|
||||
AbstractFSBuilderImpl<S, B extends FSBuilder<S, B>>
|
||||
implements FSBuilder<S, B> {
|
||||
@ -178,10 +181,7 @@ public B opt(@Nonnull final String key, @Nonnull final String value) {
|
||||
*/
|
||||
@Override
|
||||
public B opt(@Nonnull final String key, boolean value) {
|
||||
mandatoryKeys.remove(key);
|
||||
optionalKeys.add(key);
|
||||
options.setBoolean(key, value);
|
||||
return getThisBuilder();
|
||||
return opt(key, Boolean.toString(value));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -190,19 +190,18 @@ public B opt(@Nonnull final String key, boolean value) {
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B opt(@Nonnull final String key, int value) {
|
||||
mandatoryKeys.remove(key);
|
||||
optionalKeys.add(key);
|
||||
options.setInt(key, value);
|
||||
return getThisBuilder();
|
||||
public final B opt(@Nonnull final String key, int value) {
|
||||
return optLong(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public B opt(@Nonnull final String key, final long value) {
|
||||
mandatoryKeys.remove(key);
|
||||
optionalKeys.add(key);
|
||||
options.setLong(key, value);
|
||||
return getThisBuilder();
|
||||
public final B opt(@Nonnull final String key, final long value) {
|
||||
return optLong(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public B optLong(@Nonnull final String key, final long value) {
|
||||
return opt(key, Long.toString(value));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -211,11 +210,8 @@ public B opt(@Nonnull final String key, final long value) {
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B opt(@Nonnull final String key, float value) {
|
||||
mandatoryKeys.remove(key);
|
||||
optionalKeys.add(key);
|
||||
options.setFloat(key, value);
|
||||
return getThisBuilder();
|
||||
public final B opt(@Nonnull final String key, float value) {
|
||||
return optLong(key, (long) value);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -224,11 +220,18 @@ public B opt(@Nonnull final String key, float value) {
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B opt(@Nonnull final String key, double value) {
|
||||
mandatoryKeys.remove(key);
|
||||
optionalKeys.add(key);
|
||||
options.setDouble(key, value);
|
||||
return getThisBuilder();
|
||||
public final B opt(@Nonnull final String key, double value) {
|
||||
return optLong(key, (long) value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set optional double parameter for the Builder.
|
||||
*
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B optDouble(@Nonnull final String key, double value) {
|
||||
return opt(key, Double.toString(value));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -264,10 +267,22 @@ public B must(@Nonnull final String key, @Nonnull final String value) {
|
||||
*/
|
||||
@Override
|
||||
public B must(@Nonnull final String key, boolean value) {
|
||||
mandatoryKeys.add(key);
|
||||
optionalKeys.remove(key);
|
||||
options.setBoolean(key, value);
|
||||
return getThisBuilder();
|
||||
return must(key, Boolean.toString(value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public B mustLong(@Nonnull final String key, final long value) {
|
||||
return must(key, Long.toString(value));
|
||||
}
|
||||
|
||||
/**
|
||||
* Set optional double parameter for the Builder.
|
||||
*
|
||||
* @see #opt(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B mustDouble(@Nonnull final String key, double value) {
|
||||
return must(key, Double.toString(value));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -276,45 +291,23 @@ public B must(@Nonnull final String key, boolean value) {
|
||||
* @see #must(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B must(@Nonnull final String key, int value) {
|
||||
mandatoryKeys.add(key);
|
||||
optionalKeys.remove(key);
|
||||
options.setInt(key, value);
|
||||
return getThisBuilder();
|
||||
public final B must(@Nonnull final String key, int value) {
|
||||
return mustLong(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public B must(@Nonnull final String key, final long value) {
|
||||
mandatoryKeys.add(key);
|
||||
optionalKeys.remove(key);
|
||||
options.setLong(key, value);
|
||||
return getThisBuilder();
|
||||
public final B must(@Nonnull final String key, final long value) {
|
||||
return mustLong(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set mandatory float option.
|
||||
*
|
||||
* @see #must(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B must(@Nonnull final String key, float value) {
|
||||
mandatoryKeys.add(key);
|
||||
optionalKeys.remove(key);
|
||||
options.setFloat(key, value);
|
||||
return getThisBuilder();
|
||||
public final B must(@Nonnull final String key, final float value) {
|
||||
return mustLong(key, (long) value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set mandatory double option.
|
||||
*
|
||||
* @see #must(String, String)
|
||||
*/
|
||||
@Override
|
||||
public B must(@Nonnull final String key, double value) {
|
||||
mandatoryKeys.add(key);
|
||||
optionalKeys.remove(key);
|
||||
options.setDouble(key, value);
|
||||
return getThisBuilder();
|
||||
public final B must(@Nonnull final String key, double value) {
|
||||
return mustLong(key, (long) value);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,95 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.impl;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
||||
|
||||
/**
|
||||
* Class to help with use of FSBuilder.
|
||||
*/
|
||||
public class FSBuilderSupport {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FSBuilderSupport.class);
|
||||
|
||||
public static final LogExactlyOnce LOG_PARSE_ERROR = new LogExactlyOnce(LOG);
|
||||
|
||||
/**
|
||||
* Options which are parsed.
|
||||
*/
|
||||
private final Configuration options;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param options the configuration options from the builder.
|
||||
*/
|
||||
public FSBuilderSupport(final Configuration options) {
|
||||
this.options = options;
|
||||
}
|
||||
|
||||
public Configuration getOptions() {
|
||||
return options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a long value with resilience to unparseable values.
|
||||
* Negative values are replaced with the default.
|
||||
* @param key key to log
|
||||
* @param defVal default value
|
||||
* @return long value
|
||||
*/
|
||||
public long getPositiveLong(String key, long defVal) {
|
||||
long l = getLong(key, defVal);
|
||||
if (l < 0) {
|
||||
LOG.debug("The option {} has a negative value {}, replacing with the default {}",
|
||||
key, l, defVal);
|
||||
l = defVal;
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a long value with resilience to unparseable values.
|
||||
* @param key key to log
|
||||
* @param defVal default value
|
||||
* @return long value
|
||||
*/
|
||||
public long getLong(String key, long defVal) {
|
||||
final String v = options.getTrimmed(key, "");
|
||||
if (v.isEmpty()) {
|
||||
return defVal;
|
||||
}
|
||||
try {
|
||||
return options.getLong(key, defVal);
|
||||
} catch (NumberFormatException e) {
|
||||
final String msg = String.format(
|
||||
"The option %s value \"%s\" is not a long integer; using the default value %s",
|
||||
key, v, defVal);
|
||||
// not a long,
|
||||
LOG_PARSE_ERROR.warn(msg);
|
||||
LOG.debug("{}", msg, e);
|
||||
return defVal;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -633,7 +633,7 @@ protected FSDataInputStream openFile(final String policy) throws IOException {
|
||||
return awaitFuture(fs.openFile(path)
|
||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
policy)
|
||||
.opt(FS_OPTION_OPENFILE_LENGTH,
|
||||
.optLong(FS_OPTION_OPENFILE_LENGTH,
|
||||
stat.getLen()) // file length hint for object stores
|
||||
.build());
|
||||
}
|
||||
|
@ -2017,7 +2017,7 @@ protected FSDataInputStream openFile(FileSystem fs, Path file,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
|
||||
if (length >= 0) {
|
||||
builder.opt(FS_OPTION_OPENFILE_LENGTH, length);
|
||||
builder.optLong(FS_OPTION_OPENFILE_LENGTH, length);
|
||||
}
|
||||
return awaitFuture(builder.build());
|
||||
}
|
||||
|
@ -25,6 +25,55 @@ references to `FSDataInputStream` and its subclasses.
|
||||
It is used to initate a (potentially asynchronous) operation to open an existing
|
||||
file for reading.
|
||||
|
||||
|
||||
## <a name="History"></a> History
|
||||
|
||||
### Hadoop 3.3.0: API introduced
|
||||
|
||||
[HADOOP-15229](https://issues.apache.org/jira/browse/HADOOP-15229)
|
||||
_Add FileSystem builder-based openFile() API to match createFile()_
|
||||
|
||||
* No `opt(String key, long value)` method was available.
|
||||
* the `withFileStatus(status)` call required a non-null parameter.
|
||||
* Sole Filesystem to process options and file status was S3A;
|
||||
* Only the s3a specific options were the S3 select and `fs.s3a.experimental.input.fadvise`
|
||||
* S3A Filesystem raised `IllegalArgumentException` if a file status was passed in
|
||||
and the path of the filestatus did not match the path of the `openFile(path)` call.
|
||||
|
||||
This is the baseline implementation. To write code guaranteed to compile against this version,
|
||||
use the `opt(String, String)` and `must(String, String)` methods, converting numbers to
|
||||
string explicitly.
|
||||
|
||||
```java
|
||||
fs.open("s3a://bucket/file")
|
||||
.opt("fs.option.openfile.length", Long.toString(length))
|
||||
.build().get()
|
||||
```
|
||||
|
||||
### Hadoop 3.3.5: standardization and expansion
|
||||
|
||||
[HADOOP-16202](https://issues.apache.org/jira/browse/HADOOP-16202)
|
||||
_Enhance openFile() for better read performance against object stores_
|
||||
|
||||
* `withFileStatus(null)` required to be accepted (and ignored)
|
||||
* only the filename part of any supplied FileStatus path must match the
|
||||
filename passed in on `openFile(path)`.
|
||||
* An `opt(String key, long value)` option was added. *This is now deprecated as it
|
||||
caused regression
|
||||
* Standard `fs.option.openfile` options defined.
|
||||
* S3A FS to use openfile length option, seek start/end options not _yet_ used.
|
||||
* Azure ABFS connector takes a supplied `VersionedFileStatus` and omits any
|
||||
HEAD probe for the object.
|
||||
|
||||
### Hadoop 3.3.6: API change to address operator overload bugs.
|
||||
|
||||
new `optLong()`, `optDouble()`, `mustLong()` and `mustDouble()` builder methods.
|
||||
|
||||
* See [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724) _Open file fails with NumberFormatException for S3AFileSystem_,
|
||||
which was somehow caused by the overloaded `opt(long)`.
|
||||
* Specification updated to declare that unparseable numbers MUST be treated as "unset" and the default
|
||||
value used instead.
|
||||
|
||||
## Invariants
|
||||
|
||||
The `FutureDataInputStreamBuilder` interface does not require parameters or
|
||||
@ -36,7 +85,7 @@ Some aspects of the state of the filesystem, MAY be checked in the initial
|
||||
change between `openFile()` and the `build().get()` sequence. For example,
|
||||
path validation.
|
||||
|
||||
## Implementation-agnostic parameters.
|
||||
## <a name="parameters"></a> `Implementation-agnostic parameters.
|
||||
|
||||
|
||||
### <a name="Builder.bufferSize"></a> `FutureDataInputStreamBuilder bufferSize(int bufSize)`
|
||||
@ -89,10 +138,20 @@ operations. This is to support wrapper filesystems and serialization/deserializa
|
||||
of the status.
|
||||
|
||||
|
||||
### Set optional or mandatory parameters
|
||||
### <a name="optional"></a> Set optional or mandatory parameters
|
||||
|
||||
FutureDataInputStreamBuilder opt(String key, ...)
|
||||
FutureDataInputStreamBuilder must(String key, ...)
|
||||
```java
|
||||
FutureDataInputStreamBuilder opt(String key, String value)
|
||||
FutureDataInputStreamBuilder opt(String key, int value)
|
||||
FutureDataInputStreamBuilder opt(String key, boolean value)
|
||||
FutureDataInputStreamBuilder optLong(String key, long value)
|
||||
FutureDataInputStreamBuilder optDouble(String key, double value)
|
||||
FutureDataInputStreamBuilder must(String key, String value)
|
||||
FutureDataInputStreamBuilder must(String key, int value)
|
||||
FutureDataInputStreamBuilder must(String key, boolean value)
|
||||
FutureDataInputStreamBuilder mustLong(String key, long value)
|
||||
FutureDataInputStreamBuilder mustDouble(String key, double value)
|
||||
```
|
||||
|
||||
Set optional or mandatory parameters to the builder. Using `opt()` or `must()`,
|
||||
client can specify FS-specific parameters without inspecting the concrete type
|
||||
@ -103,7 +162,7 @@ Example:
|
||||
```java
|
||||
out = fs.openFile(path)
|
||||
.must("fs.option.openfile.read.policy", "random")
|
||||
.opt("fs.http.connection.timeout", 30_000L)
|
||||
.optLong("fs.http.connection.timeout", 30_000L)
|
||||
.withFileStatus(statusFromListing)
|
||||
.build()
|
||||
.get();
|
||||
@ -115,9 +174,9 @@ An http-specific option has been supplied which may be interpreted by any store;
|
||||
If the filesystem opening the file does not recognize the option, it can safely be
|
||||
ignored.
|
||||
|
||||
### When to use `opt()` versus `must()`
|
||||
### <a name="usage"></a> When to use `opt` versus `must`
|
||||
|
||||
The difference between `opt()` versus `must()` is how the FileSystem opening
|
||||
The difference between `opt` versus `must` is how the FileSystem opening
|
||||
the file must react to an option which it does not recognize.
|
||||
|
||||
```python
|
||||
@ -144,7 +203,7 @@ irrespective of how the (key, value) pair was declared.
|
||||
defined in this filesystem specification, validated through contract
|
||||
tests.
|
||||
|
||||
#### Implementation Notes
|
||||
## <a name="implementation"></a> Implementation Notes
|
||||
|
||||
Checking for supported options must be performed in the `build()` operation.
|
||||
|
||||
@ -155,6 +214,13 @@ Checking for supported options must be performed in the `build()` operation.
|
||||
a feature which is recognized but not supported in the specific
|
||||
`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
|
||||
|
||||
Parsing of numeric values SHOULD trim any string and if the value
|
||||
cannot be parsed as a number, downgrade to any default value supplied.
|
||||
This is to address [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724)
|
||||
_Open file fails with NumberFormatException for S3AFileSystem_, which was cause by the overloaded `opt()`
|
||||
builder parameter binding to `opt(String, double)` rather than `opt(String, long)` when a long
|
||||
value was passed in.
|
||||
|
||||
The behavior of resolving the conflicts between the parameters set by
|
||||
builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
|
||||
|
||||
@ -181,7 +247,7 @@ Even if not values of the status are used, the presence of the argument
|
||||
can be interpreted as the caller declaring that they believe the file
|
||||
to be present and of the given size.
|
||||
|
||||
## Builder interface
|
||||
## <a name="builder"></a> Builder interface
|
||||
|
||||
### <a name="build"></a> `CompletableFuture<FSDataInputStream> build()`
|
||||
|
||||
@ -339,7 +405,7 @@ _Futher reading_
|
||||
* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
|
||||
* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
|
||||
|
||||
#### <a name="read.policy."></a> Read Policy `adaptive`
|
||||
#### <a name="read.policy.adaptive"></a> Read Policy `adaptive`
|
||||
|
||||
Try to adapt the seek policy to the read pattern of the application.
|
||||
|
||||
@ -429,7 +495,7 @@ If this option is used by the FileSystem implementation
|
||||
|
||||
*Implementor's Notes*
|
||||
|
||||
* A value of `fs.option.openfile.length` < 0 MUST be rejected.
|
||||
* A value of `fs.option.openfile.length` < 0 MUST be ignored.
|
||||
* If a file status is supplied along with a value in `fs.opt.openfile.length`;
|
||||
the file status values take precedence.
|
||||
|
||||
@ -466,11 +532,11 @@ than that value.
|
||||
|
||||
The S3A Connector supports custom options for readahead and seek policy.
|
||||
|
||||
| Name | Type | Meaning |
|
||||
|--------------------------------------|----------|-------------------------------------------------------------|
|
||||
| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
|
||||
| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream |
|
||||
| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
|
||||
| Name | Type | Meaning |
|
||||
|--------------------------------------|----------|---------------------------------------------------------------------------|
|
||||
| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
|
||||
| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
|
||||
| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream. (Since 3.3.5) |
|
||||
|
||||
If the option set contains a SQL statement in the `fs.s3a.select.sql` statement,
|
||||
then the file is opened as an S3 Select query.
|
||||
@ -510,8 +576,8 @@ protected SeekableInputStream newStream(Path path, FileStatus stat,
|
||||
.opt("fs.option.openfile.read.policy", "vector, random")
|
||||
.withFileStatus(stat);
|
||||
|
||||
builder.opt("fs.option.openfile.split.start", splitStart);
|
||||
builder.opt("fs.option.openfile.split.end", splitEnd);
|
||||
builder.optLong("fs.option.openfile.split.start", splitStart);
|
||||
builder.optLong("fs.option.openfile.split.end", splitEnd);
|
||||
CompletableFuture<FSDataInputStream> streamF = builder.build();
|
||||
return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
|
||||
}
|
||||
@ -618,8 +684,8 @@ An example of a record reader passing in options to the file it opens.
|
||||
file.getFileSystem(job).openFile(file);
|
||||
// the start and end of the split may be used to build
|
||||
// an input strategy.
|
||||
builder.opt("fs.option.openfile.split.start", start);
|
||||
builder.opt("fs.option.openfile.split.end", end);
|
||||
builder.optLong("fs.option.openfile.split.start", start);
|
||||
builder.optLong("fs.option.openfile.split.end", end);
|
||||
FutureIO.propagateOptions(builder, job,
|
||||
"mapreduce.job.input.file.option",
|
||||
"mapreduce.job.input.file.must");
|
||||
@ -633,7 +699,7 @@ An example of a record reader passing in options to the file it opens.
|
||||
### `FileContext.openFile`
|
||||
|
||||
From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
|
||||
Because the file length has already been probed for, the length is passd down
|
||||
Because the file length has already been probed for, the length is passed down
|
||||
|
||||
```java
|
||||
public AvroFSInput(FileContext fc, Path p) throws IOException {
|
||||
@ -642,7 +708,7 @@ Because the file length has already been probed for, the length is passd down
|
||||
this.stream = awaitFuture(fc.openFile(p)
|
||||
.opt("fs.option.openfile.read.policy",
|
||||
"sequential")
|
||||
.opt("fs.option.openfile.length",
|
||||
.optLong("fs.option.openfile.length",
|
||||
Long.toString(status.getLen()))
|
||||
.build());
|
||||
fc.open(p);
|
||||
@ -682,8 +748,3 @@ public T load(FileSystem fs,
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
*Note:* : in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call
|
||||
required a non-null parameter; this has since been relaxed.
|
||||
For maximum compatibility across versions, only invoke the method
|
||||
when the file status is known to be non-null.
|
@ -43,6 +43,7 @@
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
||||
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
@ -186,7 +187,7 @@ public void testSequentialRead() throws Throwable {
|
||||
|
||||
@Test
|
||||
public void testOpenFileReadZeroByte() throws Throwable {
|
||||
describe("create & read a 0 byte file through the builders");
|
||||
describe("create & read a 0 byte file through the builders; use a negative length");
|
||||
Path path = path("zero.txt");
|
||||
FileSystem fs = getFileSystem();
|
||||
fs.createFile(path).overwrite(true).build().close();
|
||||
@ -194,6 +195,7 @@ public void testOpenFileReadZeroByte() throws Throwable {
|
||||
.opt("fs.test.something", true)
|
||||
.opt("fs.test.something2", 3)
|
||||
.opt("fs.test.something3", "3")
|
||||
.optLong(FS_OPTION_OPENFILE_LENGTH, -1L)
|
||||
.build().get()) {
|
||||
assertMinusOne("initial byte read", is.read());
|
||||
}
|
||||
@ -210,6 +212,17 @@ public void testOpenFileUnknownOption() throws Throwable {
|
||||
() -> builder.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenFileUnknownOptionLong() throws Throwable {
|
||||
describe("calling openFile fails when a 'must()' option is unknown");
|
||||
FutureDataInputStreamBuilder builder =
|
||||
getFileSystem().openFile(path("testOpenFileUnknownOption"))
|
||||
.optLong("fs.test.something", 1L)
|
||||
.mustLong("fs.test.something2", 1L);
|
||||
intercept(IllegalArgumentException.class,
|
||||
() -> builder.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpenFileLazyFail() throws Throwable {
|
||||
describe("openFile fails on a missing file in the get() and not before");
|
||||
@ -320,16 +333,22 @@ public void testOpenFileApplyAsyncRead() throws Throwable {
|
||||
describe("verify that async accept callbacks are evaluated");
|
||||
Path path = path("testOpenFileApplyAsyncRead");
|
||||
FileSystem fs = getFileSystem();
|
||||
final int len = 512;
|
||||
createFile(fs, path, true,
|
||||
dataset(4, 0x40, 0x80));
|
||||
CompletableFuture<FSDataInputStream> future = fs.openFile(path).build();
|
||||
dataset(len, 0x40, 0x80));
|
||||
CompletableFuture<FSDataInputStream> future = fs.openFile(path)
|
||||
.mustDouble(FS_OPTION_OPENFILE_LENGTH, 43.2e60) // pass in a double
|
||||
.build();
|
||||
AtomicBoolean accepted = new AtomicBoolean(false);
|
||||
future.thenApply(stream -> {
|
||||
final Long bytes = future.thenApply(stream -> {
|
||||
accepted.set(true);
|
||||
return stream;
|
||||
}).get().close();
|
||||
return ContractTestUtils.readStream(stream);
|
||||
}).get();
|
||||
assertTrue("async accept operation not invoked",
|
||||
accepted.get());
|
||||
Assertions.assertThat(bytes)
|
||||
.describedAs("bytes read from stream")
|
||||
.isEqualTo(len);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -357,8 +376,8 @@ public void testOpenFileNullStatusButFileLength() throws Throwable {
|
||||
.withFileStatus(null)
|
||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
"unknown, sequential, random")
|
||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
|
||||
.opt(FS_OPTION_OPENFILE_LENGTH, len)
|
||||
.optLong(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
|
||||
.optLong(FS_OPTION_OPENFILE_LENGTH, len)
|
||||
.build();
|
||||
|
||||
try (FSDataInputStream in = future.get()) {
|
||||
@ -367,4 +386,26 @@ public void testOpenFileNullStatusButFileLength() throws Throwable {
|
||||
compareByteArrays(dataset, result, len);
|
||||
}
|
||||
|
||||
/**
|
||||
* open a file with a length set as a double; verifies resilience
|
||||
* of the parser.
|
||||
*/
|
||||
@Test
|
||||
public void testFloatingPointLength() throws Throwable {
|
||||
describe("Open file with a length");
|
||||
Path path = methodPath();
|
||||
FileSystem fs = getFileSystem();
|
||||
int len = 4096;
|
||||
createFile(fs, path, true,
|
||||
dataset(len, 0x40, 0x80));
|
||||
final Long l = fs.openFile(path)
|
||||
.mustDouble(FS_OPTION_OPENFILE_LENGTH, len)
|
||||
.build()
|
||||
.thenApply(ContractTestUtils::readStream)
|
||||
.get();
|
||||
Assertions.assertThat(l)
|
||||
.describedAs("bytes read from file %s", path)
|
||||
.isEqualTo(len);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,144 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.store;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.FSBuilder;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
|
||||
import org.apache.hadoop.fs.impl.FSBuilderSupport;
|
||||
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Test builder support, forwarding of opt double/float to long,
|
||||
* resilience.
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public class TestFSBuilderSupport extends AbstractHadoopTestBase {
|
||||
|
||||
@Test
|
||||
public void testOptFloatDoubleForwardsToLong() throws Throwable {
|
||||
FSBuilderSupport c = builder()
|
||||
.opt("f", 1.8f)
|
||||
.opt("d", 2.0e3)
|
||||
.build();
|
||||
assertThat(c.getLong("f", 2))
|
||||
.isEqualTo(1);
|
||||
assertThat(c.getLong("d", 2))
|
||||
.isEqualTo(2000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMustFloatDoubleForwardsToLong() throws Throwable {
|
||||
FSBuilderSupport c = builder()
|
||||
.must("f", 1.8f)
|
||||
.must("d", 2.0e3)
|
||||
.build();
|
||||
assertThat(c.getLong("f", 2))
|
||||
.isEqualTo(1);
|
||||
assertThat(c.getLong("d", 2))
|
||||
.isEqualTo(2000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLongOptStillWorks() throws Throwable {
|
||||
FSBuilderSupport c = builder()
|
||||
.opt("o", 1L)
|
||||
.must("m", 1L)
|
||||
.build();
|
||||
assertThat(c.getLong("o", 2))
|
||||
.isEqualTo(1L);
|
||||
assertThat(c.getLong("m", 2))
|
||||
.isEqualTo(1L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFloatParseFallback() throws Throwable {
|
||||
FSBuilderSupport c = builder()
|
||||
.opt("f", "1.8f")
|
||||
.opt("d", "1.8e20")
|
||||
.build();
|
||||
|
||||
assertThat(c.getLong("f", 2))
|
||||
.isEqualTo(2);
|
||||
assertThat(c.getLong("d", 2))
|
||||
.isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNegatives() throws Throwable {
|
||||
FSBuilderSupport c = builder()
|
||||
.optLong("-1", -1)
|
||||
.mustLong("-2", -2)
|
||||
.build();
|
||||
|
||||
// getLong gets the long value
|
||||
assertThat(c.getLong("-1", 2))
|
||||
.isEqualTo(-1);
|
||||
|
||||
|
||||
// but getPositiveLong returns the positive default
|
||||
assertThat(c.getPositiveLong("-1", 2))
|
||||
.isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBoolean() throws Throwable {
|
||||
final FSBuilderSupport c = builder()
|
||||
.opt("f", false)
|
||||
.opt("t", true)
|
||||
.opt("o", "other")
|
||||
.build();
|
||||
assertThat(c.getOptions().getBoolean("f", true))
|
||||
.isEqualTo(false);
|
||||
assertThat(c.getOptions().getBoolean("t", false))
|
||||
.isEqualTo(true);
|
||||
// this is handled in Configuration itself.
|
||||
assertThat(c.getOptions().getBoolean("o", true))
|
||||
.isEqualTo(true);
|
||||
}
|
||||
|
||||
private SimpleBuilder builder() {
|
||||
return new BuilderImpl();
|
||||
}
|
||||
|
||||
private interface SimpleBuilder
|
||||
extends FSBuilder<FSBuilderSupport, SimpleBuilder> {
|
||||
}
|
||||
|
||||
private static final class BuilderImpl
|
||||
extends AbstractFSBuilderImpl<FSBuilderSupport, SimpleBuilder>
|
||||
implements SimpleBuilder {
|
||||
|
||||
private BuilderImpl() {
|
||||
super(new Path("/"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSBuilderSupport build()
|
||||
throws IOException {
|
||||
return new FSBuilderSupport(getOptions());
|
||||
}
|
||||
}
|
||||
}
|
@ -114,8 +114,8 @@ public LineRecordReader(Configuration job, FileSplit split,
|
||||
file.getFileSystem(job).openFile(file);
|
||||
// the start and end of the split may be used to build
|
||||
// an input strategy.
|
||||
builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
|
||||
.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
|
||||
builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
|
||||
.optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
|
||||
FutureIO.propagateOptions(builder, job,
|
||||
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
||||
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
||||
|
@ -92,8 +92,8 @@ public void initialize(InputSplit genericSplit,
|
||||
file.getFileSystem(job).openFile(file);
|
||||
// the start and end of the split may be used to build
|
||||
// an input strategy.
|
||||
builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
|
||||
builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
|
||||
builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start);
|
||||
builder.optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
|
||||
FutureIO.propagateOptions(builder, job,
|
||||
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
||||
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
||||
|
@ -236,8 +236,8 @@ public void initialize(InputSplit split, TaskAttemptContext context)
|
||||
offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
|
||||
length = ((FileSplit)split).getLength();
|
||||
final FutureDataInputStreamBuilder builder = fs.openFile(p)
|
||||
.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
|
||||
.opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
|
||||
.optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
|
||||
.optLong(FS_OPTION_OPENFILE_SPLIT_END, start + length)
|
||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
|
||||
in = FutureIO.awaitFuture(builder.build());
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.impl.FSBuilderSupport;
|
||||
import org.apache.hadoop.fs.impl.OpenFileParameters;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
|
||||
@ -246,12 +247,14 @@ public OpenFileInformation prepareToOpenFile(
|
||||
// set the end of the read to the file length
|
||||
fileLength = fileStatus.getLen();
|
||||
}
|
||||
FSBuilderSupport builderSupport = new FSBuilderSupport(options);
|
||||
// determine start and end of file.
|
||||
long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
|
||||
long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
|
||||
|
||||
// split end
|
||||
long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END,
|
||||
LENGTH_UNKNOWN);
|
||||
long splitEnd = builderSupport.getLong(
|
||||
FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN);
|
||||
|
||||
if (splitStart > 0 && splitStart > splitEnd) {
|
||||
LOG.warn("Split start {} is greater than split end {}, resetting",
|
||||
splitStart, splitEnd);
|
||||
@ -259,7 +262,7 @@ public OpenFileInformation prepareToOpenFile(
|
||||
}
|
||||
|
||||
// read end is the open file value
|
||||
fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
|
||||
fileLength = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
|
||||
|
||||
// if the read end has come from options, use that
|
||||
// in creating a file status
|
||||
@ -281,16 +284,17 @@ public OpenFileInformation prepareToOpenFile(
|
||||
.withS3Select(isSelect)
|
||||
.withSql(sql)
|
||||
.withAsyncDrainThreshold(
|
||||
options.getLong(ASYNC_DRAIN_THRESHOLD,
|
||||
builderSupport.getPositiveLong(ASYNC_DRAIN_THRESHOLD,
|
||||
defaultReadAhead))
|
||||
.withBufferSize(
|
||||
options.getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
|
||||
(int)builderSupport.getPositiveLong(
|
||||
FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
|
||||
.withChangePolicy(changePolicy)
|
||||
.withFileLength(fileLength)
|
||||
.withInputPolicy(
|
||||
S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
|
||||
.withReadAheadRange(
|
||||
options.getLong(READAHEAD_RANGE, defaultReadAhead))
|
||||
builderSupport.getPositiveLong(READAHEAD_RANGE, defaultReadAhead))
|
||||
.withSplitStart(splitStart)
|
||||
.withSplitEnd(splitEnd)
|
||||
.withStatus(fileStatus)
|
||||
|
@ -141,7 +141,7 @@ public void testOpenFileShorterLength() throws Throwable {
|
||||
fs.openFile(testFile)
|
||||
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||
.opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
|
||||
.mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
|
||||
.build()
|
||||
.get(),
|
||||
always(NO_HEAD_OR_LIST),
|
||||
@ -183,7 +183,7 @@ public void testOpenFileLongerLength() throws Throwable {
|
||||
fs.openFile(testFile)
|
||||
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||
.must(FS_OPTION_OPENFILE_LENGTH, longLen)
|
||||
.mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
|
||||
.build()
|
||||
.get(),
|
||||
always(NO_HEAD_OR_LIST));
|
||||
|
@ -160,7 +160,7 @@ public void testUnbufferDraining() throws Throwable {
|
||||
int offset = FILE_SIZE - READAHEAD + 1;
|
||||
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
|
||||
.withFileStatus(st)
|
||||
.must(ASYNC_DRAIN_THRESHOLD, 1)
|
||||
.mustLong(ASYNC_DRAIN_THRESHOLD, 1)
|
||||
.build().get()) {
|
||||
describe("Initiating unbuffer with async drain\n");
|
||||
for (int i = 0; i < ATTEMPTS; i++) {
|
||||
@ -235,9 +235,11 @@ public void testUnbufferAborting() throws Throwable {
|
||||
// open the file at the beginning with a whole file read policy,
|
||||
// so even with s3a switching to random on unbuffer,
|
||||
// this always does a full GET
|
||||
// also provide a floating point string for the threshold, to
|
||||
// verify it is safely parsed
|
||||
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
|
||||
.withFileStatus(st)
|
||||
.must(ASYNC_DRAIN_THRESHOLD, 1)
|
||||
.must(ASYNC_DRAIN_THRESHOLD, "1.0")
|
||||
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
|
||||
.build().get()) {
|
||||
|
@ -219,11 +219,10 @@ private FSDataInputStream openDataFile(S3AFileSystem fs,
|
||||
final FutureDataInputStreamBuilder builder = fs.openFile(path)
|
||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
inputPolicy.toString())
|
||||
.opt(FS_OPTION_OPENFILE_LENGTH, length)
|
||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
|
||||
if (readahead > 0) {
|
||||
builder.opt(READAHEAD_RANGE, readahead);
|
||||
}
|
||||
.optLong(FS_OPTION_OPENFILE_LENGTH, length)
|
||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize)
|
||||
.optLong(READAHEAD_RANGE, readahead);
|
||||
|
||||
FSDataInputStream stream = awaitFuture(builder.build());
|
||||
streamStatistics = getInputStreamStatistics(stream);
|
||||
return stream;
|
||||
|
@ -38,7 +38,6 @@
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Stack;
|
||||
@ -93,6 +92,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
|
||||
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
|
||||
@ -3149,7 +3149,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
|
||||
OpenFileParameters parameters) throws IOException {
|
||||
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
|
||||
parameters.getMandatoryKeys(),
|
||||
Collections.emptySet(),
|
||||
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
|
||||
"for " + path);
|
||||
return LambdaUtils.eval(
|
||||
new CompletableFuture<>(), () ->
|
||||
|
@ -31,7 +31,6 @@
|
||||
import java.util.Hashtable;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -112,6 +111,7 @@
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
|
||||
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
|
||||
@ -296,7 +296,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
|
||||
LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
|
||||
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
|
||||
parameters.getMandatoryKeys(),
|
||||
Collections.emptySet(),
|
||||
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
|
||||
"for " + path);
|
||||
return LambdaUtils.eval(
|
||||
new CompletableFuture<>(), () ->
|
||||
|
@ -587,7 +587,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile)
|
||||
fileContext.openFile(remoteAppLogFile)
|
||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||
.opt(FS_OPTION_OPENFILE_LENGTH,
|
||||
.optLong(FS_OPTION_OPENFILE_LENGTH,
|
||||
status.getLen()) // file length hint for object stores
|
||||
.build());
|
||||
reader = new TFile.Reader(this.fsDataIStream,
|
||||
|
Loading…
Reference in New Issue
Block a user