diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/FileSystemStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/FileSystemStatisticNames.java
new file mode 100644
index 0000000000..cd8df2f853
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/FileSystemStatisticNames.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Common statistic names for Filesystem-level statistics,
+ * including internals.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class FileSystemStatisticNames {
+
+ private FileSystemStatisticNames() {
+ }
+
+ /**
+ * How long did filesystem initialization take?
+ */
+ public static final String FILESYSTEM_INITIALIZATION = "filesystem_initialization";
+
+ /**
+ * How long did filesystem close take?
+ */
+ public static final String FILESYSTEM_CLOSE = "filesystem_close";
+
+}
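These two names are wired up as duration statistics in the S3A connector later in this patch, so they surface through the filesystem's IOStatistics like any other tracked duration. A minimal sketch of how they could be inspected, assuming "fs" is an already-initialized filesystem which implements IOStatisticsSource (as S3AFileSystem does):

    import org.apache.hadoop.fs.statistics.IOStatistics;

    import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
    import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

    // null if the filesystem does not publish statistics
    IOStatistics stats = retrieveIOStatistics(fs);
    // a duration statistic appears as a counter plus .min/.mean/.max entries,
    // e.g. "filesystem_initialization" and "filesystem_initialization.mean"
    System.out.println(ioStatisticsToPrettyString(stats));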
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index a513cffd84..44f794aa77 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -176,6 +176,11 @@ public final class StoreStatisticNames {
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";
+ /**
+ * How long did any store client creation take?
+ */
+ public static final String STORE_CLIENT_CREATION = "store_client_creation";
+
/** Probe for store existing: {@value}. */
public static final String STORE_EXISTS_PROBE
= "store_exists_probe";
@@ -200,6 +205,7 @@ public final class StoreStatisticNames {
public static final String STORE_IO_RATE_LIMITED_DURATION
= "store_io_rate_limited_duration";
+
/**
* A store's equivalent of a paged LIST request was initiated: {@value}.
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java
index 6bc4a71030..bc9e2ea729 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java
@@ -49,27 +49,6 @@ public static <T> T uncheckIOExceptions(CallableRaisingIOE<T> call) {
}
}
- /**
- * Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
- * This is similar to {@link CommonCallableSupplier}, except that
- * only IOExceptions are caught and wrapped; all other exceptions are
- * propagated unchanged.
- * @param <T> type of result
- */
- private static final class UncheckedIOExceptionSupplier<T> implements Supplier<T> {
-
- private final CallableRaisingIOE<T> call;
-
- private UncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
- this.call = call;
- }
-
- @Override
- public T get() {
- return uncheckIOExceptions(call);
- }
- }
-
/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* @param call call to wrap
@@ -77,7 +56,7 @@ public T get() {
* @return a supplier which invokes the call.
*/
public static <T> Supplier<T> toUncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
- return new UncheckedIOExceptionSupplier<>(call);
+ return () -> uncheckIOExceptions(call);
}
/**
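The lambda replacement keeps the deleted inner class's contract: only IOExceptions are caught and rewrapped, surfacing from Supplier.get() as UncheckedIOException. A sketch of typical use, where "fs" and "path" are an assumed FileSystem and Path:

    import java.util.concurrent.CompletableFuture;

    import org.apache.hadoop.fs.FileStatus;

    import static org.apache.hadoop.util.functional.FunctionalIO.toUncheckedIOExceptionSupplier;

    // wrap the IOException-raising call so it can feed the async API;
    // an IOException inside the lambda surfaces as UncheckedIOException
    CompletableFuture<FileStatus> status = CompletableFuture.supplyAsync(
        toUncheckedIOExceptionSupplier(() -> fs.getFileStatus(path)));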
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
index 2f043b6499..0a0d023d93 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
@@ -38,9 +38,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Future IO Helper methods.
*
@@ -62,7 +59,6 @@
@InterfaceStability.Unstable
public final class FutureIO {
- private static final Logger LOG = LoggerFactory.getLogger(FutureIO.class.getName());
private FutureIO() {
}
@@ -129,7 +125,6 @@ public static <T> T awaitFuture(final Future<T> future,
* If any future throws an exception during its execution, this method
* extracts and rethrows that exception.
*
- *
* @param collection collection of futures to be evaluated
* @param <T> type of the result.
* @return the list of future's result, if all went well.
@@ -140,19 +135,10 @@ public static <T> T awaitFuture(final Future<T> future,
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection)
throws InterruptedIOException, IOException, RuntimeException {
List<T> results = new ArrayList<>();
- try {
- for (Future<T> future : collection) {
- results.add(future.get());
- }
- return results;
- } catch (InterruptedException e) {
- LOG.debug("Execution of future interrupted ", e);
- throw (InterruptedIOException) new InterruptedIOException(e.toString())
- .initCause(e);
- } catch (ExecutionException e) {
- LOG.debug("Execution of future failed with exception", e.getCause());
- return raiseInnerCause(e);
+ for (Future<T> future : collection) {
+ results.add(awaitFuture(future));
}
+ return results;
}
/**
@@ -163,7 +149,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* the timeout expires, whichever happens first. If any future throws an
* exception during its execution, this method extracts and rethrows that exception.
*
- *
* @param collection collection of futures to be evaluated
* @param duration timeout duration
* @param <T> type of the result.
@@ -176,21 +161,12 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection,
final Duration duration)
throws InterruptedIOException, IOException, RuntimeException,
- TimeoutException {
+ TimeoutException {
List<T> results = new ArrayList<>();
- try {
- for (Future<T> future : collection) {
- results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS));
- }
- return results;
- } catch (InterruptedException e) {
- LOG.debug("Execution of future interrupted ", e);
- throw (InterruptedIOException) new InterruptedIOException(e.toString())
- .initCause(e);
- } catch (ExecutionException e) {
- LOG.debug("Execution of future failed with exception", e.getCause());
- return raiseInnerCause(e);
+ for (Future<T> future : collection) {
+ results.add(awaitFuture(future, duration.toMillis(), TimeUnit.MILLISECONDS));
}
+ return results;
}
/**
@@ -199,7 +175,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* This will always raise an exception, either the inner IOException,
* an inner RuntimeException, or a new IOException wrapping the raised
* exception.
- *
* @param e exception.
* @param <T> type of return value.
* @return nothing, ever.
@@ -283,12 +258,11 @@ public static IOException unwrapInnerException(final Throwable e) {
* @param <T> type of builder
* @return the builder passed in.
*/
- public static <T, U extends FSBuilder<T, U>>
- FSBuilder<T, U> propagateOptions(
- final FSBuilder builder,
- final Configuration conf,
- final String optionalPrefix,
- final String mandatoryPrefix) {
+ public static <T, U extends FSBuilder<T, U>> FSBuilder<T, U> propagateOptions(
+ final FSBuilder builder,
+ final Configuration conf,
+ final String optionalPrefix,
+ final String mandatoryPrefix) {
propagateOptions(builder, conf,
optionalPrefix, false);
propagateOptions(builder, conf,
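With this refactoring, both awaitAllFutures overloads route through awaitFuture, so the exception translation (InterruptedIOException on interrupt, extraction of the inner cause on execution failure) lives in one place. A usage sketch with trivial in-line futures:

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;

    List<Future<Long>> futures = Arrays.asList(
        CompletableFuture.supplyAsync(() -> 1L),
        CompletableFuture.supplyAsync(() -> 2L));
    // collects every result; an inner IOException is rethrown as-is and
    // TimeoutException is raised if the deadline passes first
    List<Long> results = awaitAllFutures(futures, Duration.ofSeconds(30));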
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAtomicReference.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAtomicReference.java
new file mode 100644
index 0000000000..5f2d674bba
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAtomicReference.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
+
+/**
+ * A lazily constructed reference, whose reference
+ * constructor is a {@link CallableRaisingIOE} so
+ * may raise IOExceptions.
+ *
+ * This {@code constructor} is only invoked on demand
+ * when the reference is first needed,
+ * after which the same value is returned.
+ * This value MUST NOT be null.
+ *
+ * Implements {@link CallableRaisingIOE} and {@code java.util.function.Supplier}.
+ * An instance of this can therefore be used in a functional IO chain.
+ * As such, it can act as a delayed and caching invocator of a function:
+ * the supplier passed in is only ever invoked once, and only when requested.
+ * @param <T> type of reference
+ */
+public class LazyAtomicReference<T>
+ implements CallableRaisingIOE<T>, Supplier<T> {
+
+ /**
+ * Underlying reference.
+ */
+ private final AtomicReference<T> reference = new AtomicReference<>();
+
+ /**
+ * Constructor for lazy creation.
+ */
+ private final CallableRaisingIOE<? extends T> constructor;
+
+ /**
+ * Constructor for this instance.
+ * @param constructor method to invoke to actually construct the inner object.
+ */
+ public LazyAtomicReference(final CallableRaisingIOE<? extends T> constructor) {
+ this.constructor = requireNonNull(constructor);
+ }
+
+ /**
+ * Getter for the constructor.
+ * @return the constructor class
+ */
+ protected CallableRaisingIOE<? extends T> getConstructor() {
+ return constructor;
+ }
+
+ /**
+ * Get the reference.
+ * Subclasses working with this directly need to take care.
+ * @return the reference.
+ */
+ protected AtomicReference<T> getReference() {
+ return reference;
+ }
+
+ /**
+ * Get the value, constructing it if needed.
+ * @return the value
+ * @throws IOException on any evaluation failure
+ * @throws NullPointerException if the evaluated function returned null.
+ */
+ public synchronized T eval() throws IOException {
+ final T v = reference.get();
+ if (v != null) {
+ return v;
+ }
+ reference.set(requireNonNull(constructor.apply()));
+ return reference.get();
+ }
+
+ /**
+ * Implementation of {@code CallableRaisingIOE.apply()}.
+ * Invoke {@link #eval()}.
+ * @return the value
+ * @throws IOException on any evaluation failure
+ */
+ @Override
+ public final T apply() throws IOException {
+ return eval();
+ }
+
+ /**
+ * Implementation of {@code Supplier.get()}.
+ *
+ * Invoke {@link #eval()} and convert IOEs to
+ * UncheckedIOException.
+ *
+ * This is the {@code Supplier.get()} implementation, which allows
+ * this class to be passed into anything taking a supplier.
+ * @return the value
+ * @throws UncheckedIOException if the constructor raised an IOException.
+ */
+ @Override
+ public final T get() throws UncheckedIOException {
+ return uncheckIOExceptions(this::eval);
+ }
+
+ /**
+ * Is the reference set?
+ * @return true if the reference has been set.
+ */
+ public final boolean isSet() {
+ return reference.get() != null;
+ }
+
+ @Override
+ public String toString() {
+ return "LazyAtomicReference{" +
+ "reference=" + reference + '}';
+ }
+
+
+ /**
+ * Create from a supplier.
+ * This is not a constructor to avoid ambiguity when a lambda-expression is
+ * passed in.
+ * @param supplier supplier implementation.
+ * @return a lazy reference.
+ * @param <T> type of reference
+ */
+ public static <T> LazyAtomicReference<T> lazyAtomicReferenceFromSupplier(
+ Supplier<T> supplier) {
+ return new LazyAtomicReference<>(supplier::get);
+ }
+}
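A usage sketch: the callable runs once, on first access, and the result is cached for all later calls. Connection and openConnection() here are hypothetical stand-ins for any expensive, IOException-raising construction:

    // openConnection() is a hypothetical factory which may throw IOException
    LazyAtomicReference<Connection> ref =
        new LazyAtomicReference<>(() -> openConnection());

    // nothing constructed yet
    assert !ref.isSet();

    Connection c1 = ref.eval();   // invokes openConnection(); may throw IOException
    Connection c2 = ref.get();    // cached instance; an IOE would surface unchecked
    assert c1 == c2;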
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAutoCloseableReference.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAutoCloseableReference.java
new file mode 100644
index 0000000000..d6d625c125
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAutoCloseableReference.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * A subclass of {@link LazyAtomicReference} which
+ * holds an {@code AutoCloseable} reference and calls {@code close()}
+ * when it itself is closed.
+ * @param <T> type of reference.
+ */
+public class LazyAutoCloseableReference<T extends AutoCloseable>
+ extends LazyAtomicReference<T> implements AutoCloseable {
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Constructor for this instance.
+ * @param constructor method to invoke to actually construct the inner object.
+ */
+ public LazyAutoCloseableReference(final CallableRaisingIOE<? extends T> constructor) {
+ super(constructor);
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws IllegalStateException if the reference is closed.
+ */
+ @Override
+ public synchronized T eval() throws IOException {
+ checkState(!closed.get(), "Reference is closed");
+ return super.eval();
+ }
+
+ /**
+ * Is the reference closed?
+ * @return true if the reference is closed.
+ */
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ /**
+ * Close the reference value if it is non-null.
+ * Sets the reference to null afterwards, even on
+ * a failure.
+ * @throws Exception failure to close.
+ */
+ @Override
+ public synchronized void close() throws Exception {
+ if (closed.getAndSet(true)) {
+ // already closed
+ return;
+ }
+ final T v = getReference().get();
+ // check the state.
+ // A null reference means it has not yet been evaluated,
+ if (v != null) {
+ try {
+ v.close();
+ } finally {
+ // set the reference to null, even on a failure.
+ getReference().set(null);
+ }
+ }
+ }
+
+
+ /**
+ * Create from a supplier.
+ * This is not a constructor to avoid ambiguity when a lambda-expression is
+ * passed in.
+ * @param supplier supplier implementation.
+ * @return a lazy reference.
+ * @param <T> type of reference
+ */
+ public static <T extends AutoCloseable> LazyAutoCloseableReference<T> lazyAutoCloseablefromSupplier(Supplier<T> supplier) {
+ return new LazyAutoCloseableReference<>(supplier::get);
+ }
+}
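Since the class implements AutoCloseable, it slots into try-with-resources: the inner value is opened only if actually used, and closed exactly once with the reference. A sketch, assuming "fs", "path" and a "shouldRead" flag:

    try (LazyAutoCloseableReference<FSDataInputStream> in =
             new LazyAutoCloseableReference<>(() -> fs.open(path))) {
      if (shouldRead) {
        // the stream is opened here, on first demand
        in.eval().read();
      }
    }   // close() closes the stream only if it was ever opened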
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestLazyReferences.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestLazyReferences.java
new file mode 100644
index 0000000000..4d1dae184b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestLazyReferences.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Test {@link LazyAtomicReference} and {@link LazyAutoCloseableReference}.
+ */
+public class TestLazyReferences extends AbstractHadoopTestBase {
+
+ /**
+ * Format of exceptions to raise.
+ */
+ private static final String GENERATED = "generated[%d]";
+
+ /**
+ * Invocation counter, can be asserted on in {@link #assertCounterValue(int)}.
+ */
+ private final AtomicInteger counter = new AtomicInteger();
+
+ /**
+ * Assert that {@link #counter} has a specific value.
+ * @param val expected value
+ */
+ private void assertCounterValue(final int val) {
+ assertAtomicIntValue(counter, val);
+ }
+
+ /**
+ * Assert an atomic integer has a specific value.
+ * @param ai atomic integer
+ * @param val expected value
+ */
+ private static void assertAtomicIntValue(
+ final AtomicInteger ai, final int val) {
+ Assertions.assertThat(ai.get())
+ .describedAs("value of atomic integer %s", ai)
+ .isEqualTo(val);
+ }
+
+
+ /**
+ * Test the core {@link LazyAtomicReference} lifecycle:
+ * construction is lazy and the result is cached.
+ */
+ @Test
+ public void testLazyAtomicReference() throws Throwable {
+
+ LazyAtomicReference<Integer> ref = new LazyAtomicReference<>(counter::incrementAndGet);
+
+ // constructor does not invoke the supplier
+ assertCounterValue(0);
+
+ assertSetState(ref, false);
+
+ // first eval() invokes the supplier
+ Assertions.assertThat(ref.eval())
+ .describedAs("first eval()")
+ .isEqualTo(1);
+ assertCounterValue(1);
+ assertSetState(ref, true);
+
+
+ // Callable.apply() returns the same value
+ Assertions.assertThat(ref.apply())
+ .describedAs("second get of %s", ref)
+ .isEqualTo(1);
+ // no new counter increment
+ assertCounterValue(1);
+ }
+
+ /**
+ * Assert that {@link LazyAtomicReference#isSet()} is in the expected state.
+ * @param ref reference
+ * @param expected expected value
+ */
+ private static void assertSetState(final LazyAtomicReference<?> ref,
+ final boolean expected) {
+ Assertions.assertThat(ref.isSet())
+ .describedAs("isSet() of %s", ref)
+ .isEqualTo(expected);
+ }
+
+ /**
+ * Test the underlying {@link LazyAtomicReference} integration with java
+ * Supplier API.
+ */
+ @Test
+ public void testSupplierIntegration() throws Throwable {
+
+ LazyAtomicReference<Integer> ref = LazyAtomicReference.lazyAtomicReferenceFromSupplier(counter::incrementAndGet);
+
+ // constructor does not invoke the supplier
+ assertCounterValue(0);
+ assertSetState(ref, false);
+
+ // first get() invokes the supplier
+ Assertions.assertThat(ref.get())
+ .describedAs("first get()")
+ .isEqualTo(1);
+ assertCounterValue(1);
+
+ // Callable.apply() returns the same value
+ Assertions.assertThat(ref.apply())
+ .describedAs("second get of %s", ref)
+ .isEqualTo(1);
+ // no new counter increment
+ assertCounterValue(1);
+ }
+
+ /**
+ * Test failure handling through the supplier API.
+ */
+ @Test
+ public void testSupplierIntegrationFailureHandling() throws Throwable {
+
+ LazyAtomicReference<Object> ref = new LazyAtomicReference<>(() -> {
+ throw new UnknownHostException(String.format(GENERATED, counter.incrementAndGet()));
+ });
+
+ // the get() call will wrap the raised exception, which can be extracted
+ // and type checked.
+ verifyCause(UnknownHostException.class,
+ intercept(UncheckedIOException.class, "[1]", ref::get));
+
+ assertSetState(ref, false);
+
+ // counter goes up
+ intercept(UncheckedIOException.class, "[2]", ref::get);
+ }
+
+ @Test
+ public void testAutoCloseable() throws Throwable {
+ final LazyAutoCloseableReference<CloseableClass> ref =
+ LazyAutoCloseableReference.lazyAutoCloseablefromSupplier(CloseableClass::new);
+
+ assertSetState(ref, false);
+ ref.eval();
+ final CloseableClass closeable = ref.get();
+ Assertions.assertThat(closeable.isClosed())
+ .describedAs("closed flag of %s", closeable)
+ .isFalse();
+
+ // first close will close the class.
+ ref.close();
+ Assertions.assertThat(ref.isClosed())
+ .describedAs("closed flag of %s", ref)
+ .isTrue();
+ Assertions.assertThat(closeable.isClosed())
+ .describedAs("closed flag of %s", closeable)
+ .isTrue();
+
+ // second close will not raise an exception
+ ref.close();
+
+ // you cannot eval() a closed reference
+ intercept(IllegalStateException.class, "Reference is closed", ref::eval);
+ intercept(IllegalStateException.class, "Reference is closed", ref::get);
+ intercept(IllegalStateException.class, "Reference is closed", ref::apply);
+
+ Assertions.assertThat(ref.getReference().get())
+ .describedAs("inner referece of %s", ref)
+ .isNull();
+ }
+
+ /**
+ * Not an error to close a reference which was never evaluated.
+ */
+ @Test
+ public void testCloseableUnevaluated() throws Throwable {
+ final LazyAutoCloseableReference<CloseableRaisingException> ref =
+ new LazyAutoCloseableReference<>(CloseableRaisingException::new);
+ ref.close();
+ ref.close();
+ }
+
+ /**
+ * If the close() call fails, that only raises an exception on the first attempt,
+ * and the reference is set to null.
+ */
+ @Test
+ public void testAutoCloseableFailureHandling() throws Throwable {
+ final LazyAutoCloseableReference<CloseableRaisingException> ref =
+ new LazyAutoCloseableReference<>(CloseableRaisingException::new);
+ ref.eval();
+
+ // close reports the failure.
+ intercept(IOException.class, "raised", ref::close);
+
+ // but the reference is set to null
+ assertSetState(ref, false);
+ // second attempt does nothing, so will not raise an exception.
+ ref.close();
+ }
+
+ /**
+ * Closeable which sets the closed flag on close().
+ */
+ private static final class CloseableClass implements AutoCloseable {
+
+ /** closed flag. */
+ private boolean closed;
+
+ /**
+ * Close the resource.
+ * @throws IllegalStateException if already closed.
+ */
+ @Override
+ public void close() {
+ checkState(!closed, "Already closed");
+ closed = true;
+ }
+
+ /**
+ * Get the closed flag.
+ * @return the state.
+ */
+ private boolean isClosed() {
+ return closed;
+ }
+
+ }
+ /**
+ * Closeable which raises an IOE in close().
+ */
+ private static final class CloseableRaisingException implements AutoCloseable {
+
+ @Override
+ public void close() throws Exception {
+ throw new IOException("raised");
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index 359ac0e80d..39a9e51ac8 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -64,11 +64,6 @@
-
-
-
-
-
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index d04ca70a68..f5937ae0a4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -54,7 +54,6 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
@@ -88,7 +87,6 @@
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.Copy;
-import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
@@ -123,6 +121,8 @@
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ClientManager;
+import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
@@ -152,6 +152,7 @@
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
@@ -305,11 +306,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private S3AStore store;
+ /**
+ * The core S3 client is created and managed by the ClientManager.
+ * It is copied here within {@link #initialize(URI, Configuration)}.
+ * Some mocking tests modify this so take care with changes.
+ */
private S3Client s3Client;
- /** Async client is used for transfer manager. */
- private S3AsyncClient s3AsyncClient;
-
// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
@@ -328,7 +331,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private Listing listing;
private long partSize;
private boolean enableMultiObjectsDelete;
- private S3TransferManager transferManager;
private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool;
@@ -548,6 +550,9 @@ public void initialize(URI name, Configuration originalConf)
// get the host; this is guaranteed to be non-null, non-empty
bucket = name.getHost();
AuditSpan span = null;
+ // track initialization duration; will only be set after
+ // statistics are set up.
+ Optional<DurationTracker> trackInitialization = Optional.empty();
try {
LOG.debug("Initializing S3AFileSystem for {}", bucket);
if (LOG.isTraceEnabled()) {
@@ -592,6 +597,18 @@ public void initialize(URI name, Configuration originalConf)
super.initialize(uri, conf);
setConf(conf);
+ // initialize statistics, after which statistics
+ // can be collected.
+ instrumentation = new S3AInstrumentation(uri);
+ initializeStatisticsBinding();
+
+ // track initialization duration.
+ // this should really be done in a onceTrackingDuration() call,
+ // but then all methods below would need to be in the lambda and
+ // it would create a merge/backport headache for all.
+ trackInitialization = Optional.of(
+ instrumentation.trackDuration(FileSystemStatisticNames.FILESYSTEM_INITIALIZATION));
+
s3aInternals = createS3AInternals();
// look for encryption data
@@ -600,8 +617,7 @@ public void initialize(URI name, Configuration originalConf)
buildEncryptionSecrets(bucket, conf));
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
- instrumentation = new S3AInstrumentation(uri);
- initializeStatisticsBinding();
+
// If CSE-KMS method is set then CSE is enabled.
isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
.equals(getS3EncryptionAlgorithm().getMethod());
@@ -687,7 +703,7 @@ public void initialize(URI name, Configuration originalConf)
// the FS came with a DT
// this may do some patching of the configuration (e.g. setting
// the encryption algorithms)
- bindAWSClient(name, delegationTokensEnabled);
+ ClientManager clientManager = createClientManager(name, delegationTokensEnabled);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
@@ -762,36 +778,55 @@ public void initialize(URI name, Configuration originalConf)
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create the store
- store = new S3AStoreBuilder()
- .withS3Client(s3Client)
- .withDurationTrackerFactory(getDurationTrackerFactory())
- .withStoreContextFactory(this)
- .withAuditSpanSource(getAuditManager())
- .withInstrumentation(getInstrumentation())
- .withStatisticsContext(statisticsContext)
- .withStorageStatistics(getStorageStatistics())
- .withReadRateLimiter(unlimitedRate())
- .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
- .build();
-
+ store = createS3AStore(clientManager, rateLimitCapacity);
+ // the s3 client is created through the store, rather than
+ // directly through the client manager.
+ // this is to aid mocking.
+ s3Client = store.getOrCreateS3Client();
// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
doBucketProbing();
initMultipartUploads(conf);
+ trackInitialization.ifPresent(DurationTracker::close);
} catch (SdkException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
stopAllServices();
+ trackInitialization.ifPresent(DurationTracker::failed);
throw translateException("initializing ", new Path(name), e);
} catch (IOException | RuntimeException e) {
// other exceptions: stop the services.
cleanupWithLogger(LOG, span);
stopAllServices();
+ trackInitialization.ifPresent(DurationTracker::failed);
throw e;
}
}
+ /**
+ * Create the S3AStore instance.
+ * This is protected so that tests can override it.
+ * @param clientManager client manager
+ * @param rateLimitCapacity rate limit
+ * @return a new store instance
+ */
+ @VisibleForTesting
+ protected S3AStore createS3AStore(final ClientManager clientManager,
+ final int rateLimitCapacity) {
+ return new S3AStoreBuilder()
+ .withClientManager(clientManager)
+ .withDurationTrackerFactory(getDurationTrackerFactory())
+ .withStoreContextFactory(this)
+ .withAuditSpanSource(getAuditManager())
+ .withInstrumentation(getInstrumentation())
+ .withStatisticsContext(statisticsContext)
+ .withStorageStatistics(getStorageStatistics())
+ .withReadRateLimiter(unlimitedRate())
+ .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
+ .build();
+ }
+
/**
* Populates the configurations related to vectored IO operation
* in the context which has to passed down to input streams.
@@ -965,7 +1000,7 @@ protected void verifyBucketExists() throws UnknownStoreException, IOException {
STORE_EXISTS_PROBE, bucket, null, () ->
invoker.retry("doesBucketExist", bucket, true, () -> {
try {
- s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
+ getS3Client().headBucket(HeadBucketRequest.builder().bucket(bucket).build());
return true;
} catch (AwsServiceException ex) {
int statusCode = ex.statusCode();
@@ -1014,14 +1049,22 @@ public Listing getListing() {
/**
* Set up the client bindings.
* If delegation tokens are enabled, the FS first looks for a DT
- * ahead of any other bindings;.
+ * ahead of any other bindings.
* If there is a DT it uses that to do the auth
- * and switches to the DT authenticator automatically (and exclusively)
- * @param name URI of the FS
+ * and switches to the DT authenticator automatically (and exclusively).
+ *
+ * Delegation tokens are configured and started, but the actual
+ * S3 clients are not: instead a {@link ClientManager} is created
+ * and returned, from which they can be created on demand.
+ * This is to reduce delays in FS initialization, especially
+ * for features (transfer manager, async client) which are not
+ * always used.
+ * @param fsURI URI of the FS
* @param dtEnabled are delegation tokens enabled?
+ * @return the client manager which can generate the clients.
* @throws IOException failure.
*/
- private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
+ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws IOException {
Configuration conf = getConf();
credentials = null;
String uaSuffix = "";
@@ -1059,7 +1102,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
uaSuffix = tokens.getUserAgentField();
} else {
// DT support is disabled, so create the normal credential chain
- credentials = createAWSCredentialProviderList(name, conf);
+ credentials = createAWSCredentialProviderList(fsURI, conf);
}
LOG.debug("Using credential provider {}", credentials);
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
@@ -1069,7 +1112,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
S3ClientFactory.S3ClientCreationParameters parameters =
new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
- .withPathUri(name)
+ .withPathUri(fsURI)
.withEndpoint(endpoint)
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
@@ -1088,22 +1131,27 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
- s3Client = clientFactory.createS3Client(getUri(), parameters);
- createS3AsyncClient(clientFactory, parameters);
- transferManager = clientFactory.createS3TransferManager(getS3AsyncClient());
+ // this is where clients and the transfer manager are created on demand.
+ return createClientManager(clientFactory, parameters, getDurationTrackerFactory());
}
/**
- * Creates and configures the S3AsyncClient.
- * Uses synchronized method to suppress spotbugs error.
- *
- * @param clientFactory factory used to create S3AsyncClient
- * @param parameters parameter object
- * @throws IOException on any IO problem
+ * Create the Client Manager; protected to allow for mocking.
+ * Requires {@link #unboundedThreadPool} to be initialized.
+ * @param clientFactory (reflection-bonded) client factory.
+ * @param clientCreationParameters parameters for client creation.
+ * @param durationTrackerFactory factory for duration tracking.
+ * @return a client manager instance.
*/
- private void createS3AsyncClient(S3ClientFactory clientFactory,
- S3ClientFactory.S3ClientCreationParameters parameters) throws IOException {
- s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
+ @VisibleForTesting
+ protected ClientManager createClientManager(
+ final S3ClientFactory clientFactory,
+ final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
+ final DurationTrackerFactory durationTrackerFactory) {
+ return new ClientManagerImpl(clientFactory,
+ clientCreationParameters,
+ durationTrackerFactory
+ );
}
/**
@@ -1241,14 +1289,6 @@ public RequestFactory getRequestFactory() {
return requestFactory;
}
- /**
- * Get the S3 Async client.
- * @return the async s3 client.
- */
- private S3AsyncClient getS3AsyncClient() {
- return s3AsyncClient;
- }
-
/**
* Implementation of all operations used by delegation tokens.
*/
@@ -1335,7 +1375,8 @@ public void abortOutstandingMultipartUploads(long seconds)
invoker.retry("Purging multipart uploads", bucket, true,
() -> {
RemoteIterator<MultipartUpload> uploadIterator =
- MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys);
+ MultipartUtils.listMultipartUploads(createStoreContext(),
+ getS3Client(), null, maxKeys);
while (uploadIterator.hasNext()) {
MultipartUpload upload = uploadIterator.next();
@@ -1395,12 +1436,23 @@ public int getDefaultPort() {
* Set the client -used in mocking tests to force in a different client.
* @param client client.
*/
+ @VisibleForTesting
protected void setAmazonS3Client(S3Client client) {
Preconditions.checkNotNull(client, "clientV2");
LOG.debug("Setting S3V2 client to {}", client);
s3Client = client;
}
+ /**
+ * Get the S3 client created in {@link #initialize(URI, Configuration)}.
+ * @return the s3Client
+ * @throws UncheckedIOException if the client could not be created.
+ */
+ @VisibleForTesting
+ protected S3Client getS3Client() {
+ return s3Client;
+ }
+
/**
* S3AInternals method.
* {@inheritDoc}.
@@ -1437,7 +1489,7 @@ private final class S3AInternalsImpl implements S3AInternals {
@Override
public S3Client getAmazonS3Client(String reason) {
LOG.debug("Access to S3 client requested, reason {}", reason);
- return s3Client;
+ return getS3Client();
}
@Override
@@ -1470,7 +1522,7 @@ public String getBucketLocation(String bucketName) throws IOException {
// If accessPoint then region is known from Arn
accessPoint != null
? accessPoint.getRegion()
- : s3Client.getBucketLocation(GetBucketLocationRequest.builder()
+ : getS3Client().getBucketLocation(GetBucketLocationRequest.builder()
.bucket(bucketName)
.build())
.locationConstraintAsString()));
@@ -1859,7 +1911,7 @@ public GetObjectRequest.Builder newGetRequestBuilder(final String key) {
public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
// active the audit span used for the operation
try (AuditSpan span = auditSpan.activate()) {
- return s3Client.getObject(request);
+ return getS3Client().getObject(request);
}
}
@@ -1888,7 +1940,7 @@ private final class WriteOperationHelperCallbacksImpl
@Override
public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
- return s3Client.completeMultipartUpload(request);
+ return getS3Client().completeMultipartUpload(request);
}
}
@@ -2926,7 +2978,8 @@ protected HeadObjectResponse getObjectMetadata(String key,
if (changeTracker != null) {
changeTracker.maybeApplyConstraint(requestBuilder);
}
- HeadObjectResponse headObjectResponse = s3Client.headObject(requestBuilder.build());
+ HeadObjectResponse headObjectResponse = getS3Client()
+ .headObject(requestBuilder.build());
if (changeTracker != null) {
changeTracker.processMetadata(headObjectResponse, operation);
}
@@ -2960,7 +3013,7 @@ protected HeadBucketResponse getBucketMetadata() throws IOException {
final HeadBucketResponse response = trackDurationAndSpan(STORE_EXISTS_PROBE, bucket, null,
() -> invoker.retry("getBucketMetadata()", bucket, true, () -> {
try {
- return s3Client.headBucket(
+ return getS3Client().headBucket(
getRequestFactory().newHeadBucketRequestBuilder(bucket).build());
} catch (NoSuchBucketException e) {
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does " + "not exist");
@@ -2995,9 +3048,9 @@ protected S3ListResult listObjects(S3ListRequest request,
OBJECT_LIST_REQUEST,
() -> {
if (useListV1) {
- return S3ListResult.v1(s3Client.listObjects(request.getV1()));
+ return S3ListResult.v1(getS3Client().listObjects(request.getV1()));
} else {
- return S3ListResult.v2(s3Client.listObjectsV2(request.getV2()));
+ return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2()));
}
}));
}
@@ -3050,10 +3103,10 @@ protected S3ListResult continueListObjects(S3ListRequest request,
nextMarker = prevListResult.get(prevListResult.size() - 1).key();
}
- return S3ListResult.v1(s3Client.listObjects(
+ return S3ListResult.v1(getS3Client().listObjects(
request.getV1().toBuilder().marker(nextMarker).build()));
} else {
- return S3ListResult.v2(s3Client.listObjectsV2(request.getV2().toBuilder()
+ return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2().toBuilder()
.continuationToken(prevResult.getV2().nextContinuationToken()).build()));
}
}));
@@ -3185,15 +3238,16 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
* @param file the file to be uploaded
* @param listener the progress listener for the request
* @return the upload initiated
+ * @throws IOException if transfer manager creation failed.
*/
@Retries.OnceRaw
public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
- ProgressableProgressListener listener) {
+ ProgressableProgressListener listener) throws IOException {
long len = getPutRequestLength(putObjectRequest);
LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key());
incrementPutStartStatistics(len);
- FileUpload upload = transferManager.uploadFile(
+ FileUpload upload = store.getOrCreateTransferManager().uploadFile(
UploadFileRequest.builder()
.putObjectRequest(putObjectRequest)
.source(file)
@@ -3233,9 +3287,10 @@ PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
PutObjectResponse response =
trackDurationOfSupplier(nonNullDurationTrackerFactory(durationTrackerFactory),
OBJECT_PUT_REQUESTS.getSymbol(),
- () -> isFile ?
- s3Client.putObject(putObjectRequest, RequestBody.fromFile(uploadData.getFile())) :
- s3Client.putObject(putObjectRequest,
+ () -> isFile
+ ? getS3Client().putObject(putObjectRequest,
+ RequestBody.fromFile(uploadData.getFile()))
+ : getS3Client().putObject(putObjectRequest,
RequestBody.fromInputStream(uploadData.getUploadStream(),
putObjectRequest.contentLength())));
incrementPutCompletedStatistics(true, len);
@@ -3285,7 +3340,7 @@ UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body,
UploadPartResponse uploadPartResponse = trackDurationOfSupplier(
nonNullDurationTrackerFactory(durationTrackerFactory),
MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
- s3Client.uploadPart(request, body));
+ getS3Client().uploadPart(request, body));
incrementPutCompletedStatistics(true, len);
return uploadPartResponse;
} catch (AwsServiceException e) {
@@ -4344,35 +4399,43 @@ public void close() throws IOException {
* both the expected state of this FS and of failures while being stopped.
*/
protected synchronized void stopAllServices() {
- closeAutocloseables(LOG, transferManager,
- s3Client,
- getS3AsyncClient());
- transferManager = null;
- s3Client = null;
- s3AsyncClient = null;
+ try {
+ trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
+ closeAutocloseables(LOG, store);
+ store = null;
+ s3Client = null;
- // At this point the S3A client is shut down,
- // now the executor pools are closed
- HadoopExecutors.shutdown(boundedThreadPool, LOG,
- THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
- boundedThreadPool = null;
- HadoopExecutors.shutdown(unboundedThreadPool, LOG,
- THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
- unboundedThreadPool = null;
- if (futurePool != null) {
- futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
- futurePool = null;
+ // At this point the S3A client is shut down,
+ // now the executor pools are closed
+ HadoopExecutors.shutdown(boundedThreadPool, LOG,
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ boundedThreadPool = null;
+ HadoopExecutors.shutdown(unboundedThreadPool, LOG,
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ unboundedThreadPool = null;
+ if (futurePool != null) {
+ futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ futurePool = null;
+ }
+ // other services are shutdown.
+ cleanupWithLogger(LOG,
+ delegationTokens.orElse(null),
+ signerManager,
+ auditManager);
+ closeAutocloseables(LOG, credentials);
+ delegationTokens = Optional.empty();
+ signerManager = null;
+ credentials = null;
+ return null;
+ });
+ } catch (IOException e) {
+ // failure during shutdown.
+ // this should only be from the signature of trackDurationAndSpan().
+ LOG.warn("Failure during service shutdown", e);
}
+ // and once this duration has been tracked, close the statistics
// other services are shutdown.
- cleanupWithLogger(LOG,
- instrumentation,
- delegationTokens.orElse(null),
- signerManager,
- auditManager);
- closeAutocloseables(LOG, credentials);
- delegationTokens = Optional.empty();
- signerManager = null;
- credentials = null;
+ cleanupWithLogger(LOG, instrumentation);
}
/**
@@ -4559,7 +4622,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
() -> {
incrementStatistic(OBJECT_COPY_REQUESTS);
- Copy copy = transferManager.copy(
+ Copy copy = store.getOrCreateTransferManager().copy(
CopyRequest.builder()
.copyObjectRequest(copyRequest)
.build());
@@ -4589,7 +4652,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
incrementStatistic(OBJECT_COPY_REQUESTS);
try {
- return s3Client.copyObject(copyRequest);
+ return getS3Client().copyObject(copyRequest);
} catch (SdkException awsException) {
// if this is a 412 precondition failure, it may
// be converted to a RemoteFileChangedException
@@ -4620,7 +4683,7 @@ CreateMultipartUploadResponse initiateMultipartUpload(
LOG.debug("Initiate multipart upload to {}", request.key());
return trackDurationOfSupplier(getDurationTrackerFactory(),
OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
- () -> s3Client.createMultipartUpload(request));
+ () -> getS3Client().createMultipartUpload(request));
}
/**
@@ -5343,7 +5406,7 @@ public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
p = prefix + "/";
}
// duration tracking is done in iterator.
- return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys);
+ return MultipartUtils.listMultipartUploads(storeContext, getS3Client(), p, maxKeys);
}
/**
@@ -5368,7 +5431,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
final ListMultipartUploadsRequest request = getRequestFactory()
.newListMultipartUploadsRequestBuilder(p).build();
return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
- s3Client.listMultipartUploads(request).uploads());
+ getS3Client().listMultipartUploads(request).uploads());
});
}
@@ -5383,7 +5446,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
- s3Client.abortMultipartUpload(
+ getS3Client().abortMultipartUpload(
getRequestFactory().newAbortMultipartUploadRequestBuilder(
destKey,
uploadId).build()));
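The new protected createClientManager() override point makes the mocking note on the s3Client field concrete: a test subclass can substitute its own factory without touching the rest of initialization. A sketch, where StubS3ClientFactory is a hypothetical test double:

    public class StubbedS3AFileSystem extends S3AFileSystem {
      @Override
      protected ClientManager createClientManager(
          final S3ClientFactory clientFactory,
          final S3ClientFactory.S3ClientCreationParameters parameters,
          final DurationTrackerFactory durationTrackerFactory) {
        // ignore the reflection-created factory; inject a stub instead
        return new ClientManagerImpl(new StubS3ClientFactory(),
            parameters, durationTrackerFactory);
      }
    }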
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
index 68eacc35b1..a11ed19670 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
@@ -42,10 +43,14 @@
* Interface for the S3A Store;
* S3 client interactions should be via this; mocking
* is possible for unit tests.
+ *
+ * The {@link ClientManager} interface is used to create the AWS clients;
+ * the base implementation forwards to the implementation of this interface
+ * passed in at construction time.
*/
@InterfaceAudience.LimitedPrivate("Extensions")
@InterfaceStability.Unstable
-public interface S3AStore extends IOStatisticsSource {
+public interface S3AStore extends IOStatisticsSource, ClientManager {
/**
* Acquire write capacity for operations.
@@ -71,6 +76,8 @@ public interface S3AStore extends IOStatisticsSource {
RequestFactory getRequestFactory();
+ /**
+ * Get the client manager used by this store.
+ * @return the client manager
+ */
ClientManager clientManager();
+
/**
* Perform a bulk object delete operation against S3.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 0b01876ae5..e82eb4c918 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -47,8 +47,6 @@
* implementing only the deprecated method will work.
* See https://github.com/apache/hbase-filesystem
*
- * @deprecated This interface will be replaced by one which uses the AWS SDK V2 S3 client as part of
- * upgrading S3A to SDK V2. See HADOOP-18073.
*/
@InterfaceAudience.LimitedPrivate("HBoss")
@InterfaceStability.Evolving
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 7c4883c3d9..3bee1008ce 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.audit.AuditStatisticNames;
import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
@@ -65,6 +66,16 @@ public enum Statistic {
TYPE_DURATION),
/* FileSystem Level statistics */
+
+ FILESYSTEM_INITIALIZATION(
+ FileSystemStatisticNames.FILESYSTEM_INITIALIZATION,
+ "Filesystem initialization",
+ TYPE_DURATION),
+ FILESYSTEM_CLOSE(
+ FileSystemStatisticNames.FILESYSTEM_CLOSE,
+ "Filesystem close",
+ TYPE_DURATION),
+
DIRECTORIES_CREATED("directories_created",
"Total number of directories created through the object store.",
TYPE_COUNTER),
@@ -532,6 +543,11 @@ public enum Statistic {
TYPE_DURATION),
/* General Store operations */
+ STORE_CLIENT_CREATION(
+ StoreStatisticNames.STORE_CLIENT_CREATION,
+ "Store Client Creation",
+ TYPE_DURATION),
+
STORE_EXISTS_PROBE(StoreStatisticNames.STORE_EXISTS_PROBE,
"Store Existence Probe",
TYPE_DURATION),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
new file mode 100644
index 0000000000..84770861cc
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+
+/**
+ * Interface for on-demand/async creation of AWS clients
+ * and extension services.
+ */
+public interface ClientManager extends Closeable {
+
+ /**
+ * Get the transfer manager, creating it and any dependencies if needed.
+ * @return a transfer manager
+ * @throws IOException on any failure to create the manager
+ */
+ S3TransferManager getOrCreateTransferManager()
+ throws IOException;
+
+ /**
+ * Get the S3 client, creating it if needed.
+ * @return the S3 client
+ * @throws IOException on any failure to create the client
+ */
+ S3Client getOrCreateS3Client() throws IOException;
+
+ /**
+ * Get the async S3 client, creating it if needed.
+ * @return the async S3 client
+ * @throws IOException on any failure to create the client
+ */
+ S3AsyncClient getOrCreateAsyncClient() throws IOException;
+
+ /**
+ * Close operation is required to not raise exceptions.
+ */
+ void close();
+}
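The intended contract: each getOrCreate call constructs its client at most once, later calls return the cached instance, and close() releases only what was actually created. A sketch, assuming "factory", "parameters" and "durations" are already built:

    ClientManager manager =
        new ClientManagerImpl(factory, parameters, durations);

    S3Client s3 = manager.getOrCreateS3Client();    // created on this first call
    S3Client same = manager.getOrCreateS3Client();  // cached instance returned

    // the async client and transfer manager were never requested, so
    // close() only has the sync client to shut down
    manager.close();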
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
new file mode 100644
index 0000000000..ff6748e66d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_CLIENT_CREATION;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkState;
+import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;
+
+/**
+ * Client manager for on-demand creation of S3 clients,
+ * with parallelized close of them in {@link #close()}.
+ * Updates {@link org.apache.hadoop.fs.s3a.Statistic#STORE_CLIENT_CREATION}
+ * to track count and duration of client creation.
+ */
+public class ClientManagerImpl implements ClientManager {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);
+
+ /**
+ * Client factory to invoke.
+ */
+ private final S3ClientFactory clientFactory;
+
+ /**
+ * Closed flag.
+ */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Parameters to create sync/async clients.
+ */
+ private final S3ClientFactory.S3ClientCreationParameters clientCreationParameters;
+
+ /**
+ * Duration tracker factory for creation.
+ */
+ private final DurationTrackerFactory durationTrackerFactory;
+
+ /**
+ * Core S3 client.
+ */
+ private final LazyAutoCloseableReference<S3Client> s3Client;
+
+ /** Async client is used for transfer manager. */
+ private final LazyAutoCloseableReference<S3AsyncClient> s3AsyncClient;
+
+ /** Transfer manager. */
+ private final LazyAutoCloseableReference<S3TransferManager> transferManager;
+
+ /**
+ * Constructor.
+ * This does not create any clients.
+ * @param clientFactory client factory to invoke
+ * @param clientCreationParameters creation parameters.
+ * @param durationTrackerFactory duration tracker.
+ */
+ public ClientManagerImpl(
+ final S3ClientFactory clientFactory,
+ final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
+ final DurationTrackerFactory durationTrackerFactory) {
+ this.clientFactory = requireNonNull(clientFactory);
+ this.clientCreationParameters = requireNonNull(clientCreationParameters);
+ this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+ this.s3Client = new LazyAutoCloseableReference<>(createS3Client());
+ this.s3AsyncClient = new LazyAutoCloseableReference<>(createAsyncClient());
+ this.transferManager = new LazyAutoCloseableReference<>(createTransferManager());
+ }
+
+ /**
+ * Create the function to create the S3 client.
+ * @return a callable which will create the client.
+ */
+ private CallableRaisingIOE<S3Client> createS3Client() {
+ return trackDurationOfOperation(
+ durationTrackerFactory,
+ STORE_CLIENT_CREATION.getSymbol(),
+ () -> clientFactory.createS3Client(getUri(), clientCreationParameters));
+ }
+
+ /**
+ * Create the function to create the S3 Async client.
+ * @return a callable which will create the client.
+ */
+ private CallableRaisingIOE<S3AsyncClient> createAsyncClient() {
+ return trackDurationOfOperation(
+ durationTrackerFactory,
+ STORE_CLIENT_CREATION.getSymbol(),
+ () -> clientFactory.createS3AsyncClient(getUri(), clientCreationParameters));
+ }
+
+ /**
+ * Create the function to create the Transfer Manager.
+ * @return a callable which will create the component.
+ */
+ private CallableRaisingIOE<S3TransferManager> createTransferManager() {
+ return () -> {
+ final S3AsyncClient asyncClient = s3AsyncClient.eval();
+ return trackDuration(durationTrackerFactory,
+ STORE_CLIENT_CREATION.getSymbol(), () ->
+ clientFactory.createS3TransferManager(asyncClient));
+ };
+ }
+
+ @Override
+ public synchronized S3Client getOrCreateS3Client() throws IOException {
+ checkNotClosed();
+ return s3Client.eval();
+ }
+
+ @Override
+ public synchronized S3AsyncClient getOrCreateAsyncClient() throws IOException {
+ checkNotClosed();
+ return s3AsyncClient.eval();
+ }
+
+ @Override
+ public synchronized S3TransferManager getOrCreateTransferManager() throws IOException {
+ checkNotClosed();
+ return transferManager.eval();
+ }
+
+ /**
+ * Check that the client manager is not closed.
+ * @throws IllegalStateException if it is closed.
+ */
+ private void checkNotClosed() {
+ checkState(!closed.get(), "Client manager is closed");
+ }
+
+ /**
+ * Close() is synchronized to avoid race conditions between
+ * slow client creation and this close operation.
+ *
+ * The references are all closed in parallel.
+ */
+ @Override
+ public synchronized void close() {
+ if (closed.getAndSet(true)) {
+ // re-entrant close.
+ return;
+ }
+ // queue the closures.
+ List<Future<Object>> l = new ArrayList<>();
+ l.add(closeAsync(transferManager));
+ l.add(closeAsync(s3AsyncClient));
+ l.add(closeAsync(s3Client));
+
+ // once all are queued, await their completion
+ // and swallow any exception.
+ try {
+ awaitAllFutures(l);
+ } catch (Exception e) {
+ // should never happen.
+ LOG.warn("Exception in close", e);
+ }
+ }
+
+ /**
+ * Get the URI of the filesystem.
+ * @return URI to use when creating clients.
+ */
+ public URI getUri() {
+ return clientCreationParameters.getPathUri();
+ }
+
+ /**
+ * Queue closing a closeable, logging any exception, and returning null
+ * for use when awaiting the results.
+ * @param reference closeable.
+ * @param <T> type of closeable
+ * @return null
+ */
+ private CompletableFuture