HADOOP-18724. Open file fails with NumberFormatException for S3AFileSystem (#5611)
This: 1. Adds optLong, optDouble, mustLong and mustDouble methods to the FSBuilder interface to let callers explicitly pass in long and double arguments. 2. The opt() and must() builder calls which take float/double values now only set long values instead, so as to avoid problems related to overloaded methods resulting in a ".0" being appended to a long value. 3. All of the relevant opt/must calls in the hadoop codebase move to the new methods. 4. And the s3a code is resilient to parse errors in its numeric options - it will downgrade to the default. This is nominally incompatible, but the floating-point builder methods were never used: nothing currently expects floating point numbers. For anyone who wants to safely set numeric builder options across all compatible releases, convert the number to a string and then use the opt(String, String) and must(String, String) methods. Contributed by Steve Loughran
This commit is contained in:
parent
fe61d8f073
commit
e76c09ac3b
@ -28,6 +28,34 @@
|
|||||||
* The base interface which various FileSystem FileContext Builder
|
* The base interface which various FileSystem FileContext Builder
|
||||||
* interfaces can extend, and which underlying implementations
|
* interfaces can extend, and which underlying implementations
|
||||||
* will then implement.
|
* will then implement.
|
||||||
|
* <p>
|
||||||
|
* HADOOP-16202 expanded the opt() and must() arguments with
|
||||||
|
* operator overloading, but HADOOP-18724 identified mapping problems:
|
||||||
|
* passing a long value in to {@code opt()} could end up invoking
|
||||||
|
* {@code opt(string, double)}, which could then trigger parse failures.
|
||||||
|
* <p>
|
||||||
|
* To fix this without forcing existing code to break or be recompiled:
|
||||||
|
* <ol>
|
||||||
|
* <li>A new method to explicitly set a long value is added:
|
||||||
|
* {@link #optLong(String, long)}
|
||||||
|
* </li>
|
||||||
|
* <li>A new method to explicitly set a double value is added:
|
||||||
|
* {@link #optDouble(String, double)}
|
||||||
|
* </li>
|
||||||
|
* <li>
|
||||||
|
* All of {@link #opt(String, long)}, {@link #opt(String, float)} and
|
||||||
|
* {@link #opt(String, double)} invoke {@link #optLong(String, long)}.
|
||||||
|
* </li>
|
||||||
|
* <li>
|
||||||
|
* The same changes have been applied to {@code must()} methods.
|
||||||
|
* </li>
|
||||||
|
* </ol>
|
||||||
|
* The forwarding of existing double/float setters to the long setters ensure
|
||||||
|
* that existing code will link, but are guaranteed to always set a long value.
|
||||||
|
* If you need to write code which works correctly with all hadoop releases,
|
||||||
|
* convert the option to a string explicitly and then call {@link #opt(String, String)}
|
||||||
|
* or {@link #must(String, String)} as appropriate.
|
||||||
|
*
|
||||||
* @param <S> Return type on the {@link #build()} call.
|
* @param <S> Return type on the {@link #build()} call.
|
||||||
* @param <B> type of builder itself.
|
* @param <B> type of builder itself.
|
||||||
*/
|
*/
|
||||||
@ -63,13 +91,17 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
|||||||
B opt(@Nonnull String key, int value);
|
B opt(@Nonnull String key, int value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set optional float parameter for the Builder.
|
* This parameter is converted to a long and passed
|
||||||
|
* to {@link #optLong(String, long)} -all
|
||||||
|
* decimal precision is lost.
|
||||||
*
|
*
|
||||||
* @param key key.
|
* @param key key.
|
||||||
* @param value value.
|
* @param value value.
|
||||||
* @return generic type B.
|
* @return generic type B.
|
||||||
* @see #opt(String, String)
|
* @see #opt(String, String)
|
||||||
|
* @deprecated use {@link #optDouble(String, double)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
B opt(@Nonnull String key, float value);
|
B opt(@Nonnull String key, float value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -78,18 +110,22 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
|||||||
* @param key key.
|
* @param key key.
|
||||||
* @param value value.
|
* @param value value.
|
||||||
* @return generic type B.
|
* @return generic type B.
|
||||||
* @see #opt(String, String)
|
* @deprecated use {@link #optLong(String, long)} where possible.
|
||||||
*/
|
*/
|
||||||
B opt(@Nonnull String key, long value);
|
B opt(@Nonnull String key, long value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set optional double parameter for the Builder.
|
* Pass an optional double parameter for the Builder.
|
||||||
*
|
* This parameter is converted to a long and passed
|
||||||
|
* to {@link #optLong(String, long)} -all
|
||||||
|
* decimal precision is lost.
|
||||||
* @param key key.
|
* @param key key.
|
||||||
* @param value value.
|
* @param value value.
|
||||||
* @return generic type B.
|
* @return generic type B.
|
||||||
* @see #opt(String, String)
|
* @see #opt(String, String)
|
||||||
|
* @deprecated use {@link #optDouble(String, double)}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
B opt(@Nonnull String key, double value);
|
B opt(@Nonnull String key, double value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -102,6 +138,28 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
|||||||
*/
|
*/
|
||||||
B opt(@Nonnull String key, @Nonnull String... values);
|
B opt(@Nonnull String key, @Nonnull String... values);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set optional long parameter for the Builder.
|
||||||
|
*
|
||||||
|
* @param key key.
|
||||||
|
* @param value value.
|
||||||
|
* @return generic type B.
|
||||||
|
* @see #opt(String, String)
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
B optLong(@Nonnull String key, long value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set optional double parameter for the Builder.
|
||||||
|
*
|
||||||
|
* @param key key.
|
||||||
|
* @param value value.
|
||||||
|
* @return generic type B.
|
||||||
|
* @see #opt(String, String)
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
B optDouble(@Nonnull String key, double value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set mandatory option to the Builder.
|
* Set mandatory option to the Builder.
|
||||||
*
|
*
|
||||||
@ -135,13 +193,16 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
|||||||
B must(@Nonnull String key, int value);
|
B must(@Nonnull String key, int value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set mandatory float option.
|
* This parameter is converted to a long and passed
|
||||||
|
* to {@link #mustLong(String, long)} -all
|
||||||
|
* decimal precision is lost.
|
||||||
*
|
*
|
||||||
* @param key key.
|
* @param key key.
|
||||||
* @param value value.
|
* @param value value.
|
||||||
* @return generic type B.
|
* @return generic type B.
|
||||||
* @see #must(String, String)
|
* @deprecated use {@link #mustDouble(String, double)} to set floating point.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
B must(@Nonnull String key, float value);
|
B must(@Nonnull String key, float value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -152,16 +213,19 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
|||||||
* @return generic type B.
|
* @return generic type B.
|
||||||
* @see #must(String, String)
|
* @see #must(String, String)
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
B must(@Nonnull String key, long value);
|
B must(@Nonnull String key, long value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set mandatory double option.
|
* Set mandatory long option, despite passing in a floating
|
||||||
|
* point value.
|
||||||
*
|
*
|
||||||
* @param key key.
|
* @param key key.
|
||||||
* @param value value.
|
* @param value value.
|
||||||
* @return generic type B.
|
* @return generic type B.
|
||||||
* @see #must(String, String)
|
* @see #must(String, String)
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
B must(@Nonnull String key, double value);
|
B must(@Nonnull String key, double value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -174,6 +238,26 @@ public interface FSBuilder<S, B extends FSBuilder<S, B>> {
|
|||||||
*/
|
*/
|
||||||
B must(@Nonnull String key, @Nonnull String... values);
|
B must(@Nonnull String key, @Nonnull String... values);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set mandatory long parameter for the Builder.
|
||||||
|
*
|
||||||
|
* @param key key.
|
||||||
|
* @param value value.
|
||||||
|
* @return generic type B.
|
||||||
|
* @see #opt(String, String)
|
||||||
|
*/
|
||||||
|
B mustLong(@Nonnull String key, long value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set mandatory double parameter for the Builder.
|
||||||
|
*
|
||||||
|
* @param key key.
|
||||||
|
* @param value value.
|
||||||
|
* @return generic type B.
|
||||||
|
* @see #opt(String, String)
|
||||||
|
*/
|
||||||
|
B mustDouble(@Nonnull String key, double value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate the object which was being built.
|
* Instantiate the object which was being built.
|
||||||
*
|
*
|
||||||
|
@ -44,11 +44,13 @@
|
|||||||
* with option support.
|
* with option support.
|
||||||
*
|
*
|
||||||
* <code>
|
* <code>
|
||||||
* .opt("foofs:option.a", true)
|
* .opt("fs.s3a.open.option.caching", true)
|
||||||
* .opt("foofs:option.b", "value")
|
* .opt("fs.option.openfile.read.policy", "random, adaptive")
|
||||||
* .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
|
* .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
|
||||||
* .must("foofs:cache", true)
|
* .optLong("fs.option.openfile.length", 1_500_000_000_000)
|
||||||
* .must("barfs:cache-size", 256 * 1024 * 1024)
|
* .must("fs.option.openfile.buffer.size", 256_000)
|
||||||
|
* .mustLong("fs.option.openfile.split.start", 256_000_000)
|
||||||
|
* .mustLong("fs.option.openfile.split.end", 512_000_000)
|
||||||
* .build();
|
* .build();
|
||||||
* </code>
|
* </code>
|
||||||
*
|
*
|
||||||
@ -64,6 +66,7 @@
|
|||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
|
@SuppressWarnings({"deprecation", "unused"})
|
||||||
public abstract class
|
public abstract class
|
||||||
AbstractFSBuilderImpl<S, B extends FSBuilder<S, B>>
|
AbstractFSBuilderImpl<S, B extends FSBuilder<S, B>>
|
||||||
implements FSBuilder<S, B> {
|
implements FSBuilder<S, B> {
|
||||||
@ -178,10 +181,7 @@ public B opt(@Nonnull final String key, @Nonnull final String value) {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public B opt(@Nonnull final String key, boolean value) {
|
public B opt(@Nonnull final String key, boolean value) {
|
||||||
mandatoryKeys.remove(key);
|
return opt(key, Boolean.toString(value));
|
||||||
optionalKeys.add(key);
|
|
||||||
options.setBoolean(key, value);
|
|
||||||
return getThisBuilder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -190,19 +190,18 @@ public B opt(@Nonnull final String key, boolean value) {
|
|||||||
* @see #opt(String, String)
|
* @see #opt(String, String)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public B opt(@Nonnull final String key, int value) {
|
public final B opt(@Nonnull final String key, int value) {
|
||||||
mandatoryKeys.remove(key);
|
return optLong(key, value);
|
||||||
optionalKeys.add(key);
|
|
||||||
options.setInt(key, value);
|
|
||||||
return getThisBuilder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public B opt(@Nonnull final String key, final long value) {
|
public final B opt(@Nonnull final String key, final long value) {
|
||||||
mandatoryKeys.remove(key);
|
return optLong(key, value);
|
||||||
optionalKeys.add(key);
|
}
|
||||||
options.setLong(key, value);
|
|
||||||
return getThisBuilder();
|
@Override
|
||||||
|
public B optLong(@Nonnull final String key, final long value) {
|
||||||
|
return opt(key, Long.toString(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -211,11 +210,8 @@ public B opt(@Nonnull final String key, final long value) {
|
|||||||
* @see #opt(String, String)
|
* @see #opt(String, String)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public B opt(@Nonnull final String key, float value) {
|
public final B opt(@Nonnull final String key, float value) {
|
||||||
mandatoryKeys.remove(key);
|
return optLong(key, (long) value);
|
||||||
optionalKeys.add(key);
|
|
||||||
options.setFloat(key, value);
|
|
||||||
return getThisBuilder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -224,11 +220,18 @@ public B opt(@Nonnull final String key, float value) {
|
|||||||
* @see #opt(String, String)
|
* @see #opt(String, String)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public B opt(@Nonnull final String key, double value) {
|
public final B opt(@Nonnull final String key, double value) {
|
||||||
mandatoryKeys.remove(key);
|
return optLong(key, (long) value);
|
||||||
optionalKeys.add(key);
|
}
|
||||||
options.setDouble(key, value);
|
|
||||||
return getThisBuilder();
|
/**
|
||||||
|
* Set optional double parameter for the Builder.
|
||||||
|
*
|
||||||
|
* @see #opt(String, String)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public B optDouble(@Nonnull final String key, double value) {
|
||||||
|
return opt(key, Double.toString(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -264,10 +267,22 @@ public B must(@Nonnull final String key, @Nonnull final String value) {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public B must(@Nonnull final String key, boolean value) {
|
public B must(@Nonnull final String key, boolean value) {
|
||||||
mandatoryKeys.add(key);
|
return must(key, Boolean.toString(value));
|
||||||
optionalKeys.remove(key);
|
}
|
||||||
options.setBoolean(key, value);
|
|
||||||
return getThisBuilder();
|
@Override
|
||||||
|
public B mustLong(@Nonnull final String key, final long value) {
|
||||||
|
return must(key, Long.toString(value));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set optional double parameter for the Builder.
|
||||||
|
*
|
||||||
|
* @see #opt(String, String)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public B mustDouble(@Nonnull final String key, double value) {
|
||||||
|
return must(key, Double.toString(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -276,45 +291,23 @@ public B must(@Nonnull final String key, boolean value) {
|
|||||||
* @see #must(String, String)
|
* @see #must(String, String)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public B must(@Nonnull final String key, int value) {
|
public final B must(@Nonnull final String key, int value) {
|
||||||
mandatoryKeys.add(key);
|
return mustLong(key, value);
|
||||||
optionalKeys.remove(key);
|
|
||||||
options.setInt(key, value);
|
|
||||||
return getThisBuilder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public B must(@Nonnull final String key, final long value) {
|
public final B must(@Nonnull final String key, final long value) {
|
||||||
mandatoryKeys.add(key);
|
return mustLong(key, value);
|
||||||
optionalKeys.remove(key);
|
|
||||||
options.setLong(key, value);
|
|
||||||
return getThisBuilder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set mandatory float option.
|
|
||||||
*
|
|
||||||
* @see #must(String, String)
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public B must(@Nonnull final String key, float value) {
|
public final B must(@Nonnull final String key, final float value) {
|
||||||
mandatoryKeys.add(key);
|
return mustLong(key, (long) value);
|
||||||
optionalKeys.remove(key);
|
|
||||||
options.setFloat(key, value);
|
|
||||||
return getThisBuilder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set mandatory double option.
|
|
||||||
*
|
|
||||||
* @see #must(String, String)
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public B must(@Nonnull final String key, double value) {
|
public final B must(@Nonnull final String key, double value) {
|
||||||
mandatoryKeys.add(key);
|
return mustLong(key, (long) value);
|
||||||
optionalKeys.remove(key);
|
|
||||||
options.setDouble(key, value);
|
|
||||||
return getThisBuilder();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,95 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.impl;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to help with use of FSBuilder.
|
||||||
|
*/
|
||||||
|
public class FSBuilderSupport {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FSBuilderSupport.class);
|
||||||
|
|
||||||
|
public static final LogExactlyOnce LOG_PARSE_ERROR = new LogExactlyOnce(LOG);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Options which are parsed.
|
||||||
|
*/
|
||||||
|
private final Configuration options;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param options the configuration options from the builder.
|
||||||
|
*/
|
||||||
|
public FSBuilderSupport(final Configuration options) {
|
||||||
|
this.options = options;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Configuration getOptions() {
|
||||||
|
return options;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a long value with resilience to unparseable values.
|
||||||
|
* Negative values are replaced with the default.
|
||||||
|
* @param key key to log
|
||||||
|
* @param defVal default value
|
||||||
|
* @return long value
|
||||||
|
*/
|
||||||
|
public long getPositiveLong(String key, long defVal) {
|
||||||
|
long l = getLong(key, defVal);
|
||||||
|
if (l < 0) {
|
||||||
|
LOG.debug("The option {} has a negative value {}, replacing with the default {}",
|
||||||
|
key, l, defVal);
|
||||||
|
l = defVal;
|
||||||
|
}
|
||||||
|
return l;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a long value with resilience to unparseable values.
|
||||||
|
* @param key key to log
|
||||||
|
* @param defVal default value
|
||||||
|
* @return long value
|
||||||
|
*/
|
||||||
|
public long getLong(String key, long defVal) {
|
||||||
|
final String v = options.getTrimmed(key, "");
|
||||||
|
if (v.isEmpty()) {
|
||||||
|
return defVal;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return options.getLong(key, defVal);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
final String msg = String.format(
|
||||||
|
"The option %s value \"%s\" is not a long integer; using the default value %s",
|
||||||
|
key, v, defVal);
|
||||||
|
// not a long,
|
||||||
|
LOG_PARSE_ERROR.warn(msg);
|
||||||
|
LOG.debug("{}", msg, e);
|
||||||
|
return defVal;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -633,7 +633,7 @@ protected FSDataInputStream openFile(final String policy) throws IOException {
|
|||||||
return awaitFuture(fs.openFile(path)
|
return awaitFuture(fs.openFile(path)
|
||||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
policy)
|
policy)
|
||||||
.opt(FS_OPTION_OPENFILE_LENGTH,
|
.optLong(FS_OPTION_OPENFILE_LENGTH,
|
||||||
stat.getLen()) // file length hint for object stores
|
stat.getLen()) // file length hint for object stores
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
@ -2006,7 +2006,7 @@ protected FSDataInputStream openFile(FileSystem fs, Path file,
|
|||||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
|
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
|
||||||
if (length >= 0) {
|
if (length >= 0) {
|
||||||
builder.opt(FS_OPTION_OPENFILE_LENGTH, length);
|
builder.optLong(FS_OPTION_OPENFILE_LENGTH, length);
|
||||||
}
|
}
|
||||||
return awaitFuture(builder.build());
|
return awaitFuture(builder.build());
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,55 @@ references to `FSDataInputStream` and its subclasses.
|
|||||||
It is used to initiate a (potentially asynchronous) operation to open an existing
|
It is used to initiate a (potentially asynchronous) operation to open an existing
|
||||||
file for reading.
|
file for reading.
|
||||||
|
|
||||||
|
|
||||||
|
## <a name="History"></a> History
|
||||||
|
|
||||||
|
### Hadoop 3.3.0: API introduced
|
||||||
|
|
||||||
|
[HADOOP-15229](https://issues.apache.org/jira/browse/HADOOP-15229)
|
||||||
|
_Add FileSystem builder-based openFile() API to match createFile()_
|
||||||
|
|
||||||
|
* No `opt(String key, long value)` method was available.
|
||||||
|
* the `withFileStatus(status)` call required a non-null parameter.
|
||||||
|
* Sole Filesystem to process options and file status was S3A;
|
||||||
|
* Only the s3a specific options were the S3 select and `fs.s3a.experimental.input.fadvise`
|
||||||
|
* S3A Filesystem raised `IllegalArgumentException` if a file status was passed in
|
||||||
|
and the path of the filestatus did not match the path of the `openFile(path)` call.
|
||||||
|
|
||||||
|
This is the baseline implementation. To write code guaranteed to compile against this version,
|
||||||
|
use the `opt(String, String)` and `must(String, String)` methods, converting numbers to
|
||||||
|
string explicitly.
|
||||||
|
|
||||||
|
```java
|
||||||
|
fs.open("s3a://bucket/file")
|
||||||
|
.opt("fs.option.openfile.length", Long.toString(length))
|
||||||
|
.build().get()
|
||||||
|
```
|
||||||
|
|
||||||
|
### Hadoop 3.3.5: standardization and expansion
|
||||||
|
|
||||||
|
[HADOOP-16202](https://issues.apache.org/jira/browse/HADOOP-16202)
|
||||||
|
_Enhance openFile() for better read performance against object stores_
|
||||||
|
|
||||||
|
* `withFileStatus(null)` required to be accepted (and ignored)
|
||||||
|
* only the filename part of any supplied FileStatus path must match the
|
||||||
|
filename passed in on `openFile(path)`.
|
||||||
|
* An `opt(String key, long value)` option was added. *This is now deprecated as it
|
||||||
|
caused regression
|
||||||
|
* Standard `fs.option.openfile` options defined.
|
||||||
|
* S3A FS to use openfile length option, seek start/end options not _yet_ used.
|
||||||
|
* Azure ABFS connector takes a supplied `VersionedFileStatus` and omits any
|
||||||
|
HEAD probe for the object.
|
||||||
|
|
||||||
|
### Hadoop 3.3.6: API change to address operator overload bugs.
|
||||||
|
|
||||||
|
new `optLong()`, `optDouble()`, `mustLong()` and `mustDouble()` builder methods.
|
||||||
|
|
||||||
|
* See [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724) _Open file fails with NumberFormatException for S3AFileSystem_,
|
||||||
|
which was somehow caused by the overloaded `opt(long)`.
|
||||||
|
* Specification updated to declare that unparseable numbers MUST be treated as "unset" and the default
|
||||||
|
value used instead.
|
||||||
|
|
||||||
## Invariants
|
## Invariants
|
||||||
|
|
||||||
The `FutureDataInputStreamBuilder` interface does not require parameters or
|
The `FutureDataInputStreamBuilder` interface does not require parameters or
|
||||||
@ -36,7 +85,7 @@ Some aspects of the state of the filesystem, MAY be checked in the initial
|
|||||||
change between `openFile()` and the `build().get()` sequence. For example,
|
change between `openFile()` and the `build().get()` sequence. For example,
|
||||||
path validation.
|
path validation.
|
||||||
|
|
||||||
## Implementation-agnostic parameters.
|
## <a name="parameters"></a> Implementation-agnostic parameters.
|
||||||
|
|
||||||
|
|
||||||
### <a name="Builder.bufferSize"></a> `FutureDataInputStreamBuilder bufferSize(int bufSize)`
|
### <a name="Builder.bufferSize"></a> `FutureDataInputStreamBuilder bufferSize(int bufSize)`
|
||||||
@ -89,10 +138,20 @@ operations. This is to support wrapper filesystems and serialization/deserializa
|
|||||||
of the status.
|
of the status.
|
||||||
|
|
||||||
|
|
||||||
### Set optional or mandatory parameters
|
### <a name="optional"></a> Set optional or mandatory parameters
|
||||||
|
|
||||||
FutureDataInputStreamBuilder opt(String key, ...)
|
```java
|
||||||
FutureDataInputStreamBuilder must(String key, ...)
|
FutureDataInputStreamBuilder opt(String key, String value)
|
||||||
|
FutureDataInputStreamBuilder opt(String key, int value)
|
||||||
|
FutureDataInputStreamBuilder opt(String key, boolean value)
|
||||||
|
FutureDataInputStreamBuilder optLong(String key, long value)
|
||||||
|
FutureDataInputStreamBuilder optDouble(String key, double value)
|
||||||
|
FutureDataInputStreamBuilder must(String key, String value)
|
||||||
|
FutureDataInputStreamBuilder must(String key, int value)
|
||||||
|
FutureDataInputStreamBuilder must(String key, boolean value)
|
||||||
|
FutureDataInputStreamBuilder mustLong(String key, long value)
|
||||||
|
FutureDataInputStreamBuilder mustDouble(String key, double value)
|
||||||
|
```
|
||||||
|
|
||||||
Set optional or mandatory parameters to the builder. Using `opt()` or `must()`,
|
Set optional or mandatory parameters to the builder. Using `opt()` or `must()`,
|
||||||
client can specify FS-specific parameters without inspecting the concrete type
|
client can specify FS-specific parameters without inspecting the concrete type
|
||||||
@ -103,7 +162,7 @@ Example:
|
|||||||
```java
|
```java
|
||||||
out = fs.openFile(path)
|
out = fs.openFile(path)
|
||||||
.must("fs.option.openfile.read.policy", "random")
|
.must("fs.option.openfile.read.policy", "random")
|
||||||
.opt("fs.http.connection.timeout", 30_000L)
|
.optLong("fs.http.connection.timeout", 30_000L)
|
||||||
.withFileStatus(statusFromListing)
|
.withFileStatus(statusFromListing)
|
||||||
.build()
|
.build()
|
||||||
.get();
|
.get();
|
||||||
@ -115,9 +174,9 @@ An http-specific option has been supplied which may be interpreted by any store;
|
|||||||
If the filesystem opening the file does not recognize the option, it can safely be
|
If the filesystem opening the file does not recognize the option, it can safely be
|
||||||
ignored.
|
ignored.
|
||||||
|
|
||||||
### When to use `opt()` versus `must()`
|
### <a name="usage"></a> When to use `opt` versus `must`
|
||||||
|
|
||||||
The difference between `opt()` versus `must()` is how the FileSystem opening
|
The difference between `opt` versus `must` is how the FileSystem opening
|
||||||
the file must react to an option which it does not recognize.
|
the file must react to an option which it does not recognize.
|
||||||
|
|
||||||
```python
|
```python
|
||||||
@ -144,7 +203,7 @@ irrespective of how the (key, value) pair was declared.
|
|||||||
defined in this filesystem specification, validated through contract
|
defined in this filesystem specification, validated through contract
|
||||||
tests.
|
tests.
|
||||||
|
|
||||||
#### Implementation Notes
|
## <a name="implementation"></a> Implementation Notes
|
||||||
|
|
||||||
Checking for supported options must be performed in the `build()` operation.
|
Checking for supported options must be performed in the `build()` operation.
|
||||||
|
|
||||||
@ -155,6 +214,13 @@ Checking for supported options must be performed in the `build()` operation.
|
|||||||
a feature which is recognized but not supported in the specific
|
a feature which is recognized but not supported in the specific
|
||||||
`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
|
`FileSystem`/`FileContext` instance `UnsupportedException` MUST be thrown.
|
||||||
|
|
||||||
|
Parsing of numeric values SHOULD trim any string and if the value
|
||||||
|
cannot be parsed as a number, downgrade to any default value supplied.
|
||||||
|
This is to address [HADOOP-18724](https://issues.apache.org/jira/browse/HADOOP-18724)
|
||||||
|
_Open file fails with NumberFormatException for S3AFileSystem_, which was caused by the overloaded `opt()`
|
||||||
|
builder parameter binding to `opt(String, double)` rather than `opt(String, long)` when a long
|
||||||
|
value was passed in.
|
||||||
|
|
||||||
The behavior of resolving the conflicts between the parameters set by
|
The behavior of resolving the conflicts between the parameters set by
|
||||||
builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
|
builder methods (i.e., `bufferSize()`) and `opt()`/`must()` is as follows:
|
||||||
|
|
||||||
@ -181,7 +247,7 @@ Even if not values of the status are used, the presence of the argument
|
|||||||
can be interpreted as the caller declaring that they believe the file
|
can be interpreted as the caller declaring that they believe the file
|
||||||
to be present and of the given size.
|
to be present and of the given size.
|
||||||
|
|
||||||
## Builder interface
|
## <a name="builder"></a> Builder interface
|
||||||
|
|
||||||
### <a name="build"></a> `CompletableFuture<FSDataInputStream> build()`
|
### <a name="build"></a> `CompletableFuture<FSDataInputStream> build()`
|
||||||
|
|
||||||
@ -339,7 +405,7 @@ _Futher reading_
|
|||||||
* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
|
* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
|
||||||
* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
|
* [Windows `CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
|
||||||
|
|
||||||
#### <a name="read.policy."></a> Read Policy `adaptive`
|
#### <a name="read.policy.adaptive"></a> Read Policy `adaptive`
|
||||||
|
|
||||||
Try to adapt the seek policy to the read pattern of the application.
|
Try to adapt the seek policy to the read pattern of the application.
|
||||||
|
|
||||||
@ -429,7 +495,7 @@ If this option is used by the FileSystem implementation
|
|||||||
|
|
||||||
*Implementor's Notes*
|
*Implementor's Notes*
|
||||||
|
|
||||||
* A value of `fs.option.openfile.length` < 0 MUST be rejected.
|
* A value of `fs.option.openfile.length` < 0 MUST be ignored.
|
||||||
* If a file status is supplied along with a value in `fs.opt.openfile.length`;
|
* If a file status is supplied along with a value in `fs.opt.openfile.length`;
|
||||||
the file status values take precedence.
|
the file status values take precedence.
|
||||||
|
|
||||||
@ -466,11 +532,11 @@ than that value.
|
|||||||
|
|
||||||
The S3A Connector supports custom options for readahead and seek policy.
|
The S3A Connector supports custom options for readahead and seek policy.
|
||||||
|
|
||||||
| Name | Type | Meaning |
|
| Name | Type | Meaning |
|
||||||
|--------------------------------------|----------|-------------------------------------------------------------|
|
|--------------------------------------|----------|---------------------------------------------------------------------------|
|
||||||
| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
|
| `fs.s3a.readahead.range` | `long` | readahead range in bytes |
|
||||||
| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream |
|
| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
|
||||||
| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by `fs.option.openfile.read.policy` |
|
| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to asynchronous draining of the stream. (Since 3.3.5) |
|
||||||
|
|
||||||
If the option set contains a SQL statement in the `fs.s3a.select.sql` statement,
|
If the option set contains a SQL statement in the `fs.s3a.select.sql` statement,
|
||||||
then the file is opened as an S3 Select query.
|
then the file is opened as an S3 Select query.
|
||||||
@ -510,8 +576,8 @@ protected SeekableInputStream newStream(Path path, FileStatus stat,
|
|||||||
.opt("fs.option.openfile.read.policy", "vector, random")
|
.opt("fs.option.openfile.read.policy", "vector, random")
|
||||||
.withFileStatus(stat);
|
.withFileStatus(stat);
|
||||||
|
|
||||||
builder.opt("fs.option.openfile.split.start", splitStart);
|
builder.optLong("fs.option.openfile.split.start", splitStart);
|
||||||
builder.opt("fs.option.openfile.split.end", splitEnd);
|
builder.optLong("fs.option.openfile.split.end", splitEnd);
|
||||||
CompletableFuture<FSDataInputStream> streamF = builder.build();
|
CompletableFuture<FSDataInputStream> streamF = builder.build();
|
||||||
return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
|
return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
|
||||||
}
|
}
|
||||||
@ -618,8 +684,8 @@ An example of a record reader passing in options to the file it opens.
|
|||||||
file.getFileSystem(job).openFile(file);
|
file.getFileSystem(job).openFile(file);
|
||||||
// the start and end of the split may be used to build
|
// the start and end of the split may be used to build
|
||||||
// an input strategy.
|
// an input strategy.
|
||||||
builder.opt("fs.option.openfile.split.start", start);
|
builder.optLong("fs.option.openfile.split.start", start);
|
||||||
builder.opt("fs.option.openfile.split.end", end);
|
builder.optLong("fs.option.openfile.split.end", end);
|
||||||
FutureIO.propagateOptions(builder, job,
|
FutureIO.propagateOptions(builder, job,
|
||||||
"mapreduce.job.input.file.option",
|
"mapreduce.job.input.file.option",
|
||||||
"mapreduce.job.input.file.must");
|
"mapreduce.job.input.file.must");
|
||||||
@ -633,7 +699,7 @@ An example of a record reader passing in options to the file it opens.
|
|||||||
### `FileContext.openFile`
|
### `FileContext.openFile`
|
||||||
|
|
||||||
From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
|
From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential input.
|
||||||
Because the file length has already been probed for, the length is passd down
|
Because the file length has already been probed for, the length is passed down
|
||||||
|
|
||||||
```java
|
```java
|
||||||
public AvroFSInput(FileContext fc, Path p) throws IOException {
|
public AvroFSInput(FileContext fc, Path p) throws IOException {
|
||||||
@ -642,7 +708,7 @@ Because the file length has already been probed for, the length is passd down
|
|||||||
this.stream = awaitFuture(fc.openFile(p)
|
this.stream = awaitFuture(fc.openFile(p)
|
||||||
.opt("fs.option.openfile.read.policy",
|
.opt("fs.option.openfile.read.policy",
|
||||||
"sequential")
|
"sequential")
|
||||||
.opt("fs.option.openfile.length",
|
.optLong("fs.option.openfile.length",
|
||||||
Long.toString(status.getLen()))
|
Long.toString(status.getLen()))
|
||||||
.build());
|
.build());
|
||||||
fc.open(p);
|
fc.open(p);
|
||||||
@ -682,8 +748,3 @@ public T load(FileSystem fs,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
*Note:* : in Hadoop 3.3.2 and earlier, the `withFileStatus(status)` call
|
|
||||||
required a non-null parameter; this has since been relaxed.
|
|
||||||
For maximum compatibility across versions, only invoke the method
|
|
||||||
when the file status is known to be non-null.
|
|
||||||
|
@ -43,6 +43,7 @@
|
|||||||
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
||||||
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
|
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -186,7 +187,7 @@ public void testSequentialRead() throws Throwable {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOpenFileReadZeroByte() throws Throwable {
|
public void testOpenFileReadZeroByte() throws Throwable {
|
||||||
describe("create & read a 0 byte file through the builders");
|
describe("create & read a 0 byte file through the builders; use a negative length");
|
||||||
Path path = path("zero.txt");
|
Path path = path("zero.txt");
|
||||||
FileSystem fs = getFileSystem();
|
FileSystem fs = getFileSystem();
|
||||||
fs.createFile(path).overwrite(true).build().close();
|
fs.createFile(path).overwrite(true).build().close();
|
||||||
@ -194,6 +195,7 @@ public void testOpenFileReadZeroByte() throws Throwable {
|
|||||||
.opt("fs.test.something", true)
|
.opt("fs.test.something", true)
|
||||||
.opt("fs.test.something2", 3)
|
.opt("fs.test.something2", 3)
|
||||||
.opt("fs.test.something3", "3")
|
.opt("fs.test.something3", "3")
|
||||||
|
.optLong(FS_OPTION_OPENFILE_LENGTH, -1L)
|
||||||
.build().get()) {
|
.build().get()) {
|
||||||
assertMinusOne("initial byte read", is.read());
|
assertMinusOne("initial byte read", is.read());
|
||||||
}
|
}
|
||||||
@ -210,6 +212,17 @@ public void testOpenFileUnknownOption() throws Throwable {
|
|||||||
() -> builder.build());
|
() -> builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOpenFileUnknownOptionLong() throws Throwable {
|
||||||
|
describe("calling openFile fails when a 'must()' option is unknown");
|
||||||
|
FutureDataInputStreamBuilder builder =
|
||||||
|
getFileSystem().openFile(path("testOpenFileUnknownOption"))
|
||||||
|
.optLong("fs.test.something", 1L)
|
||||||
|
.mustLong("fs.test.something2", 1L);
|
||||||
|
intercept(IllegalArgumentException.class,
|
||||||
|
() -> builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOpenFileLazyFail() throws Throwable {
|
public void testOpenFileLazyFail() throws Throwable {
|
||||||
describe("openFile fails on a missing file in the get() and not before");
|
describe("openFile fails on a missing file in the get() and not before");
|
||||||
@ -320,16 +333,22 @@ public void testOpenFileApplyAsyncRead() throws Throwable {
|
|||||||
describe("verify that async accept callbacks are evaluated");
|
describe("verify that async accept callbacks are evaluated");
|
||||||
Path path = path("testOpenFileApplyAsyncRead");
|
Path path = path("testOpenFileApplyAsyncRead");
|
||||||
FileSystem fs = getFileSystem();
|
FileSystem fs = getFileSystem();
|
||||||
|
final int len = 512;
|
||||||
createFile(fs, path, true,
|
createFile(fs, path, true,
|
||||||
dataset(4, 0x40, 0x80));
|
dataset(len, 0x40, 0x80));
|
||||||
CompletableFuture<FSDataInputStream> future = fs.openFile(path).build();
|
CompletableFuture<FSDataInputStream> future = fs.openFile(path)
|
||||||
|
.mustDouble(FS_OPTION_OPENFILE_LENGTH, 43.2e60) // pass in a double
|
||||||
|
.build();
|
||||||
AtomicBoolean accepted = new AtomicBoolean(false);
|
AtomicBoolean accepted = new AtomicBoolean(false);
|
||||||
future.thenApply(stream -> {
|
final Long bytes = future.thenApply(stream -> {
|
||||||
accepted.set(true);
|
accepted.set(true);
|
||||||
return stream;
|
return ContractTestUtils.readStream(stream);
|
||||||
}).get().close();
|
}).get();
|
||||||
assertTrue("async accept operation not invoked",
|
assertTrue("async accept operation not invoked",
|
||||||
accepted.get());
|
accepted.get());
|
||||||
|
Assertions.assertThat(bytes)
|
||||||
|
.describedAs("bytes read from stream")
|
||||||
|
.isEqualTo(len);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -357,8 +376,8 @@ public void testOpenFileNullStatusButFileLength() throws Throwable {
|
|||||||
.withFileStatus(null)
|
.withFileStatus(null)
|
||||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
"unknown, sequential, random")
|
"unknown, sequential, random")
|
||||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
|
.optLong(FS_OPTION_OPENFILE_BUFFER_SIZE, 32768)
|
||||||
.opt(FS_OPTION_OPENFILE_LENGTH, len)
|
.optLong(FS_OPTION_OPENFILE_LENGTH, len)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
try (FSDataInputStream in = future.get()) {
|
try (FSDataInputStream in = future.get()) {
|
||||||
@ -367,4 +386,26 @@ public void testOpenFileNullStatusButFileLength() throws Throwable {
|
|||||||
compareByteArrays(dataset, result, len);
|
compareByteArrays(dataset, result, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* open a file with a length set as a double; verifies resilience
|
||||||
|
* of the parser.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testFloatingPointLength() throws Throwable {
|
||||||
|
describe("Open file with a length");
|
||||||
|
Path path = methodPath();
|
||||||
|
FileSystem fs = getFileSystem();
|
||||||
|
int len = 4096;
|
||||||
|
createFile(fs, path, true,
|
||||||
|
dataset(len, 0x40, 0x80));
|
||||||
|
final Long l = fs.openFile(path)
|
||||||
|
.mustDouble(FS_OPTION_OPENFILE_LENGTH, len)
|
||||||
|
.build()
|
||||||
|
.thenApply(ContractTestUtils::readStream)
|
||||||
|
.get();
|
||||||
|
Assertions.assertThat(l)
|
||||||
|
.describedAs("bytes read from file %s", path)
|
||||||
|
.isEqualTo(len);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,144 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.store;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FSBuilder;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
|
||||||
|
import org.apache.hadoop.fs.impl.FSBuilderSupport;
|
||||||
|
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test builder support, forwarding of opt double/float to long,
|
||||||
|
* resilience.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public class TestFSBuilderSupport extends AbstractHadoopTestBase {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOptFloatDoubleForwardsToLong() throws Throwable {
|
||||||
|
FSBuilderSupport c = builder()
|
||||||
|
.opt("f", 1.8f)
|
||||||
|
.opt("d", 2.0e3)
|
||||||
|
.build();
|
||||||
|
assertThat(c.getLong("f", 2))
|
||||||
|
.isEqualTo(1);
|
||||||
|
assertThat(c.getLong("d", 2))
|
||||||
|
.isEqualTo(2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMustFloatDoubleForwardsToLong() throws Throwable {
|
||||||
|
FSBuilderSupport c = builder()
|
||||||
|
.must("f", 1.8f)
|
||||||
|
.must("d", 2.0e3)
|
||||||
|
.build();
|
||||||
|
assertThat(c.getLong("f", 2))
|
||||||
|
.isEqualTo(1);
|
||||||
|
assertThat(c.getLong("d", 2))
|
||||||
|
.isEqualTo(2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLongOptStillWorks() throws Throwable {
|
||||||
|
FSBuilderSupport c = builder()
|
||||||
|
.opt("o", 1L)
|
||||||
|
.must("m", 1L)
|
||||||
|
.build();
|
||||||
|
assertThat(c.getLong("o", 2))
|
||||||
|
.isEqualTo(1L);
|
||||||
|
assertThat(c.getLong("m", 2))
|
||||||
|
.isEqualTo(1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFloatParseFallback() throws Throwable {
|
||||||
|
FSBuilderSupport c = builder()
|
||||||
|
.opt("f", "1.8f")
|
||||||
|
.opt("d", "1.8e20")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertThat(c.getLong("f", 2))
|
||||||
|
.isEqualTo(2);
|
||||||
|
assertThat(c.getLong("d", 2))
|
||||||
|
.isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNegatives() throws Throwable {
|
||||||
|
FSBuilderSupport c = builder()
|
||||||
|
.optLong("-1", -1)
|
||||||
|
.mustLong("-2", -2)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// getLong gets the long value
|
||||||
|
assertThat(c.getLong("-1", 2))
|
||||||
|
.isEqualTo(-1);
|
||||||
|
|
||||||
|
|
||||||
|
// but getPositiveLong returns the positive default
|
||||||
|
assertThat(c.getPositiveLong("-1", 2))
|
||||||
|
.isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBoolean() throws Throwable {
|
||||||
|
final FSBuilderSupport c = builder()
|
||||||
|
.opt("f", false)
|
||||||
|
.opt("t", true)
|
||||||
|
.opt("o", "other")
|
||||||
|
.build();
|
||||||
|
assertThat(c.getOptions().getBoolean("f", true))
|
||||||
|
.isEqualTo(false);
|
||||||
|
assertThat(c.getOptions().getBoolean("t", false))
|
||||||
|
.isEqualTo(true);
|
||||||
|
// this is handled in Configuration itself.
|
||||||
|
assertThat(c.getOptions().getBoolean("o", true))
|
||||||
|
.isEqualTo(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private SimpleBuilder builder() {
|
||||||
|
return new BuilderImpl();
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface SimpleBuilder
|
||||||
|
extends FSBuilder<FSBuilderSupport, SimpleBuilder> {
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class BuilderImpl
|
||||||
|
extends AbstractFSBuilderImpl<FSBuilderSupport, SimpleBuilder>
|
||||||
|
implements SimpleBuilder {
|
||||||
|
|
||||||
|
private BuilderImpl() {
|
||||||
|
super(new Path("/"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSBuilderSupport build()
|
||||||
|
throws IOException {
|
||||||
|
return new FSBuilderSupport(getOptions());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -114,8 +114,8 @@ public LineRecordReader(Configuration job, FileSplit split,
|
|||||||
file.getFileSystem(job).openFile(file);
|
file.getFileSystem(job).openFile(file);
|
||||||
// the start and end of the split may be used to build
|
// the start and end of the split may be used to build
|
||||||
// an input strategy.
|
// an input strategy.
|
||||||
builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
|
builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
|
||||||
.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
|
.optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
|
||||||
FutureIO.propagateOptions(builder, job,
|
FutureIO.propagateOptions(builder, job,
|
||||||
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
||||||
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
||||||
|
@ -92,8 +92,8 @@ public void initialize(InputSplit genericSplit,
|
|||||||
file.getFileSystem(job).openFile(file);
|
file.getFileSystem(job).openFile(file);
|
||||||
// the start and end of the split may be used to build
|
// the start and end of the split may be used to build
|
||||||
// an input strategy.
|
// an input strategy.
|
||||||
builder.opt(FS_OPTION_OPENFILE_SPLIT_START, start);
|
builder.optLong(FS_OPTION_OPENFILE_SPLIT_START, start);
|
||||||
builder.opt(FS_OPTION_OPENFILE_SPLIT_END, end);
|
builder.optLong(FS_OPTION_OPENFILE_SPLIT_END, end);
|
||||||
FutureIO.propagateOptions(builder, job,
|
FutureIO.propagateOptions(builder, job,
|
||||||
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
MRJobConfig.INPUT_FILE_OPTION_PREFIX,
|
||||||
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
|
||||||
|
@ -236,8 +236,8 @@ public void initialize(InputSplit split, TaskAttemptContext context)
|
|||||||
offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
|
offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
|
||||||
length = ((FileSplit)split).getLength();
|
length = ((FileSplit)split).getLength();
|
||||||
final FutureDataInputStreamBuilder builder = fs.openFile(p)
|
final FutureDataInputStreamBuilder builder = fs.openFile(p)
|
||||||
.opt(FS_OPTION_OPENFILE_SPLIT_START, start)
|
.optLong(FS_OPTION_OPENFILE_SPLIT_START, start)
|
||||||
.opt(FS_OPTION_OPENFILE_SPLIT_END, start + length)
|
.optLong(FS_OPTION_OPENFILE_SPLIT_END, start + length)
|
||||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
|
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
|
||||||
in = FutureIO.awaitFuture(builder.build());
|
in = FutureIO.awaitFuture(builder.build());
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.impl.FSBuilderSupport;
|
||||||
import org.apache.hadoop.fs.impl.OpenFileParameters;
|
import org.apache.hadoop.fs.impl.OpenFileParameters;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
|
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
|
||||||
@ -246,12 +247,14 @@ public OpenFileInformation prepareToOpenFile(
|
|||||||
// set the end of the read to the file length
|
// set the end of the read to the file length
|
||||||
fileLength = fileStatus.getLen();
|
fileLength = fileStatus.getLen();
|
||||||
}
|
}
|
||||||
|
FSBuilderSupport builderSupport = new FSBuilderSupport(options);
|
||||||
// determine start and end of file.
|
// determine start and end of file.
|
||||||
long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
|
long splitStart = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
|
||||||
|
|
||||||
// split end
|
// split end
|
||||||
long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END,
|
long splitEnd = builderSupport.getLong(
|
||||||
LENGTH_UNKNOWN);
|
FS_OPTION_OPENFILE_SPLIT_END, LENGTH_UNKNOWN);
|
||||||
|
|
||||||
if (splitStart > 0 && splitStart > splitEnd) {
|
if (splitStart > 0 && splitStart > splitEnd) {
|
||||||
LOG.warn("Split start {} is greater than split end {}, resetting",
|
LOG.warn("Split start {} is greater than split end {}, resetting",
|
||||||
splitStart, splitEnd);
|
splitStart, splitEnd);
|
||||||
@ -259,7 +262,7 @@ public OpenFileInformation prepareToOpenFile(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// read end is the open file value
|
// read end is the open file value
|
||||||
fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
|
fileLength = builderSupport.getPositiveLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
|
||||||
|
|
||||||
// if the read end has come from options, use that
|
// if the read end has come from options, use that
|
||||||
// in creating a file status
|
// in creating a file status
|
||||||
@ -281,16 +284,17 @@ public OpenFileInformation prepareToOpenFile(
|
|||||||
.withS3Select(isSelect)
|
.withS3Select(isSelect)
|
||||||
.withSql(sql)
|
.withSql(sql)
|
||||||
.withAsyncDrainThreshold(
|
.withAsyncDrainThreshold(
|
||||||
options.getLong(ASYNC_DRAIN_THRESHOLD,
|
builderSupport.getPositiveLong(ASYNC_DRAIN_THRESHOLD,
|
||||||
defaultReadAhead))
|
defaultReadAhead))
|
||||||
.withBufferSize(
|
.withBufferSize(
|
||||||
options.getInt(FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
|
(int)builderSupport.getPositiveLong(
|
||||||
|
FS_OPTION_OPENFILE_BUFFER_SIZE, defaultBufferSize))
|
||||||
.withChangePolicy(changePolicy)
|
.withChangePolicy(changePolicy)
|
||||||
.withFileLength(fileLength)
|
.withFileLength(fileLength)
|
||||||
.withInputPolicy(
|
.withInputPolicy(
|
||||||
S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
|
S3AInputPolicy.getFirstSupportedPolicy(policies, defaultInputPolicy))
|
||||||
.withReadAheadRange(
|
.withReadAheadRange(
|
||||||
options.getLong(READAHEAD_RANGE, defaultReadAhead))
|
builderSupport.getPositiveLong(READAHEAD_RANGE, defaultReadAhead))
|
||||||
.withSplitStart(splitStart)
|
.withSplitStart(splitStart)
|
||||||
.withSplitEnd(splitEnd)
|
.withSplitEnd(splitEnd)
|
||||||
.withStatus(fileStatus)
|
.withStatus(fileStatus)
|
||||||
|
@ -141,7 +141,7 @@ public void testOpenFileShorterLength() throws Throwable {
|
|||||||
fs.openFile(testFile)
|
fs.openFile(testFile)
|
||||||
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||||
.opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
|
.mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
|
||||||
.build()
|
.build()
|
||||||
.get(),
|
.get(),
|
||||||
always(NO_HEAD_OR_LIST),
|
always(NO_HEAD_OR_LIST),
|
||||||
@ -183,7 +183,7 @@ public void testOpenFileLongerLength() throws Throwable {
|
|||||||
fs.openFile(testFile)
|
fs.openFile(testFile)
|
||||||
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||||
.must(FS_OPTION_OPENFILE_LENGTH, longLen)
|
.mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
|
||||||
.build()
|
.build()
|
||||||
.get(),
|
.get(),
|
||||||
always(NO_HEAD_OR_LIST));
|
always(NO_HEAD_OR_LIST));
|
||||||
|
@ -160,7 +160,7 @@ public void testUnbufferDraining() throws Throwable {
|
|||||||
int offset = FILE_SIZE - READAHEAD + 1;
|
int offset = FILE_SIZE - READAHEAD + 1;
|
||||||
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
|
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
|
||||||
.withFileStatus(st)
|
.withFileStatus(st)
|
||||||
.must(ASYNC_DRAIN_THRESHOLD, 1)
|
.mustLong(ASYNC_DRAIN_THRESHOLD, 1)
|
||||||
.build().get()) {
|
.build().get()) {
|
||||||
describe("Initiating unbuffer with async drain\n");
|
describe("Initiating unbuffer with async drain\n");
|
||||||
for (int i = 0; i < ATTEMPTS; i++) {
|
for (int i = 0; i < ATTEMPTS; i++) {
|
||||||
@ -235,9 +235,11 @@ public void testUnbufferAborting() throws Throwable {
|
|||||||
// open the file at the beginning with a whole file read policy,
|
// open the file at the beginning with a whole file read policy,
|
||||||
// so even with s3a switching to random on unbuffer,
|
// so even with s3a switching to random on unbuffer,
|
||||||
// this always does a full GET
|
// this always does a full GET
|
||||||
|
// also provide a floating point string for the threshold, to
|
||||||
|
// verify it is safely parsed
|
||||||
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
|
try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
|
||||||
.withFileStatus(st)
|
.withFileStatus(st)
|
||||||
.must(ASYNC_DRAIN_THRESHOLD, 1)
|
.must(ASYNC_DRAIN_THRESHOLD, "1.0")
|
||||||
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
.must(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
|
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
|
||||||
.build().get()) {
|
.build().get()) {
|
||||||
|
@ -572,8 +572,8 @@ public void test_045_vectoredIOHugeFile() throws Throwable {
|
|||||||
byte[] readFullRes;
|
byte[] readFullRes;
|
||||||
IOStatistics sequentialIOStats, vectorIOStats;
|
IOStatistics sequentialIOStats, vectorIOStats;
|
||||||
try (FSDataInputStream in = fs.openFile(hugefile)
|
try (FSDataInputStream in = fs.openFile(hugefile)
|
||||||
.opt(FS_OPTION_OPENFILE_LENGTH, validateSize) // lets us actually force a shorter read
|
.optLong(FS_OPTION_OPENFILE_LENGTH, validateSize) // lets us actually force a shorter read
|
||||||
.opt(FS_OPTION_OPENFILE_SPLIT_START, 0)
|
.optLong(FS_OPTION_OPENFILE_SPLIT_START, 0)
|
||||||
.opt(FS_OPTION_OPENFILE_SPLIT_END, validateSize)
|
.opt(FS_OPTION_OPENFILE_SPLIT_END, validateSize)
|
||||||
.opt(FS_OPTION_OPENFILE_READ_POLICY, "sequential")
|
.opt(FS_OPTION_OPENFILE_READ_POLICY, "sequential")
|
||||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
|
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
|
||||||
@ -587,7 +587,7 @@ public void test_045_vectoredIOHugeFile() throws Throwable {
|
|||||||
|
|
||||||
// now do a vector IO read
|
// now do a vector IO read
|
||||||
try (FSDataInputStream in = fs.openFile(hugefile)
|
try (FSDataInputStream in = fs.openFile(hugefile)
|
||||||
.opt(FS_OPTION_OPENFILE_LENGTH, filesize)
|
.optLong(FS_OPTION_OPENFILE_LENGTH, filesize)
|
||||||
.opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random")
|
.opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random")
|
||||||
.build().get();
|
.build().get();
|
||||||
DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) {
|
DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) {
|
||||||
|
@ -219,11 +219,10 @@ private FSDataInputStream openDataFile(S3AFileSystem fs,
|
|||||||
final FutureDataInputStreamBuilder builder = fs.openFile(path)
|
final FutureDataInputStreamBuilder builder = fs.openFile(path)
|
||||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
inputPolicy.toString())
|
inputPolicy.toString())
|
||||||
.opt(FS_OPTION_OPENFILE_LENGTH, length)
|
.optLong(FS_OPTION_OPENFILE_LENGTH, length)
|
||||||
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize);
|
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, bufferSize)
|
||||||
if (readahead > 0) {
|
.optLong(READAHEAD_RANGE, readahead);
|
||||||
builder.opt(READAHEAD_RANGE, readahead);
|
|
||||||
}
|
|
||||||
FSDataInputStream stream = awaitFuture(builder.build());
|
FSDataInputStream stream = awaitFuture(builder.build());
|
||||||
streamStatistics = getInputStreamStatistics(stream);
|
streamStatistics = getInputStreamStatistics(stream);
|
||||||
return stream;
|
return stream;
|
||||||
|
@ -38,7 +38,6 @@
|
|||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Stack;
|
import java.util.Stack;
|
||||||
@ -93,6 +92,7 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
|
||||||
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
|
import static org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper.*;
|
||||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||||
|
|
||||||
@ -3149,7 +3149,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path,
|
|||||||
OpenFileParameters parameters) throws IOException {
|
OpenFileParameters parameters) throws IOException {
|
||||||
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
|
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
|
||||||
parameters.getMandatoryKeys(),
|
parameters.getMandatoryKeys(),
|
||||||
Collections.emptySet(),
|
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
|
||||||
"for " + path);
|
"for " + path);
|
||||||
return LambdaUtils.eval(
|
return LambdaUtils.eval(
|
||||||
new CompletableFuture<>(), () ->
|
new CompletableFuture<>(), () ->
|
||||||
|
@ -31,7 +31,6 @@
|
|||||||
import java.util.Hashtable;
|
import java.util.Hashtable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -112,6 +111,7 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
|
||||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
|
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
|
||||||
@ -296,7 +296,7 @@ protected CompletableFuture<FSDataInputStream> openFileWithOptions(
|
|||||||
LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
|
LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
|
||||||
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
|
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
|
||||||
parameters.getMandatoryKeys(),
|
parameters.getMandatoryKeys(),
|
||||||
Collections.emptySet(),
|
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
|
||||||
"for " + path);
|
"for " + path);
|
||||||
return LambdaUtils.eval(
|
return LambdaUtils.eval(
|
||||||
new CompletableFuture<>(), () ->
|
new CompletableFuture<>(), () ->
|
||||||
|
@ -587,7 +587,7 @@ public LogReader(Configuration conf, Path remoteAppLogFile)
|
|||||||
fileContext.openFile(remoteAppLogFile)
|
fileContext.openFile(remoteAppLogFile)
|
||||||
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
.opt(FS_OPTION_OPENFILE_READ_POLICY,
|
||||||
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
|
||||||
.opt(FS_OPTION_OPENFILE_LENGTH,
|
.optLong(FS_OPTION_OPENFILE_LENGTH,
|
||||||
status.getLen()) // file length hint for object stores
|
status.getLen()) // file length hint for object stores
|
||||||
.build());
|
.build());
|
||||||
reader = new TFile.Reader(this.fsDataIStream,
|
reader = new TFile.Reader(this.fsDataIStream,
|
||||||
|
Loading…
Reference in New Issue
Block a user