HADOOP-19161. S3A: option "fs.s3a.performance.flags" to take list of performance flags (#6789)

1. Configuration adds a new method `getEnumSet()` to get a set of enum values from
   a configuration string.
   <E extends Enum<E>> EnumSet<E> getEnumSet(String key, Class<E> enumClass, boolean ignoreUnknown)

   Whitespace is ignored, case is ignored, and the value "*" is mapped to "all values of the enum".
   If "ignoreUnknown" is true then unknown values are ignored when parsing.
   This is recommended for forward compatibility with later versions;
   see the usage sketch below.

2. This support is implemented in org.apache.hadoop.util.ConfigurationHelper; it can be used
   elsewhere in the hadoop codebase.

3. A new private FlagSet class in hadoop-common manages a set of enum flags.

   It implements StreamCapabilities and can be probed for a specific option being set
   (with a prefix).


S3A adds an option fs.s3a.performance.flags which builds a FlagSet with enum
type PerformanceFlagEnum:

* which initially contains {Create, Delete, Mkdir, Open}
* the existing fs.s3a.create.performance option sets the flag "Create".
* tests which configure fs.s3a.create.performance MUST clear
  fs.s3a.performance.flags in test setup.

Future performance flags are planned, with different levels of safety
and/or backwards compatibility.
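
A minimal usage sketch of the new APIs (editorial illustration, not part of the
patch; FlagSet is org.apache.hadoop.fs.impl.FlagSet and PerformanceFlagEnum is
org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum):

    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.performance.flags", "create, Mkdir");

    // getEnumSet(): trims whitespace and matches case-insensitively;
    // here unknown values are ignored (ignoreUnknown = true).
    EnumSet<PerformanceFlagEnum> set = conf.getEnumSet(
        "fs.s3a.performance.flags", PerformanceFlagEnum.class, true);

    // buildFlagSet() wraps the same parse and adds StreamCapabilities probes,
    // using the configuration key plus "." as the capability prefix.
    FlagSet<PerformanceFlagEnum> flags = FlagSet.buildFlagSet(
        PerformanceFlagEnum.class, conf, "fs.s3a.performance.flags", true);
    boolean created = flags.enabled(PerformanceFlagEnum.Create);           // true
    boolean mkdir = flags.hasCapability("fs.s3a.performance.flags.mkdir"); // true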

Contributed by Steve Loughran


@ -49,6 +49,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@ -99,6 +100,7 @@
import org.apache.hadoop.security.alias.CredentialProvider.CredentialEntry;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.util.ConfigurationHelper;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringInterner;
@ -1786,6 +1788,26 @@ public <T extends Enum<T>> T getEnum(String name, T defaultValue) {
: Enum.valueOf(defaultValue.getDeclaringClass(), val);
}
/**
* Build an enumset from a comma separated list of values.
* Case independent.
* Special handling of "*" meaning: all values.
* @param key key to look for
* @param enumClass class of enum
* @param ignoreUnknown should unknown values be ignored rather than raising an exception?
* @return a mutable set of the identified enum values declared in the configuration
* @param <E> enumeration type
* @throws IllegalArgumentException if one of the entries was unknown and ignoreUnknown is false,
* or there are two entries in the enum which differ only by case.
*/
public <E extends Enum<E>> EnumSet<E> getEnumSet(
final String key,
final Class<E> enumClass,
final boolean ignoreUnknown) throws IllegalArgumentException {
final String value = get(key, "");
return ConfigurationHelper.parseEnumSet(key, value, enumClass, ignoreUnknown);
}
enum ParsedTimeDuration {
NS {
TimeUnit unit() { return TimeUnit.NANOSECONDS; }


@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.util.ConfigurationHelper;
import org.apache.hadoop.util.Preconditions;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.ConfigurationHelper.mapEnumNamesToValues;
/**
* A set of flags, constructed from a configuration option or from a string,
* with the semantics of
* {@link ConfigurationHelper#parseEnumSet(String, String, Class, boolean)}
* and implementing {@link StreamCapabilities}.
* <p>
* Thread safety: there is no synchronization on a mutable {@code FlagSet}.
* Once declared immutable, flags cannot be changed, so they
* become implicitly thread-safe.
*/
public final class FlagSet<E extends Enum<E>> implements StreamCapabilities {
/**
* Class of the enum.
* Used for duplicating the flags as java type erasure
* loses this information otherwise.
*/
private final Class<E> enumClass;
/**
* Prefix for path capabilities probe.
*/
private final String prefix;
/**
* Set of flags.
*/
private final EnumSet<E> flags;
/**
* Is the set immutable?
*/
private final AtomicBoolean immutable = new AtomicBoolean(false);
/**
* Mapping of prefixed flag names to enum values.
*/
private final Map<String, E> namesToValues;
/**
* Create a FlagSet.
* @param enumClass class of enum
* @param prefix prefix (with trailing ".") for path capabilities probe
* @param flags flags. A copy of these is made.
*/
private FlagSet(final Class<E> enumClass,
final String prefix,
@Nullable final EnumSet<E> flags) {
this.enumClass = requireNonNull(enumClass, "null enumClass");
this.prefix = requireNonNull(prefix, "null prefix");
this.flags = flags != null
? EnumSet.copyOf(flags)
: EnumSet.noneOf(enumClass);
this.namesToValues = mapEnumNamesToValues(prefix, enumClass);
}
/**
* Get a copy of the flags.
* <p>
* Changes to the returned set do not affect this FlagSet.
* @return the flags.
*/
public EnumSet<E> flags() {
return EnumSet.copyOf(flags);
}
/**
* Probe for the FlagSet being empty.
* @return true if there are no flags set.
*/
public boolean isEmpty() {
return flags.isEmpty();
}
/**
* Is a flag enabled?
* @param flag flag to check
* @return true if it is in the set of enabled flags.
*/
public boolean enabled(final E flag) {
return flags.contains(flag);
}
/**
* Check for mutability before any mutating operation.
* @throws IllegalStateException if the set has been made immutable
*/
private void checkMutable() {
Preconditions.checkState(!immutable.get(),
"FlagSet is immutable");
}
/**
* Enable a flag.
* @param flag flag to enable.
*/
public void enable(final E flag) {
checkMutable();
flags.add(flag);
}
/**
* Disable a flag.
* @param flag flag to disable
*/
public void disable(final E flag) {
checkMutable();
flags.remove(flag);
}
/**
* Set a flag to the chosen value.
* @param flag flag
* @param state true to enable, false to disable.
*/
public void set(final E flag, boolean state) {
if (state) {
enable(flag);
} else {
disable(flag);
}
}
/**
* Is a flag enabled?
* @param capability string to query the stream support for.
* @return true if the capability maps to an enum value and
* that value is set.
*/
@Override
public boolean hasCapability(final String capability) {
final E e = namesToValues.get(capability);
return e != null && enabled(e);
}
/**
* Make immutable; no-op if already set.
*/
public void makeImmutable() {
immutable.set(true);
}
/**
* Is the FlagSet immutable?
* @return true iff the FlagSet is immutable.
*/
public boolean isImmutable() {
return immutable.get();
}
/**
* Get the enum class.
* @return the enum class.
*/
public Class<E> getEnumClass() {
return enumClass;
}
@Override
public String toString() {
return "{" +
(flags.stream()
.map(Enum::name)
.collect(Collectors.joining(", ")))
+ "}";
}
/**
* Generate the list of capabilities.
* @return a possibly empty list.
*/
public List<String> pathCapabilities() {
return namesToValues.keySet().stream()
.filter(this::hasCapability)
.collect(Collectors.toList());
}
/**
* Equality is based on the value of {@link #enumClass} and
* {@link #prefix} and the contents of the set, which must match.
* <p>
* The immutability flag is not considered, nor is the
* {@link #namesToValues} map, though as that is generated from
* the enumeration and prefix, it is implicitly equal if the prefix
* and enumClass fields are equal.
* @param o other object
* @return true iff the equality condition is met.
*/
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FlagSet<?> flagSet = (FlagSet<?>) o;
return Objects.equals(enumClass, flagSet.enumClass)
&& Objects.equals(prefix, flagSet.prefix)
&& Objects.equals(flags, flagSet.flags);
}
/**
* Hash code is based on the flags.
* @return a hash code.
*/
@Override
public int hashCode() {
return Objects.hashCode(flags);
}
/**
* Create a copy of the FlagSet.
* @return a new mutable instance with a separate copy of the flags
*/
public FlagSet<E> copy() {
return new FlagSet<>(enumClass, prefix, flags);
}
/**
* Convert to a string which can be then set in a configuration.
* This is effectively a marshalled form of the flags.
* @return a comma separated list of flag names.
*/
public String toConfigurationString() {
return flags.stream()
.map(Enum::name)
.collect(Collectors.joining(", "));
}
/**
* Create a FlagSet.
* @param enumClass class of enum
* @param prefix prefix (with trailing ".") for path capabilities probe
* @param flags flags
* @param <E> enum type
* @return a mutable FlagSet
*/
public static <E extends Enum<E>> FlagSet<E> createFlagSet(
final Class<E> enumClass,
final String prefix,
final EnumSet<E> flags) {
return new FlagSet<>(enumClass, prefix, flags);
}
/**
* Create a FlagSet from a list of enum values.
* @param enumClass class of enum
* @param prefix prefix (with trailing ".") for path capabilities probe
* @param enabled varargs list of flags to enable.
* @param <E> enum type
* @return a mutable FlagSet
*/
@SafeVarargs
public static <E extends Enum<E>> FlagSet<E> createFlagSet(
final Class<E> enumClass,
final String prefix,
final E... enabled) {
final FlagSet<E> flagSet = new FlagSet<>(enumClass, prefix, null);
Arrays.stream(enabled).forEach(flag -> {
if (flag != null) {
flagSet.enable(flag);
}
});
return flagSet;
}
/**
* Build a FlagSet from a comma separated list of values.
* Case independent.
* Special handling of "*" meaning: all values.
* @param enumClass class of enum
* @param conf configuration
* @param key key to look for
* @param ignoreUnknown should unknown values be ignored rather than raising an exception?
* @param <E> enumeration type
* @return a mutable FlagSet
* @throws IllegalArgumentException if one of the entries was unknown and ignoreUnknown is false,
* or there are two entries in the enum which differ only by case.
*/
public static <E extends Enum<E>> FlagSet<E> buildFlagSet(
final Class<E> enumClass,
final Configuration conf,
final String key,
final boolean ignoreUnknown) {
final EnumSet<E> flags = conf.getEnumSet(key, enumClass, ignoreUnknown);
return createFlagSet(enumClass, key + ".", flags);
}
}
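
Editorial note: a minimal sketch of the lifecycle of the class above (create
mutable, mutate, freeze, then probe); illustrative only:

    FlagSet<PerformanceFlagEnum> flags = FlagSet.createFlagSet(
        PerformanceFlagEnum.class, "fs.s3a.performance.flags.",
        PerformanceFlagEnum.Create);
    flags.enable(PerformanceFlagEnum.Mkdir);    // mutable until frozen
    flags.makeImmutable();                      // freeze; now implicitly thread-safe
    boolean on = flags.enabled(PerformanceFlagEnum.Mkdir);    // true
    // flags.enable(PerformanceFlagEnum.Open);  // would now throw IllegalStateException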


@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import static java.util.EnumSet.noneOf;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
/**
* Configuration Helper class to provide advanced configuration parsing.
* Private; external code MUST use {@link Configuration} instead.
*/
@InterfaceAudience.Private
public final class ConfigurationHelper {
/**
* Error string if there are multiple enum elements which only differ
* by case: {@value}.
*/
@VisibleForTesting
static final String ERROR_MULTIPLE_ELEMENTS_MATCHING_TO_LOWER_CASE_VALUE =
"has multiple elements matching to lower case value";
private ConfigurationHelper() {
}
/**
* Given a comma separated list of enum values,
* trim the entries, map each to an enum value (case insensitive)
* and return the set.
* Special handling of "*" meaning: all values.
* @param key Configuration object key; used in error messages.
* @param valueString value from Configuration
* @param enumClass class of enum
* @param ignoreUnknown should unknown values be ignored?
* @param <E> enum type
* @return a mutable set of enum values parsed from the valueString, with any unknown
* matches stripped if {@code ignoreUnknown} is true.
* @throws IllegalArgumentException if one of the entries was unknown and ignoreUnknown is false,
* or there are two entries in the enum which differ only by case.
*/
@SuppressWarnings("unchecked")
public static <E extends Enum<E>> EnumSet<E> parseEnumSet(final String key,
final String valueString,
final Class<E> enumClass,
final boolean ignoreUnknown) throws IllegalArgumentException {
// build a map of lower case string to enum values.
final Map<String, E> mapping = mapEnumNamesToValues("", enumClass);
// scan the input string and add all which match
final EnumSet<E> enumSet = noneOf(enumClass);
for (String element : getTrimmedStringCollection(valueString)) {
final String item = element.toLowerCase(Locale.ROOT);
if ("*".equals(item)) {
enumSet.addAll(mapping.values());
continue;
}
final E e = mapping.get(item);
if (e != null) {
enumSet.add(e);
} else {
// no match
// unless configured to ignore unknown values, raise an exception
checkArgument(ignoreUnknown, "%s: Unknown option value: %s in list %s."
+ " Valid options for enum class %s are: %s",
key, element, valueString,
enumClass.getName(),
mapping.keySet().stream().collect(Collectors.joining(",")));
}
}
return enumSet;
}
/**
* Given an enum class, build a map of lower case names to values.
* @param prefix prefix (with trailing ".") for path capabilities probe
* @param enumClass class of enum
* @param <E> enum type
* @return a mutable map of lower case names to enum values
* @throws IllegalArgumentException if there are two entries which differ only by case.
*/
public static <E extends Enum<E>> Map<String, E> mapEnumNamesToValues(
final String prefix,
final Class<E> enumClass) {
final E[] constants = enumClass.getEnumConstants();
Map<String, E> mapping = new HashMap<>(constants.length);
for (E constant : constants) {
final String lc = constant.name().toLowerCase(Locale.ROOT);
final E orig = mapping.put(prefix + lc, constant);
checkArgument(orig == null,
"Enum %s "
+ ERROR_MULTIPLE_ELEMENTS_MATCHING_TO_LOWER_CASE_VALUE
+ " %s",
enumClass, lc);
}
return mapping;
}
}
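
Editorial note: a sketch of parseEnumSet() semantics against a hypothetical
enum Color { red, green } (not part of the patch):

    parseEnumSet("key", "RED, green ,red", Color.class, false); // {red, green}
    parseEnumSet("key", "*", Color.class, false);               // all values: {red, green}
    parseEnumSet("key", "red, blue", Color.class, true);        // {red}; "blue" ignored
    parseEnumSet("key", "red, blue", Color.class, false);       // throws IllegalArgumentException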


@ -0,0 +1,431 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.util.EnumSet;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static java.util.EnumSet.allOf;
import static java.util.EnumSet.noneOf;
import static org.apache.hadoop.fs.impl.FlagSet.buildFlagSet;
import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Unit tests for {@link FlagSet} class.
*/
public final class TestFlagSet extends AbstractHadoopTestBase {
private static final String KEY = "key";
public static final String CAPABILITY_B = KEY + ".b";
public static final String CAPABILITY_C = KEY + ".c";
public static final String CAPABILITY_A = KEY + ".a";
private static final String KEYDOT = KEY + ".";
/**
* Flagset used in tests and assertions.
*/
private FlagSet<SimpleEnum> flagSet =
createFlagSet(SimpleEnum.class, KEYDOT, noneOf(SimpleEnum.class));
/**
* Simple Enums for the tests.
*/
private enum SimpleEnum { a, b, c }
/**
* Enum with a single value.
*/
private enum OtherEnum { a }
/**
* Test that an entry can be enabled and disabled.
*/
@Test
public void testEntryEnableDisable() {
Assertions.assertThat(flagSet.flags()).isEmpty();
assertDisabled(SimpleEnum.a);
flagSet.enable(SimpleEnum.a);
assertEnabled(SimpleEnum.a);
flagSet.disable(SimpleEnum.a);
assertDisabled(SimpleEnum.a);
}
/**
* Test the setter.
*/
@Test
public void testSetMethod() {
Assertions.assertThat(flagSet.flags()).isEmpty();
flagSet.set(SimpleEnum.a, true);
assertEnabled(SimpleEnum.a);
flagSet.set(SimpleEnum.a, false);
assertDisabled(SimpleEnum.a);
}
/**
* Test mutability by making immutable and
* expecting setters to fail.
*/
@Test
public void testMutability() throws Throwable {
flagSet.set(SimpleEnum.a, true);
flagSet.makeImmutable();
intercept(IllegalStateException.class, () ->
flagSet.disable(SimpleEnum.a));
assertEnabled(SimpleEnum.a);
intercept(IllegalStateException.class, () ->
flagSet.set(SimpleEnum.a, false));
assertEnabled(SimpleEnum.a);
// now look at the setters
intercept(IllegalStateException.class, () ->
flagSet.enable(SimpleEnum.b));
assertDisabled(SimpleEnum.b);
intercept(IllegalStateException.class, () ->
flagSet.set(SimpleEnum.b, true));
assertDisabled(SimpleEnum.b);
}
/**
* Test stringification.
*/
@Test
public void testToString() throws Throwable {
// empty
assertStringValue("{}");
assertConfigurationStringMatches("");
// single value
flagSet.enable(SimpleEnum.a);
assertStringValue("{a}");
assertConfigurationStringMatches("a");
// add a second value.
flagSet.enable(SimpleEnum.b);
assertStringValue("{a, b}");
}
/**
* Assert that {@link FlagSet#toString()} matches the expected
* value.
* @param expected expected value
*/
private void assertStringValue(final String expected) {
Assertions.assertThat(flagSet.toString())
.isEqualTo(expected);
}
/**
* Assert the configuration string form matches that expected.
*/
public void assertConfigurationStringMatches(final String expected) {
Assertions.assertThat(flagSet.toConfigurationString())
.describedAs("Configuration string of %s", flagSet)
.isEqualTo(expected);
}
/**
* Test parsing from a configuration file.
* Multiple entries must be parsed, whitespace trimmed.
*/
@Test
public void testConfEntry() {
flagSet = flagSetFromConfig("a\t,\nc ", true);
assertFlagSetMatches(flagSet, SimpleEnum.a, SimpleEnum.c);
assertHasCapability(CAPABILITY_A);
assertHasCapability(CAPABILITY_C);
assertLacksCapability(CAPABILITY_B);
assertPathCapabilitiesMatch(flagSet, CAPABILITY_A, CAPABILITY_C);
}
/**
* Create a flagset from a configuration string.
* @param string configuration string.
* @param ignoreUnknown should unknown values be ignored?
* @return a flagset
*/
private static FlagSet<SimpleEnum> flagSetFromConfig(final String string,
final boolean ignoreUnknown) {
final Configuration conf = mkConf(string);
return buildFlagSet(SimpleEnum.class, conf, KEY, ignoreUnknown);
}
/**
* Test parsing from a configuration file,
* where an entry is unknown; the builder is set to ignoreUnknown.
*/
@Test
public void testConfEntryWithUnknownIgnored() {
flagSet = flagSetFromConfig("a, unknown", true);
assertFlagSetMatches(flagSet, SimpleEnum.a);
assertHasCapability(CAPABILITY_A);
assertLacksCapability(CAPABILITY_B);
assertLacksCapability(CAPABILITY_C);
}
/**
* Test parsing from a configuration file where
* the same entry is duplicated.
*/
@Test
public void testDuplicateConfEntry() {
flagSet = flagSetFromConfig("a,\ta,\na\"", true);
assertFlagSetMatches(flagSet, SimpleEnum.a);
assertHasCapability(CAPABILITY_A);
}
/**
* Handle an unknown configuration value.
*/
@Test
public void testConfUnknownFailure() throws Throwable {
intercept(IllegalArgumentException.class, () ->
flagSetFromConfig("a, unknown", false));
}
/**
* Create a configuration with {@link #KEY} set to the given value.
* @param value value to set
* @return the configuration.
*/
private static Configuration mkConf(final String value) {
final Configuration conf = new Configuration(false);
conf.set(KEY, value);
return conf;
}
/**
* Assert that the flagset has a capability.
* @param capability capability to probe for
*/
private void assertHasCapability(final String capability) {
Assertions.assertThat(flagSet.hasCapability(capability))
.describedAs("Capability of %s on %s", capability, flagSet)
.isTrue();
}
/**
* Assert that the flagset lacks a capability.
* @param capability capability to probe for
*/
private void assertLacksCapability(final String capability) {
Assertions.assertThat(flagSet.hasCapability(capability))
.describedAs("Capability of %s on %s", capability, flagSet)
.isFalse();
}
/**
* Test the * binding.
*/
@Test
public void testStarEntry() {
flagSet = flagSetFromConfig("*", false);
assertFlags(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c);
assertHasCapability(CAPABILITY_A);
assertHasCapability(CAPABILITY_B);
Assertions.assertThat(flagSet.pathCapabilities())
.describedAs("path capabilities of %s", flagSet)
.containsExactlyInAnyOrder(CAPABILITY_A, CAPABILITY_B, CAPABILITY_C);
}
@Test
public void testRoundTrip() {
final FlagSet<SimpleEnum> s1 = createFlagSet(SimpleEnum.class,
KEYDOT,
allOf(SimpleEnum.class));
final FlagSet<SimpleEnum> s2 = roundTrip(s1);
Assertions.assertThat(s1.flags()).isEqualTo(s2.flags());
assertFlagSetMatches(s2, SimpleEnum.a, SimpleEnum.b, SimpleEnum.c);
}
@Test
public void testEmptyRoundTrip() {
final FlagSet<SimpleEnum> s1 = createFlagSet(SimpleEnum.class, KEYDOT,
noneOf(SimpleEnum.class));
final FlagSet<SimpleEnum> s2 = roundTrip(s1);
Assertions.assertThat(s1.flags())
.isEqualTo(s2.flags());
Assertions.assertThat(s2.isEmpty())
.describedAs("empty flagset %s", s2)
.isTrue();
assertFlagSetMatches(flagSet);
Assertions.assertThat(flagSet.pathCapabilities())
.describedAs("path capabilities of %s", flagSet)
.isEmpty();
}
@Test
public void testSetIsClone() {
final EnumSet<SimpleEnum> flags = noneOf(SimpleEnum.class);
final FlagSet<SimpleEnum> s1 = createFlagSet(SimpleEnum.class, KEYDOT, flags);
s1.enable(SimpleEnum.b);
// set a source flag
flags.add(SimpleEnum.a);
// verify the derived flagset is unchanged
assertFlagSetMatches(s1, SimpleEnum.b);
}
@Test
public void testEquality() {
final FlagSet<SimpleEnum> s1 = createFlagSet(SimpleEnum.class, KEYDOT, SimpleEnum.a);
final FlagSet<SimpleEnum> s2 = createFlagSet(SimpleEnum.class, KEYDOT, SimpleEnum.a);
// make one of them immutable
s2.makeImmutable();
Assertions.assertThat(s1)
.describedAs("s1 == s2")
.isEqualTo(s2);
Assertions.assertThat(s1.hashCode())
.describedAs("hashcode of s1 == hashcode of s2")
.isEqualTo(s2.hashCode());
}
@Test
public void testInequality() {
final FlagSet<SimpleEnum> s1 =
createFlagSet(SimpleEnum.class, KEYDOT, noneOf(SimpleEnum.class));
final FlagSet<SimpleEnum> s2 =
createFlagSet(SimpleEnum.class, KEYDOT, SimpleEnum.a, SimpleEnum.b);
Assertions.assertThat(s1)
.describedAs("s1 == s2")
.isNotEqualTo(s2);
}
@Test
public void testClassInequality() {
final FlagSet<?> s1 =
createFlagSet(SimpleEnum.class, KEYDOT, noneOf(SimpleEnum.class));
final FlagSet<?> s2 =
createFlagSet(OtherEnum.class, KEYDOT, OtherEnum.a);
Assertions.assertThat(s1)
.describedAs("s1 == s2")
.isNotEqualTo(s2);
}
/**
* The copy operation creates a new instance which is now mutable,
* even if the original was immutable.
*/
@Test
public void testCopy() throws Throwable {
FlagSet<SimpleEnum> s1 =
createFlagSet(SimpleEnum.class, KEYDOT, SimpleEnum.a, SimpleEnum.b);
s1.makeImmutable();
FlagSet<SimpleEnum> s2 = s1.copy();
Assertions.assertThat(s2)
.describedAs("copy of %s", s1)
.isNotSameAs(s1);
Assertions.assertThat(!s2.isImmutable())
.describedAs("set %s is immutable", s2)
.isTrue();
Assertions.assertThat(s1)
.describedAs("s1 == s2")
.isEqualTo(s2);
}
@Test
public void testCreateNullEnumClass() throws Throwable {
intercept(NullPointerException.class, () ->
createFlagSet(null, KEYDOT, SimpleEnum.a));
}
@Test
public void testCreateNullPrefix() throws Throwable {
intercept(NullPointerException.class, () ->
createFlagSet(SimpleEnum.class, null, SimpleEnum.a));
}
/**
* Round trip a FlagSet.
* @param flagset FlagSet to save to a configuration and retrieve.
* @return a new FlagSet.
*/
private FlagSet<SimpleEnum> roundTrip(FlagSet<SimpleEnum> flagset) {
final Configuration conf = new Configuration(false);
conf.set(KEY, flagset.toConfigurationString());
return buildFlagSet(SimpleEnum.class, conf, KEY, false);
}
/**
* Assert a flag is enabled in the {@link #flagSet} field.
* @param flag flag to check
*/
private void assertEnabled(final SimpleEnum flag) {
Assertions.assertThat(flagSet.enabled(flag))
.describedAs("status of flag %s in %s", flag, flagSet)
.isTrue();
}
/**
* Assert a flag is disabled in the {@link #flagSet} field.
* @param flag flag to check
*/
private void assertDisabled(final SimpleEnum flag) {
Assertions.assertThat(flagSet.enabled(flag))
.describedAs("status of flag %s in %s", flag, flagSet)
.isFalse();
}
/**
* Assert that a set of flags are enabled in the {@link #flagSet} field.
* @param flags flags which must be set.
*/
private void assertFlags(final SimpleEnum... flags) {
for (SimpleEnum flag : flags) {
assertEnabled(flag);
}
}
/**
* Assert that a FlagSet contains an exclusive set of values.
* @param flags flags which must be set.
*/
private void assertFlagSetMatches(
FlagSet<SimpleEnum> fs,
SimpleEnum... flags) {
Assertions.assertThat(fs.flags())
.describedAs("path capabilities of %s", fs)
.containsExactly(flags);
}
/**
* Assert that a flagset contains exactly the capabilities.
* This is calculated by getting the list of active capabilities
* and asserting on the list.
* @param fs flagset
* @param capabilities capabilities
*/
private void assertPathCapabilitiesMatch(
FlagSet<SimpleEnum> fs,
String... capabilities) {
Assertions.assertThat(fs.pathCapabilities())
.describedAs("path capabilities of %s", fs)
.containsExactlyInAnyOrder(capabilities);
}
}


@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.Set;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.ConfigurationHelper.ERROR_MULTIPLE_ELEMENTS_MATCHING_TO_LOWER_CASE_VALUE;
import static org.apache.hadoop.util.ConfigurationHelper.mapEnumNamesToValues;
import static org.apache.hadoop.util.ConfigurationHelper.parseEnumSet;
/**
* Test for {@link ConfigurationHelper}.
*/
public class TestConfigurationHelper extends AbstractHadoopTestBase {
/**
* Simple Enums.
* "i" is included for case tests, as it is special in turkey.
*/
private enum SimpleEnum { a, b, c, i }
/**
* Special case: an enum with no values.
*/
private enum EmptyEnum { }
/**
* Create assertion about the outcome of
* {@link ConfigurationHelper#parseEnumSet(String, String, Class, boolean)}.
* @param valueString value from Configuration
* @param enumClass class of enum
* @param ignoreUnknown should unknown values be ignored?
* @param <E> enum type
* @return an assertion on the outcome.
* @throws IllegalArgumentException if one of the entries was unknown and ignoreUnknown is false,
* or there are two entries in the enum which differ only by case.
*/
private static <E extends Enum<E>> IterableAssert<E> assertEnumParse(
final String valueString,
final Class<E> enumClass,
final boolean ignoreUnknown) {
final Set<E> enumSet = parseEnumSet("key", valueString, enumClass, ignoreUnknown);
final IterableAssert<E> assertion = Assertions.assertThat(enumSet);
return assertion.describedAs("parsed enum set '%s'", valueString);
}
/**
* Create a configuration with the key {@code key} set to a {@code value}.
* @param value value for the key
* @return a configuration with only key set.
*/
private Configuration confWithKey(String value) {
final Configuration conf = new Configuration(false);
conf.set("key", value);
return conf;
}
@Test
public void testEnumParseAll() {
assertEnumParse("*", SimpleEnum.class, false)
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c, SimpleEnum.i);
}
@Test
public void testEnumParse() {
assertEnumParse("a, b,c", SimpleEnum.class, false)
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c);
}
@Test
public void testEnumCaseIndependence() {
assertEnumParse("A, B, C, I", SimpleEnum.class, false)
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c, SimpleEnum.i);
}
@Test
public void testEmptyArguments() {
assertEnumParse(" ", SimpleEnum.class, false)
.isEmpty();
}
@Test
public void testUnknownEnumNotIgnored() throws Throwable {
intercept(IllegalArgumentException.class, "unrecognized", () ->
parseEnumSet("key", "c, unrecognized", SimpleEnum.class, false));
}
@Test
public void testUnknownEnumNotIgnoredThroughConf() throws Throwable {
intercept(IllegalArgumentException.class, "unrecognized", () ->
confWithKey("c, unrecognized")
.getEnumSet("key", SimpleEnum.class, false));
}
@Test
public void testUnknownEnumIgnored() {
assertEnumParse("c, d", SimpleEnum.class, true)
.containsExactly(SimpleEnum.c);
}
@Test
public void testUnknownStarEnum() throws Throwable {
intercept(IllegalArgumentException.class, "unrecognized", () ->
parseEnumSet("key", "*, unrecognized", SimpleEnum.class, false));
}
@Test
public void testUnknownStarEnumIgnored() {
assertEnumParse("*, d", SimpleEnum.class, true)
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c, SimpleEnum.i);
}
/**
* Unsupported enum as the same case value is present.
*/
private enum CaseConflictingEnum { a, A }
@Test
public void testCaseConflictingEnumNotSupported() throws Throwable {
intercept(IllegalArgumentException.class,
ERROR_MULTIPLE_ELEMENTS_MATCHING_TO_LOWER_CASE_VALUE,
() ->
parseEnumSet("key", "c, unrecognized",
CaseConflictingEnum.class, false));
}
@Test
public void testEmptyEnumMap() {
Assertions.assertThat(mapEnumNamesToValues("", EmptyEnum.class))
.isEmpty();
}
/**
* A star enum for an empty enum must be empty.
*/
@Test
public void testEmptyStarEnum() {
assertEnumParse("*", EmptyEnum.class, false)
.isEmpty();
}
@Test
public void testDuplicateValues() {
assertEnumParse("a, a, c, b, c", SimpleEnum.class, true)
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c);
}
}


@ -1404,6 +1404,11 @@ private Constants() {
public static final String FS_S3A_CREATE_PERFORMANCE_ENABLED =
FS_S3A_CREATE_PERFORMANCE + ".enabled";
/**
* Comma separated list of performance flags.
*/
public static final String FS_S3A_PERFORMANCE_FLAGS =
"fs.s3a.performance.flags";
/**
* Prefix for adding a header to the object when created.
* The actual value must have a "." suffix and then the actual header.


@ -109,8 +109,10 @@
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.impl.FlagSet;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
@ -223,6 +225,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
import static org.apache.hadoop.fs.impl.FlagSet.buildFlagSet;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
@ -369,8 +372,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private S3AStatisticsContext statisticsContext;
/** Storage Statistics Bonded to the instrumentation. */
private S3AStorageStatistics storageStatistics;
/** Should all create files be "performance" unless unset. */
private boolean performanceCreation;
/**
* Performance flags.
*/
private FlagSet<PerformanceFlagEnum> performanceFlags;
/**
* Default input policy; may be overridden in
* {@code openFile()}.
@ -740,10 +747,23 @@ public void initialize(URI name, Configuration originalConf)
// verify there's no S3Guard in the store config.
checkNoS3Guard(this.getUri(), getConf());
// read in performance options and parse them to a list of flags.
performanceFlags = buildFlagSet(
PerformanceFlagEnum.class,
conf,
FS_S3A_PERFORMANCE_FLAGS,
true);
// performance creation flag for code which wants performance
// at the risk of overwrites.
performanceCreation = conf.getBoolean(FS_S3A_CREATE_PERFORMANCE,
FS_S3A_CREATE_PERFORMANCE_DEFAULT);
// this uses the performance flags as the default and then
// updates the performance flags to match.
// a bit convoluted.
boolean performanceCreation = conf.getBoolean(FS_S3A_CREATE_PERFORMANCE,
performanceFlags.enabled(PerformanceFlagEnum.Create));
performanceFlags.set(PerformanceFlagEnum.Create, performanceCreation);
// freeze.
performanceFlags.makeImmutable();
LOG.debug("{} = {}", FS_S3A_CREATE_PERFORMANCE, performanceCreation);
allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this);
@ -1289,6 +1309,14 @@ public RequestFactory getRequestFactory() {
return requestFactory;
}
/**
* Get the performance flags.
* @return performance flags.
*/
public FlagSet<PerformanceFlagEnum> getPerformanceFlags() {
return performanceFlags;
}
/**
* Implementation of all operations used by delegation tokens.
*/
@ -2036,9 +2064,9 @@ public FSDataOutputStream create(Path f, FsPermission permission,
// work out the options to pass down
CreateFileBuilder.CreateFileOptions options;
if (performanceCreation) {
if (getPerformanceFlags().enabled(PerformanceFlagEnum.Create)) {
options = OPTIONS_CREATE_FILE_PERFORMANCE;
}else {
} else {
options = overwrite
? OPTIONS_CREATE_FILE_OVERWRITE
: OPTIONS_CREATE_FILE_NO_OVERWRITE;
@ -2209,7 +2237,8 @@ public FSDataOutputStreamBuilder createFile(final Path path) {
builder
.create()
.overwrite(true)
.must(FS_S3A_CREATE_PERFORMANCE, performanceCreation);
.must(FS_S3A_CREATE_PERFORMANCE,
getPerformanceFlags().enabled(PerformanceFlagEnum.Create));
return builder;
} catch (IOException e) {
// catch any IOEs raised in span creation and convert to
@ -2274,7 +2303,8 @@ public FSDataOutputStream createNonRecursive(Path p,
.withFlags(flags)
.blockSize(blockSize)
.bufferSize(bufferSize)
.must(FS_S3A_CREATE_PERFORMANCE, performanceCreation);
.must(FS_S3A_CREATE_PERFORMANCE,
getPerformanceFlags().enabled(PerformanceFlagEnum.Create));
if (progress != null) {
builder.progress(progress);
}
@ -4845,6 +4875,7 @@ public String toString() {
sb.append(", partSize=").append(partSize);
sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
sb.append(", maxKeys=").append(maxKeys);
sb.append(", performanceFlags=").append(performanceFlags);
if (cannedACL != null) {
sb.append(", cannedACL=").append(cannedACL);
}
@ -5557,7 +5588,7 @@ public boolean hasPathCapability(final Path path, final String capability)
// is the FS configured for create file performance
case FS_S3A_CREATE_PERFORMANCE_ENABLED:
return performanceCreation;
return performanceFlags.enabled(PerformanceFlagEnum.Create);
// is the optimized copy from local enabled.
case OPTIMIZED_COPY_FROM_LOCAL:
@ -5572,8 +5603,15 @@ public boolean hasPathCapability(final Path path, final String capability)
return s3AccessGrantsEnabled;
default:
return super.hasPathCapability(p, cap);
// is it a performance flag?
if (performanceFlags.hasCapability(capability)) {
return true;
}
// fall through
}
// hand off to superclass
return super.hasPathCapability(p, cap);
}
/**
@ -5697,23 +5735,27 @@ public S3AMultipartUploaderBuilder createMultipartUploader(
@Override
@InterfaceAudience.Private
public StoreContext createStoreContext() {
return new StoreContextBuilder().setFsURI(getUri())
// please keep after setFsURI() in alphabetical order
return new StoreContextBuilder()
.setFsURI(getUri())
.setAuditor(getAuditor())
.setBucket(getBucket())
.setChangeDetectionPolicy(changeDetectionPolicy)
.setConfiguration(getConf())
.setUsername(getUsername())
.setOwner(owner)
.setContextAccessors(new ContextAccessorsImpl())
.setEnableCSE(isCSEEnabled)
.setExecutor(boundedThreadPool)
.setExecutorCapacity(executorCapacity)
.setInvoker(invoker)
.setInstrumentation(statisticsContext)
.setStorageStatistics(getStorageStatistics())
.setInputPolicy(getInputPolicy())
.setChangeDetectionPolicy(changeDetectionPolicy)
.setInstrumentation(statisticsContext)
.setInvoker(invoker)
.setMultiObjectDeleteEnabled(enableMultiObjectsDelete)
.setOwner(owner)
.setPerformanceFlags(performanceFlags)
.setStorageStatistics(getStorageStatistics())
.setUseListV1(useListV1)
.setContextAccessors(new ContextAccessorsImpl())
.setAuditor(getAuditor())
.setEnableCSE(isCSEEnabled)
.setUsername(getUsername())
.build();
}
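
Editorial note: once the flags are frozen in initialize(), callers can probe
them through hasPathCapability(); a hypothetical probe:

    // assumes an initialized S3AFileSystem "fs" bonded to s3a://bucket/
    boolean fastCreate = fs.hasPathCapability(
        new Path("s3a://bucket/"), "fs.s3a.performance.flags.create");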


@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.api;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Enum of performance flags.
* <p>
* When adding new flags, please keep in alphabetical order.
*/
@InterfaceAudience.LimitedPrivate("S3A Filesystem and extensions")
@InterfaceStability.Unstable
public enum PerformanceFlagEnum {
/**
* Create performance.
*/
Create,
/**
* Delete performance.
*/
Delete,
/**
* Mkdir performance.
*/
Mkdir,
/**
* Open performance.
*/
Open
}


@ -32,6 +32,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FlagSet;
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.Invoker;
@ -117,6 +119,11 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
/** Is client side encryption enabled? */
private final boolean isCSEEnabled;
/**
* Performance flags.
*/
private final FlagSet<PerformanceFlagEnum> performanceFlags;
/**
* Instantiate.
*/
@ -137,7 +144,8 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
final boolean useListV1,
final ContextAccessors contextAccessors,
final AuditSpanSource<AuditSpanS3A> auditor,
final boolean isCSEEnabled) {
final boolean isCSEEnabled,
final FlagSet<PerformanceFlagEnum> performanceFlags) {
this.fsURI = fsURI;
this.bucket = bucket;
this.configuration = configuration;
@ -158,6 +166,7 @@ public class StoreContext implements ActiveThreadSpanSource<AuditSpan> {
this.contextAccessors = contextAccessors;
this.auditor = auditor;
this.isCSEEnabled = isCSEEnabled;
this.performanceFlags = performanceFlags;
}
public URI getFsURI() {
@ -411,4 +420,12 @@ public RequestFactory getRequestFactory() {
public boolean isCSEEnabled() {
return isCSEEnabled;
}
/**
* Get the performance flags.
* @return FlagSet containing the performance flags.
*/
public FlagSet<PerformanceFlagEnum> getPerformanceFlags() {
return performanceFlags;
}
}


@ -22,9 +22,11 @@
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.FlagSet;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
@ -69,6 +71,8 @@ public class StoreContextBuilder {
private boolean isCSEEnabled;
private FlagSet<PerformanceFlagEnum> performanceFlags;
public StoreContextBuilder setFsURI(final URI fsURI) {
this.fsURI = fsURI;
return this;
@ -175,6 +179,16 @@ public StoreContextBuilder setEnableCSE(
return this;
}
public FlagSet<PerformanceFlagEnum> getPerformanceFlags() {
return performanceFlags;
}
public StoreContextBuilder setPerformanceFlags(
final FlagSet<PerformanceFlagEnum> flagSet) {
this.performanceFlags = flagSet;
return this;
}
public StoreContext build() {
return new StoreContext(fsURI,
bucket,
@ -192,6 +206,7 @@ public StoreContext build() {
useListV1,
contextAccessors,
auditor,
isCSEEnabled);
isCSEEnabled,
performanceFlags);
}
}
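
Editorial note: a sketch of wiring the flags through the builder (the other
mandatory setters are omitted; illustrative only):

    FlagSet<PerformanceFlagEnum> flags = FlagSet.createFlagSet(
        PerformanceFlagEnum.class, "fs.s3a.performance.flags.",
        PerformanceFlagEnum.Create, PerformanceFlagEnum.Delete);
    flags.makeImmutable();
    StoreContext ctx = new StoreContextBuilder()
        .setFsURI(fsUri)                 // ...plus the remaining setters
        .setPerformanceFlags(flags)
        .build();
    boolean fast = ctx.getPerformanceFlags().enabled(PerformanceFlagEnum.Create);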


@ -392,6 +392,8 @@ public static class BucketInfo extends S3GuardTool {
"\tThe S3A connector is compatible with buckets where"
+ " directory markers are not deleted";
public static final String CAPABILITY_FORMAT = "\t%s %s%n";
public BucketInfo(Configuration conf) {
super(conf, GUARDED_FLAG, UNGUARDED_FLAG, FIPS_FLAG, MAGIC_FLAG);
CommandFormat format = getCommandFormat();
@ -560,9 +562,14 @@ public int run(String[] args, PrintStream out)
// and check for capabilities
println(out, "%nStore Capabilities");
for (String capability : S3A_DYNAMIC_CAPABILITIES) {
out.printf("\t%s %s%n", capability,
out.printf(CAPABILITY_FORMAT, capability,
fs.hasPathCapability(root, capability));
}
// the performance flags are dynamically generated
fs.createStoreContext().getPerformanceFlags().pathCapabilities()
.forEach(capability -> out.printf(CAPABILITY_FORMAT, capability, "true"));
// finish with a newline
println(out, "");
if (commands.getOpt(FIPS_FLAG) && !fs.hasPathCapability(root, FIPS_ENDPOINT)) {


@ -180,7 +180,11 @@ The S3A Filesystem client supports the notion of input policies, similar
to that of the Posix `fadvise()` API call. This tunes the behavior of the S3A
client to optimise HTTP GET requests for the different use cases.
The list of supported options is found in
[FSDataInputStream](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/fsdatainputstreambuilder.html).
### fadvise `sequential`, `whole-file`
Read through the file, possibly with some short forward seeks.
@ -196,6 +200,9 @@ sequential access, as should those reading data from gzipped `.gz` files.
Because the "normal" fadvise policy starts off in sequential IO mode,
there is rarely any need to explicit request this policy.
Distcp will automatically request `whole-file` access, even on deployments
where the cluster configuration is for `random` IO.
### fadvise `random`
Optimised for random IO, specifically the Hadoop `PositionedReadable`
@ -243,7 +250,7 @@ basis.
to set fadvise policies on input streams. Once implemented,
this will become the supported mechanism used for configuring the input IO policy.
### fadvise `normal` or `adaptive` (default)
The `normal` policy starts off reading a file in `sequential` mode,
but if the caller seeks backwards in the stream, it switches from
@ -276,7 +283,45 @@ Fix: Use one of the dedicated [S3A Committers](committers.md).
## <a name="tuning"></a> Options to Tune
### <a name="pooling"></a> Thread and connection pool settings.
### <a name="flags"></a> Performance Flags: `fs.s3a.performance.flag`
This option takes a comma separated list of performance flags.
View it as the equivalent of the `-O` optimization flags offered by C/C++ compilers:
a complicated list of options which deliver speed if the person setting them
understands the risks.
* The list of flags MAY change across releases
* The semantics of specific flags SHOULD NOT change across releases.
* If an option is to be tuned which may relax semantics, a new option MUST be defined.
* Unknown flags are ignored; this is for forward compatibility with flags added in later releases.
* The option `*` means "turn everything on". This is implicitly unstable across releases.
| *Option* | *Meaning* | Since |
|----------|--------------------|:------|
| `create` | Create Performance | 3.4.1 |
The `create` flag has the same semantics as [`fs.s3a.create.performance`](#create-performance).
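For example, to enable it through the flag list:

```xml
<property>
  <name>fs.s3a.performance.flags</name>
  <value>create</value>
</property>
```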
### <a name="create-performance"></a> Create Performance `fs.s3a.create.performance`
The configuration option `fs.s3a.create.performance` has the same behavior as
the `fs.s3a.performance.flags` flag `create`:
* No overwrite checks are made when creating a file, even if overwrite is set to `false` in the application/library code
* No checks are made for an object being written above a path containing other objects (i.e. a "directory")
* No checks are made for a parent path containing an object which is not a directory marker (i.e. a "file")
This saves multiple probes per operation, especially a `LIST` call.
It may, however, result in:
* Unintentional overwriting of data
* Creation of directory structures which can no longer be navigated through filesystem APIs.
Use with care, and, ideally, enable versioning on the S3 store.
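The boolean form can still be set directly; as the initialization logic in the
`S3AFileSystem` diff above suggests, an explicit setting of this option overrides
the `create` entry in the flag list:

```xml
<property>
  <name>fs.s3a.create.performance</name>
  <value>true</value>
</property>
```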
### <a name="threads"></a> Thread and connection pool settings.
Each S3A client interacting with a single bucket, as a single user, has its
own dedicated pool of open HTTP connections alongside a pool of threads used
@ -441,9 +486,6 @@ killer.
1. As discussed [earlier](#pooling), use large values for
`fs.s3a.threads.max` and `fs.s3a.connection.maximum`.
1. Make sure that the bucket is using `sequential` or `normal` fadvise seek policies,
that is, `fs.s3a.experimental.input.fadvise` is not set to `random`
1. Perform listings in parallel by setting `-numListstatusThreads`
to a higher number. Make sure that `fs.s3a.connection.maximum`
is equal to or greater than the value used.
@ -451,6 +493,9 @@ is equal to or greater than the value used.
1. If using `-delete`, set `fs.trash.interval` to 0 to avoid the deleted
objects from being copied to a trash directory.
1. If using distcp to upload to a path under which no data exists,
consider adding the option `create` to the flags in `fs.s3a.performance.flags`.
*DO NOT* switch `fs.s3a.fast.upload.buffer` to buffer in memory.
If one distcp mapper runs out of memory it will fail,
and that runs the risk of failing the entire job.
@ -461,12 +506,6 @@ efficient in terms of HTTP connection use, and reduce the IOP rate against
the S3 bucket/shard.
```xml
<property>
<name>fs.s3a.experimental.input.fadvise</name>
<value>normal</value>
</property>
<property>
<name>fs.s3a.block.size</name>
<value>128M</value>
@ -481,6 +520,12 @@ the S3 bucket/shard.
<name>fs.trash.interval</name>
<value>0</value>
</property>
<!-- maybe -->
<property>
<name>fs.s3a.performance.flags</name>
<value>create</value>
</property>
```
## <a name="rm"></a> hadoop shell commands `fs -rm`
@ -642,7 +687,7 @@ expects an immediate response. For example, a thread may block so long
that other liveness checks start to fail.
Consider spawning off an executor thread to do these background cleanup operations.
## <a name="coding"></a> Tuning SSL Performance
## <a name="ssl"></a> Tuning SSL Performance
By default, S3A uses HTTPS to communicate with AWS Services. This means that all
communication with S3 is encrypted using SSL. The overhead of this encryption
@ -666,8 +711,6 @@ running with the vanilla JSSE.
### <a name="openssl"></a> OpenSSL Acceleration
As of HADOOP-16050 and HADOOP-16346, `fs.s3a.ssl.channel.mode` can be set to
either `default` or `openssl` to enable native OpenSSL acceleration of HTTPS
requests. OpenSSL implements the SSL and TLS protocols using native code. For
@ -721,12 +764,12 @@ exception and S3A initialization will fail.
Supported values for `fs.s3a.ssl.channel.mode`:
| `fs.s3a.ssl.channel.mode` Value | Description |
|---------------------------------|------------------------------------------------------------------------|
| `default_jsse` | Uses Java JSSE without GCM on Java 8 |
| `default_jsse_with_gcm` | Uses Java JSSE |
| `default` | Uses OpenSSL, falls back to `default_jsse` if OpenSSL cannot be loaded |
| `openssl` | Uses OpenSSL, fails if OpenSSL cannot be loaded |
The naming convention is setup in order to preserve backwards compatibility
with the ABFS support of [HADOOP-15669](https://issues.apache.org/jira/browse/HADOOP-15669).
@ -734,7 +777,7 @@ with the ABFS support of [HADOOP-15669](https://issues.apache.org/jira/browse/HA
Other options may be added to `fs.s3a.ssl.channel.mode` in the future as
further SSL optimizations are made.
### WildFly classpath and SSL library requirements
For OpenSSL acceleration to work, a compatible version of the
wildfly JAR must be on the classpath. This is not explicitly declared
@ -744,21 +787,28 @@ optional.
If the wildfly JAR is not found, the network acceleration will fall back
to the JVM, always.
Similarly, the `libssl` library must be compatible with wildfly.
Wildfly requires this native library to be part of an `openssl` installation;
third party implementations may not work correctly.
This can be an issue in FIPS-compliant deployments, where the `libssl` library
is a third-party implementation built with restricted TLS protocols.
There have been compatibility problems with wildfly JARs and openSSL
releases in the past: version 1.0.4.Final is not compatible with openssl 1.1.1.
An extra complication was that older versions of the `azure-data-lake-store-sdk`
JAR used in `hadoop-azure-datalake` contained an unshaded copy of the 1.0.4.Final
classes, causing binding problems even when a later version was explicitly
placed on the classpath.
## <a name="initilization"></a> Tuning FileSystem Initialization.
## Tuning FileSystem Initialization.
### Bucket existence checks
When an S3A Filesystem instance is created and initialized, the client
can check that the bucket provided is valid. This can be slow, which is why
it is disabled by default. The check is controlled by `fs.s3a.bucket.probe`:
```xml
<property>
@ -767,9 +817,11 @@ You can ignore bucket validation by configuring `fs.s3a.bucket.probe` as follows
</property>
```
If the bucket does not exist, this issue will surface when operations are performed
on the filesystem; you will see `UnknownStoreException` stack traces.
Re-enabling the probe will force an early check, but this is generally not needed.
### Rate limiting parallel FileSystem creation operations
Applications normally ask for filesystems from the shared cache,


@ -30,6 +30,7 @@
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
/**
@ -71,7 +72,8 @@ protected AbstractFSContract createContract(Configuration conf) {
protected Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
FS_S3A_CREATE_PERFORMANCE);
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS);
conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, createPerformance);
S3ATestUtils.disableFilesystemCaching(conf);
return conf;


@ -41,6 +41,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
@ -80,7 +81,9 @@ public ITestS3AFileOperationCost(
@Override
public Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf, FS_S3A_CREATE_PERFORMANCE);
removeBaseAndBucketOverrides(conf,
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS);
conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, isKeepingMarkers());
return conf;
}


@ -34,6 +34,7 @@
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
@ -102,6 +103,7 @@
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
@ -992,6 +994,9 @@ public static StoreContext createMockStoreContext(
.setMultiObjectDeleteEnabled(multiDelete)
.setUseListV1(false)
.setContextAccessors(accessors)
.setPerformanceFlags(createFlagSet(
PerformanceFlagEnum.class,
FS_S3A_PERFORMANCE_FLAGS))
.build();
}


@ -44,6 +44,7 @@
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
@ -88,7 +89,8 @@ private Configuration timingOutConfiguration() {
PREFETCH_ENABLED_KEY,
REQUEST_TIMEOUT,
SOCKET_TIMEOUT,
FS_S3A_CREATE_PERFORMANCE
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS
);
// only one connection is allowed, and the establish timeout is low


@ -108,7 +108,8 @@ public Configuration createConfiguration() {
removeBaseAndBucketOverrides(bucketName, conf,
DIRECTORY_MARKER_POLICY,
AUTHORITATIVE_PATH,
FS_S3A_CREATE_PERFORMANCE);
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS);
// directory marker options
conf.set(DIRECTORY_MARKER_POLICY,
keepMarkers


@ -41,6 +41,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.toChar;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_BULK_DELETE_REQUEST;
@ -105,7 +106,8 @@ private OperationCost expected(OperationCost source) {
public Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
FS_S3A_CREATE_PERFORMANCE);
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS);
conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, createPerformance);
S3ATestUtils.disableFilesystemCaching(conf);
return conf;


@ -56,6 +56,7 @@
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_MARKER_POLICY_KEEP;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@ -201,7 +202,8 @@ protected Configuration createConfiguration() {
// directory marker options
removeBaseAndBucketOverrides(bucketName, conf,
DIRECTORY_MARKER_POLICY,
FS_S3A_CREATE_PERFORMANCE);
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS);
conf.set(DIRECTORY_MARKER_POLICY,
keepMarkers
? DIRECTORY_MARKER_POLICY_KEEP


@ -40,6 +40,7 @@
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
@ -80,7 +81,9 @@ public ITestS3ADeleteCost(final String name,
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf, FS_S3A_CREATE_PERFORMANCE);
removeBaseAndBucketOverrides(conf,
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS);
conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, false);
return conf;
}


@ -74,7 +74,8 @@ protected Configuration createConfiguration() {
S3A_BUCKET_PROBE,
DIRECTORY_MARKER_POLICY,
AUTHORITATIVE_PATH,
FS_S3A_CREATE_PERFORMANCE);
FS_S3A_CREATE_PERFORMANCE,
FS_S3A_PERFORMANCE_FLAGS);
// base FS is legacy
conf.set(DIRECTORY_MARKER_POLICY, DIRECTORY_MARKER_POLICY_DELETE);
conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, false);