From 4c55adbb6bc25fe76943535fd97cbd2b6d350e33 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Fri, 5 Jul 2024 16:38:37 +0100
Subject: [PATCH] HADOOP-19205. S3A: initialization/close slower than with v1
SDK (#6892)
Adds a new ClientManager interface/implementation which provides on-demand
creation of synchronous and asynchronous S3 clients and the S3 transfer
manager, and which shuts these down in close().
S3A FS is modified to
* Create a ClientManagerImpl instance and pass down to its S3Store.
* Use the same ClientManager interface against S3Store to demand-create
the services.
* Only create the async client as part of the transfer manager creation,
which will take place during the first rename() operation.
* Statistics on client creation count and duration are recorded.
+ Statistics on the time to initialize and shut down the S3A FS are collected
in IOStatistics for reporting.
Adds to hadoop-common the class
LazyAtomicReference<T> implements CallableRaisingIOE<T>, Supplier<T>
and its subclass
LazyAutoCloseableReference<T extends AutoCloseable>
extends LazyAtomicReference<T> implements AutoCloseable
These evaluate the Supplier/CallableRaisingIOE they were
constructed with on the first (successful) read of the value.
Any exception raised during this evaluation is rethrown; the same
operation is retried on the next evaluation.
Because these classes implement the Supplier and CallableRaisingIOE
interfaces, they can be used to implement lazy function evaluation,
as Haskell and some other functional languages do.
LazyAutoCloseableReference is AutoCloseable; its close() method will
close the inner reference if it is set.
This class is used in ClientManagerImpl for the lazy S3 client creation
and closure.
Contributed by Steve Loughran.
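An illustrative sketch of the lazy reference lifecycle (not part of the
patch; the factory name is invented):

    // nothing is created at construction time
    LazyAutoCloseableReference<S3Client> client =
        new LazyAutoCloseableReference<>(factory::createS3Client);
    client.eval();   // first call invokes the constructor and caches the value
    client.eval();   // second call returns the cached client
    client.close();  // closes the client; a no-op if it was never created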
---
.../statistics/FileSystemStatisticNames.java | 45 +++
.../fs/statistics/StoreStatisticNames.java | 6 +
.../hadoop/util/functional/FunctionalIO.java | 23 +-
.../hadoop/util/functional/FutureIO.java | 50 +--
.../util/functional/LazyAtomicReference.java | 152 +++++++
.../LazyAutoCloseableReference.java | 102 +++++
.../util/functional/TestLazyReferences.java | 263 ++++++++++++
.../dev-support/findbugs-exclude.xml | 5 -
.../apache/hadoop/fs/s3a/S3AFileSystem.java | 257 +++++++-----
.../org/apache/hadoop/fs/s3a/S3AStore.java | 9 +-
.../apache/hadoop/fs/s3a/S3ClientFactory.java | 2 -
.../org/apache/hadoop/fs/s3a/Statistic.java | 16 +
.../hadoop/fs/s3a/impl/ClientManager.java | 50 +++
.../hadoop/fs/s3a/impl/ClientManagerImpl.java | 238 +++++++++++
.../hadoop/fs/s3a/impl/S3AStoreBuilder.java | 21 +-
.../hadoop/fs/s3a/impl/S3AStoreImpl.java | 121 ++++--
.../hadoop/fs/s3a/MockS3AFileSystem.java | 7 +
.../s3a/commit/staging/StagingTestBase.java | 11 +-
.../hadoop/fs/s3a/impl/TestClientManager.java | 379 ++++++++++++++++++
.../fs/s3a/test/StubS3ClientFactory.java | 122 ++++++
20 files changed, 1663 insertions(+), 216 deletions(-)
create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/FileSystemStatisticNames.java
create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAtomicReference.java
create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAutoCloseableReference.java
create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestLazyReferences.java
create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestClientManager.java
create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/StubS3ClientFactory.java
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/FileSystemStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/FileSystemStatisticNames.java
new file mode 100644
index 0000000000..cd8df2f853
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/FileSystemStatisticNames.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Common statistic names for Filesystem-level statistics,
+ * including internals.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class FileSystemStatisticNames {
+
+ private FileSystemStatisticNames() {
+ }
+
+ /**
+ * How long did filesystem initialization take?
+ */
+ public static final String FILESYSTEM_INITIALIZATION = "filesystem_initialization";
+
+ /**
+ * How long did filesystem close take?
+ */
+ public static final String FILESYSTEM_CLOSE = "filesystem_close";
+
+}
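These names surface through the IOStatistics APIs; a hedged sketch of how a
caller could read them, assuming the filesystem is an IOStatisticsSource:

    import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
    import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

    // Sketch: duration statistics publish count/mean/min/max entries,
    // e.g. "filesystem_initialization" and "filesystem_initialization.mean".
    IOStatistics stats = retrieveIOStatistics(fileSystem);
    LOG.info("FS statistics: {}", ioStatisticsToPrettyString(stats));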
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index a513cffd84..44f794aa77 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -176,6 +176,11 @@ public final class StoreStatisticNames {
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";
+ /**
+ * How long did any store client creation take?
+ */
+ public static final String STORE_CLIENT_CREATION = "store_client_creation";
+
/** Probe for store existing: {@value}. */
public static final String STORE_EXISTS_PROBE
= "store_exists_probe";
@@ -200,6 +205,7 @@ public final class StoreStatisticNames {
public static final String STORE_IO_RATE_LIMITED_DURATION
= "store_io_rate_limited_duration";
+
/**
* A store's equivalent of a paged LIST request was initiated: {@value}.
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java
index 6bc4a71030..bc9e2ea729 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FunctionalIO.java
@@ -49,27 +49,6 @@ public static <T> T uncheckIOExceptions(CallableRaisingIOE<T> call)
}
}
- /**
- * Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
- * This is similar to {@link CommonCallableSupplier}, except that
- * only IOExceptions are caught and wrapped; all other exceptions are
- * propagated unchanged.
- * @param <T> type of result
- */
- private static final class UncheckedIOExceptionSupplier<T> implements Supplier<T> {
-
- private final CallableRaisingIOE<T> call;
-
- private UncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
- this.call = call;
- }
-
- @Override
- public T get() {
- return uncheckIOExceptions(call);
- }
- }
-
/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* @param call call to wrap
@@ -77,7 +56,7 @@ public T get() {
* @return a supplier which invokes the call.
*/
public static <T> Supplier<T> toUncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
- return new UncheckedIOExceptionSupplier<>(call);
+ return () -> uncheckIOExceptions(call);
}
/**
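The lambda has the same semantics as the deleted inner class; a sketch of the
wrapper in use (the callable body and condition are invented for illustration):

    // Sketch: adapt an IOException-raising callable to java.util.function.Supplier.
    Supplier<String> supplier = FunctionalIO.toUncheckedIOExceptionSupplier(() -> {
      if (shouldFail) {                         // hypothetical condition
        throw new IOException("lookup failed"); // surfaces as UncheckedIOException
      }
      return "value";
    });
    String value = supplier.get();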
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
index 2f043b6499..0a0d023d93 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/FutureIO.java
@@ -38,9 +38,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Future IO Helper methods.
*
@@ -62,7 +59,6 @@
@InterfaceStability.Unstable
public final class FutureIO {
- private static final Logger LOG = LoggerFactory.getLogger(FutureIO.class.getName());
private FutureIO() {
}
@@ -129,7 +125,6 @@ public static <T> T awaitFuture(final Future<T> future,
* If any future throws an exception during its execution, this method
* extracts and rethrows that exception.
*
- *
* @param collection collection of futures to be evaluated
* @param <T> type of the result.
* @return the list of the futures' results, if all went well.
@@ -140,19 +135,10 @@ public static <T> T awaitFuture(final Future<T> future,
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection)
throws InterruptedIOException, IOException, RuntimeException {
List<T> results = new ArrayList<>();
- try {
- for (Future<T> future : collection) {
- results.add(future.get());
- }
- return results;
- } catch (InterruptedException e) {
- LOG.debug("Execution of future interrupted ", e);
- throw (InterruptedIOException) new InterruptedIOException(e.toString())
- .initCause(e);
- } catch (ExecutionException e) {
- LOG.debug("Execution of future failed with exception", e.getCause());
- return raiseInnerCause(e);
+ for (Future<T> future : collection) {
+ results.add(awaitFuture(future));
}
+ return results;
}
/**
@@ -163,7 +149,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* the timeout expires, whichever happens first. If any future throws an
* exception during its execution, this method extracts and rethrows that exception.
*
- *
* @param collection collection of futures to be evaluated
* @param duration timeout duration
* @param <T> type of the result.
@@ -176,21 +161,12 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection,
final Duration duration)
throws InterruptedIOException, IOException, RuntimeException,
- TimeoutException {
+ TimeoutException {
List<T> results = new ArrayList<>();
- try {
- for (Future<T> future : collection) {
- results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS));
- }
- return results;
- } catch (InterruptedException e) {
- LOG.debug("Execution of future interrupted ", e);
- throw (InterruptedIOException) new InterruptedIOException(e.toString())
- .initCause(e);
- } catch (ExecutionException e) {
- LOG.debug("Execution of future failed with exception", e.getCause());
- return raiseInnerCause(e);
+ for (Future<T> future : collection) {
+ results.add(awaitFuture(future, duration.toMillis(), TimeUnit.MILLISECONDS));
}
+ return results;
}
/**
@@ -199,7 +175,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* This will always raise an exception, either the inner IOException,
* an inner RuntimeException, or a new IOException wrapping the raised
* exception.
- *
* @param e exception.
* @param <T> type of return value.
* @return nothing, ever.
@@ -283,12 +258,11 @@ public static IOException unwrapInnerException(final Throwable e) {
* @param <U> type of builder
* @return the builder passed in.
*/
- public static <T, U extends FSBuilder<T, U>>
- FSBuilder<T, U> propagateOptions(
- final FSBuilder<T, U> builder,
- final Configuration conf,
- final String optionalPrefix,
- final String mandatoryPrefix) {
+ public static <T, U extends FSBuilder<T, U>> FSBuilder<T, U> propagateOptions(
+ final FSBuilder<T, U> builder,
+ final Configuration conf,
+ final String optionalPrefix,
+ final String mandatoryPrefix) {
propagateOptions(builder, conf,
optionalPrefix, false);
propagateOptions(builder, conf,
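Both awaitAllFutures() overloads now route through awaitFuture(), so
interruption and execution failures are unwrapped identically. A sketch of the
timed variant, with the submitted work invented for illustration:

    // Sketch: await every future, rethrowing any inner IOException;
    // a TimeoutException is raised if the whole batch overruns.
    List<Future<Long>> futures = new ArrayList<>();
    for (Path path : paths) {                   // hypothetical work list
      futures.add(executor.submit(() -> fs.getFileStatus(path).getLen()));
    }
    List<Long> lengths = FutureIO.awaitAllFutures(futures, Duration.ofSeconds(30));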
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAtomicReference.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAtomicReference.java
new file mode 100644
index 0000000000..5f2d674bba
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAtomicReference.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
+
+/**
+ * A lazily constructed reference, whose reference
+ * constructor is a {@link CallableRaisingIOE} so
+ * may raise IOExceptions.
+ *
+ * This {@code constructor} is only invoked on demand
+ * when the reference is first needed,
+ * after which the same value is returned.
+ * This value MUST NOT be null.
+ *
+ * Implements {@link CallableRaisingIOE} and {@code java.util.function.Supplier}.
+ * An instance of this can therefore be used in a functional IO chain.
+ * As such, it can act as a delayed and caching invocator of a function:
+ * the supplier passed in is only ever invoked once, and only when requested.
+ * @param <T> type of reference
+ */
+public class LazyAtomicReference<T>
+ implements CallableRaisingIOE<T>, Supplier<T> {
+
+ /**
+ * Underlying reference.
+ */
+ private final AtomicReference<T> reference = new AtomicReference<>();
+
+ /**
+ * Constructor for lazy creation.
+ */
+ private final CallableRaisingIOE<? extends T> constructor;
+
+ /**
+ * Constructor for this instance.
+ * @param constructor method to invoke to actually construct the inner object.
+ */
+ public LazyAtomicReference(final CallableRaisingIOE<? extends T> constructor) {
+ this.constructor = requireNonNull(constructor);
+ }
+
+ /**
+ * Getter for the constructor.
+ * @return the constructor class
+ */
+ protected CallableRaisingIOE<? extends T> getConstructor() {
+ return constructor;
+ }
+
+ /**
+ * Get the reference.
+ * Subclasses need to take care when working with this directly.
+ * @return the reference.
+ */
+ protected AtomicReference<T> getReference() {
+ return reference;
+ }
+
+ /**
+ * Get the value, constructing it if needed.
+ * @return the value
+ * @throws IOException on any evaluation failure
+ * @throws NullPointerException if the evaluated function returned null.
+ */
+ public synchronized T eval() throws IOException {
+ final T v = reference.get();
+ if (v != null) {
+ return v;
+ }
+ reference.set(requireNonNull(constructor.apply()));
+ return reference.get();
+ }
+
+ /**
+ * Implementation of {@code CallableRaisingIOE.apply()}.
+ * Invoke {@link #eval()}.
+ * @return the value
+ * @throws IOException on any evaluation failure
+ */
+ @Override
+ public final T apply() throws IOException {
+ return eval();
+ }
+
+ /**
+ * Implementation of {@code Supplier.get()}.
+ *
+ * Invoke {@link #eval()} and convert IOEs to
+ * UncheckedIOException.
+ *
+ * This is the {@code Supplier.get()} implementation, which allows
+ * this class to be passed into anything taking a supplier.
+ * @return the value
+ * @throws UncheckedIOException if the constructor raised an IOException.
+ */
+ @Override
+ public final T get() throws UncheckedIOException {
+ return uncheckIOExceptions(this::eval);
+ }
+
+ /**
+ * Is the reference set?
+ * @return true if the reference has been set.
+ */
+ public final boolean isSet() {
+ return reference.get() != null;
+ }
+
+ @Override
+ public String toString() {
+ return "LazyAtomicReference{" +
+ "reference=" + reference + '}';
+ }
+
+
+ /**
+ * Create from a supplier.
+ * This is not a constructor to avoid ambiguity when a lambda-expression is
+ * passed in.
+ * @param supplier supplier implementation.
+ * @return a lazy reference.
+ * @param <T> type of reference
+ */
+ public static <T> LazyAtomicReference<T> lazyAtomicReferenceFromSupplier(
+ Supplier<T> supplier) {
+ return new LazyAtomicReference<>(supplier::get);
+ }
+}
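Because the class implements both interfaces, one instance can be handed to
Supplier-based and CallableRaisingIOE-based APIs alike; a minimal sketch:

    // Sketch: evaluate-once caching. Instant::now runs on the first get() only.
    LazyAtomicReference<Instant> started =
        LazyAtomicReference.lazyAtomicReferenceFromSupplier(Instant::now);
    Instant first = started.get();   // invokes Instant::now and caches it
    Instant second = started.get();  // cached: first == second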
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAutoCloseableReference.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAutoCloseableReference.java
new file mode 100644
index 0000000000..d6d625c125
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/LazyAutoCloseableReference.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * A subclass of {@link LazyAtomicReference} which
+ * holds an {@code AutoCloseable} reference and calls {@code close()}
+ * when it itself is closed.
+ * @param <T> type of reference.
+ */
+public class LazyAutoCloseableReference<T extends AutoCloseable>
+ extends LazyAtomicReference<T> implements AutoCloseable {
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Constructor for this instance.
+ * @param constructor method to invoke to actually construct the inner object.
+ */
+ public LazyAutoCloseableReference(final CallableRaisingIOE<? extends T> constructor) {
+ super(constructor);
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws IllegalStateException if the reference is closed.
+ */
+ @Override
+ public synchronized T eval() throws IOException {
+ checkState(!closed.get(), "Reference is closed");
+ return super.eval();
+ }
+
+ /**
+ * Is the reference closed?
+ * @return true if the reference is closed.
+ */
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ /**
+ * Close the reference value if it is non-null.
+ * Sets the reference to null afterwards, even on
+ * a failure.
+ * @throws Exception failure to close.
+ */
+ @Override
+ public synchronized void close() throws Exception {
+ if (closed.getAndSet(true)) {
+ // already closed
+ return;
+ }
+ final T v = getReference().get();
+ // check the state.
+ // A null reference means it has not yet been evaluated.
+ if (v != null) {
+ try {
+ v.close();
+ } finally {
+ // set the reference to null, even on a failure.
+ getReference().set(null);
+ }
+ }
+ }
+
+
+ /**
+ * Create from a supplier.
+ * This is not a constructor to avoid ambiguity when a lambda-expression is
+ * passed in.
+ * @param supplier supplier implementation.
+ * @return a lazy reference.
+ * @param <T> type of reference
+ */
+ public static <T extends AutoCloseable> LazyAutoCloseableReference<T>
+ lazyAutoCloseablefromSupplier(Supplier<T> supplier) {
+ return new LazyAutoCloseableReference<>(supplier::get);
+ }
+}
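A sketch of the close semantics, assuming a stream that is only needed on some
code paths (the path and condition are invented):

    // Sketch: the stream is opened only if read() is reached;
    // close() closes it if and only if it was ever opened.
    LazyAutoCloseableReference<InputStream> in =
        new LazyAutoCloseableReference<>(() -> Files.newInputStream(path));
    try {
      if (shouldRead) {        // hypothetical condition
        in.eval().read();
      }
    } finally {
      in.close();              // may rethrow the stream's close() failure
    }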
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestLazyReferences.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestLazyReferences.java
new file mode 100644
index 0000000000..4d1dae184b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestLazyReferences.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Test {@link LazyAtomicReference} and {@link LazyAutoCloseableReference}.
+ */
+public class TestLazyReferences extends AbstractHadoopTestBase {
+
+ /**
+ * Format of exceptions to raise.
+ */
+ private static final String GENERATED = "generated[%d]";
+
+ /**
+ * Invocation counter, can be asserted on in {@link #assertCounterValue(int)}.
+ */
+ private final AtomicInteger counter = new AtomicInteger();
+
+ /**
+ * Assert that {@link #counter} has a specific value.
+ * @param val expected value
+ */
+ private void assertCounterValue(final int val) {
+ assertAtomicIntValue(counter, val);
+ }
+
+ /**
+ * Assert an atomic integer has a specific value.
+ * @param ai atomic integer
+ * @param val expected value
+ */
+ private static void assertAtomicIntValue(
+ final AtomicInteger ai, final int val) {
+ Assertions.assertThat(ai.get())
+ .describedAs("value of atomic integer %s", ai)
+ .isEqualTo(val);
+ }
+
+
+ /**
+ * Test the core {@link LazyAtomicReference} lifecycle:
+ * lazy evaluation through eval() and apply().
+ */
+ @Test
+ public void testLazyAtomicReference() throws Throwable {
+
+ LazyAtomicReference<Integer> ref = new LazyAtomicReference<>(counter::incrementAndGet);
+
+ // constructor does not invoke the supplier
+ assertCounterValue(0);
+
+ assertSetState(ref, false);
+
+ // first eval() invokes the constructor
+ Assertions.assertThat(ref.eval())
+ .describedAs("first eval()")
+ .isEqualTo(1);
+ assertCounterValue(1);
+ assertSetState(ref, true);
+
+
+ // Callable.apply() returns the same value
+ Assertions.assertThat(ref.apply())
+ .describedAs("second get of %s", ref)
+ .isEqualTo(1);
+ // no new counter increment
+ assertCounterValue(1);
+ }
+
+ /**
+ * Assert that {@link LazyAtomicReference#isSet()} is in the expected state.
+ * @param ref reference
+ * @param expected expected value
+ */
+ private static void assertSetState(final LazyAtomicReference<?> ref,
+ final boolean expected) {
+ Assertions.assertThat(ref.isSet())
+ .describedAs("isSet() of %s", ref)
+ .isEqualTo(expected);
+ }
+
+ /**
+ * Test the underlying {@link LazyAtomicReference} integration with java
+ * Supplier API.
+ */
+ @Test
+ public void testSupplierIntegration() throws Throwable {
+
+ LazyAtomicReference<Integer> ref =
+ LazyAtomicReference.lazyAtomicReferenceFromSupplier(counter::incrementAndGet);
+
+ // constructor does not invoke the supplier
+ assertCounterValue(0);
+ assertSetState(ref, false);
+
+ // first get() invokes the supplier
+ Assertions.assertThat(ref.get())
+ .describedAs("first get()")
+ .isEqualTo(1);
+ assertCounterValue(1);
+
+ // Callable.apply() returns the same value
+ Assertions.assertThat(ref.apply())
+ .describedAs("second get of %s", ref)
+ .isEqualTo(1);
+ // no new counter increment
+ assertCounterValue(1);
+ }
+
+ /**
+ * Test failure handling. through the supplier API.
+ */
+ @Test
+ public void testSupplierIntegrationFailureHandling() throws Throwable {
+
+ LazyAtomicReference<Object> ref = new LazyAtomicReference<>(() -> {
+ throw new UnknownHostException(String.format(GENERATED, counter.incrementAndGet()));
+ });
+
+ // the get() call will wrap the raised exception, which can be extracted
+ // and type checked.
+ verifyCause(UnknownHostException.class,
+ intercept(UncheckedIOException.class, "[1]", ref::get));
+
+ assertSetState(ref, false);
+
+ // counter goes up
+ intercept(UncheckedIOException.class, "[2]", ref::get);
+ }
+
+ @Test
+ public void testAutoCloseable() throws Throwable {
+ final LazyAutoCloseableReference<CloseableClass> ref =
+ LazyAutoCloseableReference.lazyAutoCloseablefromSupplier(CloseableClass::new);
+
+ assertSetState(ref, false);
+ ref.eval();
+ final CloseableClass closeable = ref.get();
+ Assertions.assertThat(closeable.isClosed())
+ .describedAs("closed flag of %s", closeable)
+ .isFalse();
+
+ // first close will close the class.
+ ref.close();
+ Assertions.assertThat(ref.isClosed())
+ .describedAs("closed flag of %s", ref)
+ .isTrue();
+ Assertions.assertThat(closeable.isClosed())
+ .describedAs("closed flag of %s", closeable)
+ .isTrue();
+
+ // second close will not raise an exception
+ ref.close();
+
+ // you cannot eval() a closed reference
+ intercept(IllegalStateException.class, "Reference is closed", ref::eval);
+ intercept(IllegalStateException.class, "Reference is closed", ref::get);
+ intercept(IllegalStateException.class, "Reference is closed", ref::apply);
+
+ Assertions.assertThat(ref.getReference().get())
+ .describedAs("inner referece of %s", ref)
+ .isNull();
+ }
+
+ /**
+ * Not an error to close a reference which was never evaluated.
+ */
+ @Test
+ public void testCloseableUnevaluated() throws Throwable {
+ final LazyAutoCloseableReference<CloseableRaisingException> ref =
+ new LazyAutoCloseableReference<>(CloseableRaisingException::new);
+ ref.close();
+ ref.close();
+ }
+
+ /**
+ * If the close() call fails, that only raises an exception on the first attempt,
+ * and the reference is set to null.
+ */
+ @Test
+ public void testAutoCloseableFailureHandling() throws Throwable {
+ final LazyAutoCloseableReference<CloseableRaisingException> ref =
+ new LazyAutoCloseableReference<>(CloseableRaisingException::new);
+ ref.eval();
+
+ // close reports the failure.
+ intercept(IOException.class, "raised", ref::close);
+
+ // but the reference is set to null
+ assertSetState(ref, false);
+ // second attempt does nothing, so will not raise an exception.
+ ref.close();
+ }
+
+ /**
+ * Closeable which sets the closed flag on close().
+ */
+ private static final class CloseableClass implements AutoCloseable {
+
+ /** closed flag. */
+ private boolean closed;
+
+ /**
+ * Close the resource.
+ * @throws IllegalStateException if already closed.
+ */
+ @Override
+ public void close() {
+ checkState(!closed, "Already closed");
+ closed = true;
+ }
+
+ /**
+ * Get the closed flag.
+ * @return the state.
+ */
+ private boolean isClosed() {
+ return closed;
+ }
+
+ }
+ /**
+ * Closeable which raises an IOE in close().
+ */
+ private static final class CloseableRaisingException implements AutoCloseable {
+
+ @Override
+ public void close() throws Exception {
+ throw new IOException("raised");
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index 359ac0e80d..39a9e51ac8 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -64,11 +64,6 @@
-  <Match>
-    <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem"/>
-    <Field name="s3AsyncClient"/>
-    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
-  </Match>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index d04ca70a68..f5937ae0a4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -54,7 +54,6 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
@@ -88,7 +87,6 @@
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.Copy;
-import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
@@ -123,6 +121,8 @@
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ClientManager;
+import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
@@ -152,6 +152,7 @@
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
@@ -305,11 +306,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private S3AStore store;
+ /**
+ * The core S3 client is created and managed by the ClientManager.
+ * It is copied here within {@link #initialize(URI, Configuration)}.
+ * Some mocking tests modify this so take care with changes.
+ */
private S3Client s3Client;
- /** Async client is used for transfer manager. */
- private S3AsyncClient s3AsyncClient;
-
// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
@@ -328,7 +331,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private Listing listing;
private long partSize;
private boolean enableMultiObjectsDelete;
- private S3TransferManager transferManager;
private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool;
@@ -548,6 +550,9 @@ public void initialize(URI name, Configuration originalConf)
// get the host; this is guaranteed to be non-null, non-empty
bucket = name.getHost();
AuditSpan span = null;
+ // track initialization duration; will only be set after
+ // statistics are set up.
+ Optional<DurationTracker> trackInitialization = Optional.empty();
try {
LOG.debug("Initializing S3AFileSystem for {}", bucket);
if (LOG.isTraceEnabled()) {
@@ -592,6 +597,18 @@ public void initialize(URI name, Configuration originalConf)
super.initialize(uri, conf);
setConf(conf);
+ // initialize statistics, after which statistics
+ // can be collected.
+ instrumentation = new S3AInstrumentation(uri);
+ initializeStatisticsBinding();
+
+ // track initialization duration.
+ // this should really be done in a onceTrackingDuration() call,
+ // but then all methods below would need to be in the lambda and
+ // it would create a merge/backport headache for all.
+ trackInitialization = Optional.of(
+ instrumentation.trackDuration(FileSystemStatisticNames.FILESYSTEM_INITIALIZATION));
+
s3aInternals = createS3AInternals();
// look for encryption data
@@ -600,8 +617,7 @@ public void initialize(URI name, Configuration originalConf)
buildEncryptionSecrets(bucket, conf));
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
- instrumentation = new S3AInstrumentation(uri);
- initializeStatisticsBinding();
+
// If CSE-KMS method is set then CSE is enabled.
isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
.equals(getS3EncryptionAlgorithm().getMethod());
@@ -687,7 +703,7 @@ public void initialize(URI name, Configuration originalConf)
// the FS came with a DT
// this may do some patching of the configuration (e.g. setting
// the encryption algorithms)
- bindAWSClient(name, delegationTokensEnabled);
+ ClientManager clientManager = createClientManager(name, delegationTokensEnabled);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
@@ -762,36 +778,55 @@ public void initialize(URI name, Configuration originalConf)
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create the store
- store = new S3AStoreBuilder()
- .withS3Client(s3Client)
- .withDurationTrackerFactory(getDurationTrackerFactory())
- .withStoreContextFactory(this)
- .withAuditSpanSource(getAuditManager())
- .withInstrumentation(getInstrumentation())
- .withStatisticsContext(statisticsContext)
- .withStorageStatistics(getStorageStatistics())
- .withReadRateLimiter(unlimitedRate())
- .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
- .build();
-
+ store = createS3AStore(clientManager, rateLimitCapacity);
+ // the s3 client is created through the store, rather than
+ // directly through the client manager.
+ // this is to aid mocking.
+ s3Client = store.getOrCreateS3Client();
// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
doBucketProbing();
initMultipartUploads(conf);
+ trackInitialization.ifPresent(DurationTracker::close);
} catch (SdkException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
stopAllServices();
+ trackInitialization.ifPresent(DurationTracker::failed);
throw translateException("initializing ", new Path(name), e);
} catch (IOException | RuntimeException e) {
// other exceptions: stop the services.
cleanupWithLogger(LOG, span);
stopAllServices();
+ trackInitialization.ifPresent(DurationTracker::failed);
throw e;
}
}
+ /**
+ * Create the S3AStore instance.
+ * This is protected so that tests can override it.
+ * @param clientManager client manager
+ * @param rateLimitCapacity rate limit
+ * @return a new store instance
+ */
+ @VisibleForTesting
+ protected S3AStore createS3AStore(final ClientManager clientManager,
+ final int rateLimitCapacity) {
+ return new S3AStoreBuilder()
+ .withClientManager(clientManager)
+ .withDurationTrackerFactory(getDurationTrackerFactory())
+ .withStoreContextFactory(this)
+ .withAuditSpanSource(getAuditManager())
+ .withInstrumentation(getInstrumentation())
+ .withStatisticsContext(statisticsContext)
+ .withStorageStatistics(getStorageStatistics())
+ .withReadRateLimiter(unlimitedRate())
+ .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
+ .build();
+ }
+
/**
* Populates the configurations related to vectored IO operation
* in the context which has to passed down to input streams.
@@ -965,7 +1000,7 @@ protected void verifyBucketExists() throws UnknownStoreException, IOException {
STORE_EXISTS_PROBE, bucket, null, () ->
invoker.retry("doesBucketExist", bucket, true, () -> {
try {
- s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
+ getS3Client().headBucket(HeadBucketRequest.builder().bucket(bucket).build());
return true;
} catch (AwsServiceException ex) {
int statusCode = ex.statusCode();
@@ -1014,14 +1049,22 @@ public Listing getListing() {
/**
* Set up the client bindings.
* If delegation tokens are enabled, the FS first looks for a DT
- * ahead of any other bindings;.
+ * ahead of any other bindings.
* If there is a DT it uses that to do the auth
- * and switches to the DT authenticator automatically (and exclusively)
- * @param name URI of the FS
+ * and switches to the DT authenticator automatically (and exclusively).
+ *
+ * Delegation tokens are configured and started, but the actual
+ * S3 clients are not: instead a {@link ClientManager} is created
+ * and returned, from which they can be created on demand.
+ * This is to reduce delays in FS initialization, especially
+ * for features (transfer manager, async client) which are not
+ * always used.
+ * @param fsURI URI of the FS
* @param dtEnabled are delegation tokens enabled?
+ * @return the client manager which can generate the clients.
* @throws IOException failure.
*/
- private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
+ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws IOException {
Configuration conf = getConf();
credentials = null;
String uaSuffix = "";
@@ -1059,7 +1102,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
uaSuffix = tokens.getUserAgentField();
} else {
// DT support is disabled, so create the normal credential chain
- credentials = createAWSCredentialProviderList(name, conf);
+ credentials = createAWSCredentialProviderList(fsURI, conf);
}
LOG.debug("Using credential provider {}", credentials);
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
@@ -1069,7 +1112,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
S3ClientFactory.S3ClientCreationParameters parameters =
new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
- .withPathUri(name)
+ .withPathUri(fsURI)
.withEndpoint(endpoint)
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
@@ -1088,22 +1131,27 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
- s3Client = clientFactory.createS3Client(getUri(), parameters);
- createS3AsyncClient(clientFactory, parameters);
- transferManager = clientFactory.createS3TransferManager(getS3AsyncClient());
+ // this is where clients and the transfer manager are created on demand.
+ return createClientManager(clientFactory, parameters, getDurationTrackerFactory());
}
/**
- * Creates and configures the S3AsyncClient.
- * Uses synchronized method to suppress spotbugs error.
- *
- * @param clientFactory factory used to create S3AsyncClient
- * @param parameters parameter object
- * @throws IOException on any IO problem
+ * Create the Client Manager; protected to allow for mocking.
+ * Requires {@link #unboundedThreadPool} to be initialized.
+ * @param clientFactory (reflection-created) client factory.
+ * @param clientCreationParameters parameters for client creation.
+ * @param durationTrackerFactory factory for duration tracking.
+ * @return a client manager instance.
*/
- private void createS3AsyncClient(S3ClientFactory clientFactory,
- S3ClientFactory.S3ClientCreationParameters parameters) throws IOException {
- s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
+ @VisibleForTesting
+ protected ClientManager createClientManager(
+ final S3ClientFactory clientFactory,
+ final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
+ final DurationTrackerFactory durationTrackerFactory) {
+ return new ClientManagerImpl(clientFactory,
+ clientCreationParameters,
+ durationTrackerFactory
+ );
}
/**
@@ -1241,14 +1289,6 @@ public RequestFactory getRequestFactory() {
return requestFactory;
}
- /**
- * Get the S3 Async client.
- * @return the async s3 client.
- */
- private S3AsyncClient getS3AsyncClient() {
- return s3AsyncClient;
- }
-
/**
* Implementation of all operations used by delegation tokens.
*/
@@ -1335,7 +1375,8 @@ public void abortOutstandingMultipartUploads(long seconds)
invoker.retry("Purging multipart uploads", bucket, true,
() -> {
RemoteIterator<MultipartUpload> uploadIterator =
- MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys);
+ MultipartUtils.listMultipartUploads(createStoreContext(),
+ getS3Client(), null, maxKeys);
while (uploadIterator.hasNext()) {
MultipartUpload upload = uploadIterator.next();
@@ -1395,12 +1436,23 @@ public int getDefaultPort() {
* Set the client - used in mocking tests to force in a different client.
* @param client client.
*/
+ @VisibleForTesting
protected void setAmazonS3Client(S3Client client) {
Preconditions.checkNotNull(client, "clientV2");
LOG.debug("Setting S3V2 client to {}", client);
s3Client = client;
}
+ /**
+ * Get the S3 client created in {@link #initialize(URI, Configuration)}.
+ * @return the s3Client
+ * @throws UncheckedIOException if the client could not be created.
+ */
+ @VisibleForTesting
+ protected S3Client getS3Client() {
+ return s3Client;
+ }
+
/**
* S3AInternals method.
* {@inheritDoc}.
@@ -1437,7 +1489,7 @@ private final class S3AInternalsImpl implements S3AInternals {
@Override
public S3Client getAmazonS3Client(String reason) {
LOG.debug("Access to S3 client requested, reason {}", reason);
- return s3Client;
+ return getS3Client();
}
@Override
@@ -1470,7 +1522,7 @@ public String getBucketLocation(String bucketName) throws IOException {
// If accessPoint then region is known from Arn
accessPoint != null
? accessPoint.getRegion()
- : s3Client.getBucketLocation(GetBucketLocationRequest.builder()
+ : getS3Client().getBucketLocation(GetBucketLocationRequest.builder()
.bucket(bucketName)
.build())
.locationConstraintAsString()));
@@ -1859,7 +1911,7 @@ public GetObjectRequest.Builder newGetRequestBuilder(final String key) {
public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
// active the audit span used for the operation
try (AuditSpan span = auditSpan.activate()) {
- return s3Client.getObject(request);
+ return getS3Client().getObject(request);
}
}
@@ -1888,7 +1940,7 @@ private final class WriteOperationHelperCallbacksImpl
@Override
public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
- return s3Client.completeMultipartUpload(request);
+ return getS3Client().completeMultipartUpload(request);
}
}
@@ -2926,7 +2978,8 @@ protected HeadObjectResponse getObjectMetadata(String key,
if (changeTracker != null) {
changeTracker.maybeApplyConstraint(requestBuilder);
}
- HeadObjectResponse headObjectResponse = s3Client.headObject(requestBuilder.build());
+ HeadObjectResponse headObjectResponse = getS3Client()
+ .headObject(requestBuilder.build());
if (changeTracker != null) {
changeTracker.processMetadata(headObjectResponse, operation);
}
@@ -2960,7 +3013,7 @@ protected HeadBucketResponse getBucketMetadata() throws IOException {
final HeadBucketResponse response = trackDurationAndSpan(STORE_EXISTS_PROBE, bucket, null,
() -> invoker.retry("getBucketMetadata()", bucket, true, () -> {
try {
- return s3Client.headBucket(
+ return getS3Client().headBucket(
getRequestFactory().newHeadBucketRequestBuilder(bucket).build());
} catch (NoSuchBucketException e) {
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does " + "not exist");
@@ -2995,9 +3048,9 @@ protected S3ListResult listObjects(S3ListRequest request,
OBJECT_LIST_REQUEST,
() -> {
if (useListV1) {
- return S3ListResult.v1(s3Client.listObjects(request.getV1()));
+ return S3ListResult.v1(getS3Client().listObjects(request.getV1()));
} else {
- return S3ListResult.v2(s3Client.listObjectsV2(request.getV2()));
+ return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2()));
}
}));
}
@@ -3050,10 +3103,10 @@ protected S3ListResult continueListObjects(S3ListRequest request,
nextMarker = prevListResult.get(prevListResult.size() - 1).key();
}
- return S3ListResult.v1(s3Client.listObjects(
+ return S3ListResult.v1(getS3Client().listObjects(
request.getV1().toBuilder().marker(nextMarker).build()));
} else {
- return S3ListResult.v2(s3Client.listObjectsV2(request.getV2().toBuilder()
+ return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2().toBuilder()
.continuationToken(prevResult.getV2().nextContinuationToken()).build()));
}
}));
@@ -3185,15 +3238,16 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
* @param file the file to be uploaded
* @param listener the progress listener for the request
* @return the upload initiated
+ * @throws IOException if transfer manager creation failed.
*/
@Retries.OnceRaw
public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
- ProgressableProgressListener listener) {
+ ProgressableProgressListener listener) throws IOException {
long len = getPutRequestLength(putObjectRequest);
LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key());
incrementPutStartStatistics(len);
- FileUpload upload = transferManager.uploadFile(
+ FileUpload upload = store.getOrCreateTransferManager().uploadFile(
UploadFileRequest.builder()
.putObjectRequest(putObjectRequest)
.source(file)
@@ -3233,9 +3287,10 @@ PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
PutObjectResponse response =
trackDurationOfSupplier(nonNullDurationTrackerFactory(durationTrackerFactory),
OBJECT_PUT_REQUESTS.getSymbol(),
- () -> isFile ?
- s3Client.putObject(putObjectRequest, RequestBody.fromFile(uploadData.getFile())) :
- s3Client.putObject(putObjectRequest,
+ () -> isFile
+ ? getS3Client().putObject(putObjectRequest,
+ RequestBody.fromFile(uploadData.getFile()))
+ : getS3Client().putObject(putObjectRequest,
RequestBody.fromInputStream(uploadData.getUploadStream(),
putObjectRequest.contentLength())));
incrementPutCompletedStatistics(true, len);
@@ -3285,7 +3340,7 @@ UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body,
UploadPartResponse uploadPartResponse = trackDurationOfSupplier(
nonNullDurationTrackerFactory(durationTrackerFactory),
MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
- s3Client.uploadPart(request, body));
+ getS3Client().uploadPart(request, body));
incrementPutCompletedStatistics(true, len);
return uploadPartResponse;
} catch (AwsServiceException e) {
@@ -4344,35 +4399,43 @@ public void close() throws IOException {
* both the expected state of this FS and of failures while being stopped.
*/
protected synchronized void stopAllServices() {
- closeAutocloseables(LOG, transferManager,
- s3Client,
- getS3AsyncClient());
- transferManager = null;
- s3Client = null;
- s3AsyncClient = null;
+ try {
+ trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
+ closeAutocloseables(LOG, store);
+ store = null;
+ s3Client = null;
- // At this point the S3A client is shut down,
- // now the executor pools are closed
- HadoopExecutors.shutdown(boundedThreadPool, LOG,
- THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
- boundedThreadPool = null;
- HadoopExecutors.shutdown(unboundedThreadPool, LOG,
- THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
- unboundedThreadPool = null;
- if (futurePool != null) {
- futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
- futurePool = null;
+ // At this point the S3A client is shut down,
+ // now the executor pools are closed
+ HadoopExecutors.shutdown(boundedThreadPool, LOG,
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ boundedThreadPool = null;
+ HadoopExecutors.shutdown(unboundedThreadPool, LOG,
+ THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ unboundedThreadPool = null;
+ if (futurePool != null) {
+ futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ futurePool = null;
+ }
+ // other services are shutdown.
+ cleanupWithLogger(LOG,
+ delegationTokens.orElse(null),
+ signerManager,
+ auditManager);
+ closeAutocloseables(LOG, credentials);
+ delegationTokens = Optional.empty();
+ signerManager = null;
+ credentials = null;
+ return null;
+ });
+ } catch (IOException e) {
+ // failure during shutdown.
+ // this should only be from the signature of trackDurationAndSpan().
+ LOG.warn("Failure during service shutdown", e);
}
+ // and once this duration has been tracked, close the statistics
// other services are shutdown.
- cleanupWithLogger(LOG,
- instrumentation,
- delegationTokens.orElse(null),
- signerManager,
- auditManager);
- closeAutocloseables(LOG, credentials);
- delegationTokens = Optional.empty();
- signerManager = null;
- credentials = null;
+ cleanupWithLogger(LOG, instrumentation);
}
/**
@@ -4559,7 +4622,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
() -> {
incrementStatistic(OBJECT_COPY_REQUESTS);
- Copy copy = transferManager.copy(
+ Copy copy = store.getOrCreateTransferManager().copy(
CopyRequest.builder()
.copyObjectRequest(copyRequest)
.build());
@@ -4589,7 +4652,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
incrementStatistic(OBJECT_COPY_REQUESTS);
try {
- return s3Client.copyObject(copyRequest);
+ return getS3Client().copyObject(copyRequest);
} catch (SdkException awsException) {
// if this is a 412 precondition failure, it may
// be converted to a RemoteFileChangedException
@@ -4620,7 +4683,7 @@ CreateMultipartUploadResponse initiateMultipartUpload(
LOG.debug("Initiate multipart upload to {}", request.key());
return trackDurationOfSupplier(getDurationTrackerFactory(),
OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
- () -> s3Client.createMultipartUpload(request));
+ () -> getS3Client().createMultipartUpload(request));
}
/**
@@ -5343,7 +5406,7 @@ public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
p = prefix + "/";
}
// duration tracking is done in iterator.
- return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys);
+ return MultipartUtils.listMultipartUploads(storeContext, getS3Client(), p, maxKeys);
}
/**
@@ -5368,7 +5431,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
final ListMultipartUploadsRequest request = getRequestFactory()
.newListMultipartUploadsRequestBuilder(p).build();
return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
- s3Client.listMultipartUploads(request).uploads());
+ getS3Client().listMultipartUploads(request).uploads());
});
}
@@ -5383,7 +5446,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
- s3Client.abortMultipartUpload(
+ getS3Client().abortMultipartUpload(
getRequestFactory().newAbortMultipartUploadRequestBuilder(
destKey,
uploadId).build()));
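Since createS3AStore() and createClientManager() are protected and marked
@VisibleForTesting, a test can swap in stub clients. A hedged sketch; the
subclass and createStubFactory() helper are invented for illustration:

    // Sketch: a test filesystem which bypasses the reflection-created factory.
    public class StubbedS3AFileSystem extends S3AFileSystem {
      @Override
      protected ClientManager createClientManager(
          final S3ClientFactory clientFactory,
          final S3ClientFactory.S3ClientCreationParameters parameters,
          final DurationTrackerFactory durationTrackerFactory) {
        // substitute a stub factory so no real AWS clients are ever built
        return new ClientManagerImpl(createStubFactory(), parameters,
            durationTrackerFactory);
      }
    }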
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
index 68eacc35b1..a11ed19670 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
@@ -42,10 +43,14 @@
* Interface for the S3A Store;
* S3 client interactions should be via this; mocking
* is possible for unit tests.
+ *
+ * The {@link ClientManager} interface is used to create the AWS clients;
+ * the base implementation forwards to the implementation of this interface
+ * passed in at construction time.
*/
@InterfaceAudience.LimitedPrivate("Extensions")
@InterfaceStability.Unstable
-public interface S3AStore extends IOStatisticsSource {
+public interface S3AStore extends IOStatisticsSource, ClientManager {
/**
* Acquire write capacity for operations.
@@ -71,6 +76,8 @@ public interface S3AStore extends IOStatisticsSource {
RequestFactory getRequestFactory();
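+
+ /**
+ * Get the client manager, used to create and close the AWS clients.
+ * @return the client manager passed in at construction time.
+ */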
+ ClientManager clientManager();
+
/**
* Perform a bulk object delete operation against S3.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 0b01876ae5..e82eb4c918 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -47,8 +47,6 @@
* implementing only the deprecated method will work.
* See https://github.com/apache/hbase-filesystem
*
- * @deprecated This interface will be replaced by one which uses the AWS SDK V2 S3 client as part of
- * upgrading S3A to SDK V2. See HADOOP-18073.
*/
@InterfaceAudience.LimitedPrivate("HBoss")
@InterfaceStability.Evolving
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 7c4883c3d9..3bee1008ce 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.audit.AuditStatisticNames;
import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
@@ -65,6 +66,16 @@ public enum Statistic {
TYPE_DURATION),
/* FileSystem Level statistics */
+
+ FILESYSTEM_INITIALIZATION(
+ FileSystemStatisticNames.FILESYSTEM_INITIALIZATION,
+ "Filesystem initialization",
+ TYPE_DURATION),
+ FILESYSTEM_CLOSE(
+ FileSystemStatisticNames.FILESYSTEM_CLOSE,
+ "Filesystem close",
+ TYPE_DURATION),
+
DIRECTORIES_CREATED("directories_created",
"Total number of directories created through the object store.",
TYPE_COUNTER),
@@ -532,6 +543,11 @@ public enum Statistic {
TYPE_DURATION),
/* General Store operations */
+ STORE_CLIENT_CREATION(
+ StoreStatisticNames.STORE_CLIENT_CREATION,
+ "Store Client Creation",
+ TYPE_DURATION),
+
STORE_EXISTS_PROBE(StoreStatisticNames.STORE_EXISTS_PROBE,
"Store Existence Probe",
TYPE_DURATION),
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
new file mode 100644
index 0000000000..84770861cc
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+
+/**
+ * Interface for on-demand/async creation of AWS clients
+ * and extension services.
+ */
+public interface ClientManager extends Closeable {
+
+ /**
+ * Get the transfer manager, creating it and any dependencies if needed.
+ * @return a transfer manager
+ * @throws IOException on any failure to create the manager
+ */
+ S3TransferManager getOrCreateTransferManager()
+ throws IOException;
+
+  /** Get the S3 client, creating it if needed. */
+  S3Client getOrCreateS3Client() throws IOException;
+
+  /** Get the asynchronous S3 client, creating it if needed. */
+  S3AsyncClient getOrCreateAsyncClient() throws IOException;
+
+ /**
+   * The close operation is required not to raise exceptions.
+ */
+ void close();
+}
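
For illustration, a minimal lifecycle sketch against this interface, assuming
the client factory, creation parameters, and duration tracker factory have
already been built (placeholder names throughout):

    ClientManager manager = new ClientManagerImpl(
        clientFactory, parameters, durationTrackerFactory);
    try {
      // The first call creates the async client and the transfer manager;
      // a workload which never calls this pays no creation cost.
      S3TransferManager tm = manager.getOrCreateTransferManager();
      // ... use tm ...
    } finally {
      manager.close(); // closes only whatever was actually created
    }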
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
new file mode 100644
index 0000000000..ff6748e66d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.hadoop.fs.s3a.Statistic.STORE_CLIENT_CREATION;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkState;
+import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;
+
+/**
+ * Client manager for on-demand creation of S3 clients;
+ * {@link #close()} closes whichever clients were created, in parallel.
+ * Updates {@link org.apache.hadoop.fs.s3a.Statistic#STORE_CLIENT_CREATION}
+ * to track count and duration of client creation.
+ */
+public class ClientManagerImpl implements ClientManager {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);
+
+ /**
+ * Client factory to invoke.
+ */
+ private final S3ClientFactory clientFactory;
+
+ /**
+ * Closed flag.
+ */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Parameters to create sync/async clients.
+ */
+ private final S3ClientFactory.S3ClientCreationParameters clientCreationParameters;
+
+ /**
+ * Duration tracker factory for creation.
+ */
+ private final DurationTrackerFactory durationTrackerFactory;
+
+ /**
+ * Core S3 client.
+ */
+  private final LazyAutoCloseableReference<S3Client> s3Client;
+
+ /** Async client is used for transfer manager. */
+  private final LazyAutoCloseableReference<S3AsyncClient> s3AsyncClient;
+
+ /** Transfer manager. */
+  private final LazyAutoCloseableReference<S3TransferManager> transferManager;
+
+ /**
+ * Constructor.
+ * This does not create any clients.
+ * @param clientFactory client factory to invoke
+ * @param clientCreationParameters creation parameters.
+   * @param durationTrackerFactory duration tracker factory.
+ */
+ public ClientManagerImpl(
+ final S3ClientFactory clientFactory,
+ final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
+ final DurationTrackerFactory durationTrackerFactory) {
+ this.clientFactory = requireNonNull(clientFactory);
+ this.clientCreationParameters = requireNonNull(clientCreationParameters);
+ this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+ this.s3Client = new LazyAutoCloseableReference<>(createS3Client());
+    this.s3AsyncClient = new LazyAutoCloseableReference<>(createAsyncClient());
+ this.transferManager = new LazyAutoCloseableReference<>(createTransferManager());
+ }
+
+ /**
+ * Create the function to create the S3 client.
+ * @return a callable which will create the client.
+ */
+  private CallableRaisingIOE<S3Client> createS3Client() {
+ return trackDurationOfOperation(
+ durationTrackerFactory,
+ STORE_CLIENT_CREATION.getSymbol(),
+ () -> clientFactory.createS3Client(getUri(), clientCreationParameters));
+ }
+
+ /**
+ * Create the function to create the S3 Async client.
+ * @return a callable which will create the client.
+ */
+  private CallableRaisingIOE<S3AsyncClient> createAsyncClient() {
+ return trackDurationOfOperation(
+ durationTrackerFactory,
+ STORE_CLIENT_CREATION.getSymbol(),
+ () -> clientFactory.createS3AsyncClient(getUri(), clientCreationParameters));
+ }
+
+ /**
+ * Create the function to create the Transfer Manager.
+ * @return a callable which will create the component.
+ */
+  private CallableRaisingIOE<S3TransferManager> createTransferManager() {
+ return () -> {
+ final S3AsyncClient asyncClient = s3AsyncClient.eval();
+ return trackDuration(durationTrackerFactory,
+ STORE_CLIENT_CREATION.getSymbol(), () ->
+ clientFactory.createS3TransferManager(asyncClient));
+ };
+ }
+
+ @Override
+ public synchronized S3Client getOrCreateS3Client() throws IOException {
+ checkNotClosed();
+ return s3Client.eval();
+ }
+
+ @Override
+ public synchronized S3AsyncClient getOrCreateAsyncClient() throws IOException {
+ checkNotClosed();
+ return s3AsyncClient.eval();
+ }
+
+ @Override
+ public synchronized S3TransferManager getOrCreateTransferManager() throws IOException {
+ checkNotClosed();
+ return transferManager.eval();
+ }
+
+ /**
+ * Check that the client manager is not closed.
+ * @throws IllegalStateException if it is closed.
+ */
+ private void checkNotClosed() {
+ checkState(!closed.get(), "Client manager is closed");
+ }
+
+ /**
+ * Close() is synchronized to avoid race conditions between
+ * slow client creation and this close operation.
+ *
+   * The objects are all closed in parallel.
+ */
+ @Override
+ public synchronized void close() {
+ if (closed.getAndSet(true)) {
+ // re-entrant close.
+ return;
+ }
+    // queue the close operations.
+    List<Future<Object>> l = new ArrayList<>();
+ l.add(closeAsync(transferManager));
+ l.add(closeAsync(s3AsyncClient));
+ l.add(closeAsync(s3Client));
+
+ // once all are queued, await their completion
+ // and swallow any exception.
+ try {
+ awaitAllFutures(l);
+ } catch (Exception e) {
+ // should never happen.
+ LOG.warn("Exception in close", e);
+ }
+ }
+
+ /**
+ * Get the URI of the filesystem.
+ * @return URI to use when creating clients.
+ */
+ public URI getUri() {
+ return clientCreationParameters.getPathUri();
+ }
+
+ /**
+   * Queue closing a closeable, logging any exception and returning null
+   * for use when awaiting the results.
+   * @param reference the reference to close.
+   * @param <T> type of closeable
+   * @return a future which returns null once the close has completed
+ */
+  private <T extends AutoCloseable> CompletableFuture<Object> closeAsync(