HADOOP-19205. S3A: initialization/close slower than with v1 SDK (#6892)

Adds a new ClientManager interface/implementation which provides on-demand
creation of synchronous and asynchronous S3 clients and the S3 transfer
manager, and which terminates these in close().

S3A FS is modified to
* Create a ClientManagerImpl instance and pass it down to its S3Store.
* Use the same ClientManager interface against S3Store to demand-create
  the services (see the sketch after this list).
* Only create the async client as part of the transfer manager creation,
  which will take place during the first rename() operation.
* Statistics on client creation count and duration are recorded.
* Statistics on the time to initialize and shut down the S3A FS are
  collected in IOStatistics for reporting.
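
A caller-side sketch of the resulting lifecycle (illustrative only, not part
of the patch; clientFactory, clientCreationParameters and
durationTrackerFactory are assumed to be in scope, built as in
S3AFileSystem.createClientManager() below):

  // constructing the manager creates no AWS clients; that cost is deferred
  ClientManager clients = new ClientManagerImpl(
      clientFactory, clientCreationParameters, durationTrackerFactory);

  // the synchronous client is created on first demand;
  // creation count and duration go into IOStatistics
  S3Client s3 = clients.getOrCreateS3Client();

  // the async client and transfer manager are only created when a
  // transfer (e.g. the first rename()) actually needs them
  S3TransferManager tm = clients.getOrCreateTransferManager();

  // closes, in parallel, only those clients which were actually created
  clients.close();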

Adds to hadoop-common the class
  LazyAtomicReference<T> implements CallableRaisingIOE<T>, Supplier<T>
and subclass
  LazyAutoCloseableReference<T extends AutoCloseable>
    extends LazyAtomicReference<T> implements AutoCloseable

These evaluate the Supplier<T>/CallableRaisingIOE<T> they were
constructed with on the first (successful) read of the value.
Any exception raised during this operation will be rethrown, and the
same operation retried on future evaluations.

These classes implement the Supplier and CallableRaisingIOE
interfaces so they can be used to implement lazy function evaluation,
as Haskell and some other functional languages do.

LazyAutoCloseableReference is AutoCloseable; its close() method will
close the inner reference if it is set.

This class is used in ClientManagerImpl for lazy S3 client creation
and closure.
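
A minimal sketch of these semantics, assuming hypothetical names
(expensiveLookup(), factory, uri, parameters and request are illustrative):

  // the callable is not invoked by the constructor...
  LazyAtomicReference<Long> ref =
      new LazyAtomicReference<>(() -> expensiveLookup());

  long v1 = ref.eval();  // ...but by the first read; may throw IOException
  long v2 = ref.get();   // cached value; IOEs surface as UncheckedIOException

  // the AutoCloseable variant closes the inner value only if it was created;
  // note that close() itself may throw
  try (LazyAutoCloseableReference<S3Client> lazyClient =
           new LazyAutoCloseableReference<>(() ->
               factory.createS3Client(uri, parameters))) {
    lazyClient.eval().headBucket(request);  // client created here, on demand
  }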

Contributed by Steve Loughran.
Steve Loughran 2024-07-05 16:38:37 +01:00 committed by GitHub
parent ae76e9475c
commit 4c55adbb6b
20 changed files with 1663 additions and 216 deletions

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Common statistic names for Filesystem-level statistics,
* including internals.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class FileSystemStatisticNames {
private FileSystemStatisticNames() {
}
/**
* How long did filesystem initialization take?
*/
public static final String FILESYSTEM_INITIALIZATION = "filesystem_initialization";
/**
* How long did filesystem close take?
*/
public static final String FILESYSTEM_CLOSE = "filesystem_close";
}
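
A hedged sketch of how these keys can be read back, assuming uri/conf are in
scope; IOStatisticsSupport.retrieveIOStatistics() and
IOStatisticsLogging.ioStatisticsToPrettyString() are existing hadoop-common
helpers, and duration statistics typically publish a count plus
.mean/.min/.max entries under each key:

  FileSystem fs = FileSystem.get(uri, conf);
  // S3AFileSystem is an IOStatisticsSource, so its statistics are retrievable
  IOStatistics stats = IOStatisticsSupport.retrieveIOStatistics(fs);
  // look for the "filesystem_initialization" entries in the output
  System.out.println(IOStatisticsLogging.ioStatisticsToPrettyString(stats));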

View File

@ -176,6 +176,11 @@ public final class StoreStatisticNames {
public static final String DELEGATION_TOKENS_ISSUED
= "delegation_tokens_issued";
/**
* How long did any store client creation take?
*/
public static final String STORE_CLIENT_CREATION = "store_client_creation";
/** Probe for store existing: {@value}. */
public static final String STORE_EXISTS_PROBE
= "store_exists_probe";
@ -200,6 +205,7 @@ public final class StoreStatisticNames {
public static final String STORE_IO_RATE_LIMITED_DURATION
= "store_io_rate_limited_duration";
/**
* A store's equivalent of a paged LIST request was initiated: {@value}.
*/

View File

@ -49,27 +49,6 @@ public static <T> T uncheckIOExceptions(CallableRaisingIOE<T> call) {
}
}
/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* This is similar to {@link CommonCallableSupplier}, except that
* only IOExceptions are caught and wrapped; all other exceptions are
* propagated unchanged.
* @param <T> type of result
*/
private static final class UncheckedIOExceptionSupplier<T> implements Supplier<T> {
private final CallableRaisingIOE<T> call;
private UncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
this.call = call;
}
@Override
public T get() {
return uncheckIOExceptions(call);
}
}
/**
* Wrap a {@link CallableRaisingIOE} as a {@link Supplier}.
* @param call call to wrap
@ -77,7 +56,7 @@ public T get() {
* @return a supplier which invokes the call.
*/
public static <T> Supplier<T> toUncheckedIOExceptionSupplier(CallableRaisingIOE<T> call) {
return new UncheckedIOExceptionSupplier<>(call);
return () -> uncheckIOExceptions(call);
}
/**

View File

@ -38,9 +38,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Future IO Helper methods.
* <p>
@ -62,7 +59,6 @@
@InterfaceStability.Unstable
public final class FutureIO {
private static final Logger LOG = LoggerFactory.getLogger(FutureIO.class.getName());
private FutureIO() {
}
@ -129,7 +125,6 @@ public static <T> T awaitFuture(final Future<T> future,
* If any future throws an exception during its execution, this method
* extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param <T> type of the result.
* @return the list of future's result, if all went well.
@ -140,19 +135,10 @@ public static <T> T awaitFuture(final Future<T> future,
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection)
throws InterruptedIOException, IOException, RuntimeException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get());
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
for (Future<T> future : collection) {
results.add(awaitFuture(future));
}
return results;
}
/**
@ -163,7 +149,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* the timeout expires, whichever happens first. If any future throws an
* exception during its execution, this method extracts and rethrows that exception.
* </p>
*
* @param collection collection of futures to be evaluated
* @param duration timeout duration
* @param <T> type of the result.
@ -176,21 +161,12 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection,
final Duration duration)
throws InterruptedIOException, IOException, RuntimeException,
TimeoutException {
TimeoutException {
List<T> results = new ArrayList<>();
try {
for (Future<T> future : collection) {
results.add(future.get(duration.toMillis(), TimeUnit.MILLISECONDS));
}
return results;
} catch (InterruptedException e) {
LOG.debug("Execution of future interrupted ", e);
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
} catch (ExecutionException e) {
LOG.debug("Execution of future failed with exception", e.getCause());
return raiseInnerCause(e);
for (Future<T> future : collection) {
results.add(awaitFuture(future, duration.toMillis(), TimeUnit.MILLISECONDS));
}
return results;
}
/**
@ -199,7 +175,6 @@ public static <T> List<T> awaitAllFutures(final Collection<Future<T>> collection
* This will always raise an exception, either the inner IOException,
* an inner RuntimeException, or a new IOException wrapping the raised
* exception.
*
* @param e exception.
* @param <T> type of return value.
* @return nothing, ever.
@ -283,12 +258,11 @@ public static IOException unwrapInnerException(final Throwable e) {
* @param <U> type of builder
* @return the builder passed in.
*/
public static <T, U extends FSBuilder<T, U>>
FSBuilder<T, U> propagateOptions(
final FSBuilder<T, U> builder,
final Configuration conf,
final String optionalPrefix,
final String mandatoryPrefix) {
public static <T, U extends FSBuilder<T, U>> FSBuilder<T, U> propagateOptions(
final FSBuilder<T, U> builder,
final Configuration conf,
final String optionalPrefix,
final String mandatoryPrefix) {
propagateOptions(builder, conf,
optionalPrefix, false);
propagateOptions(builder, conf,

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
/**
* A lazily constructed reference, whose reference
* constructor is a {@link CallableRaisingIOE} so
* may raise IOExceptions.
* <p>
* This {@code constructor} is only invoked on demand
* when the reference is first needed,
* after which the same value is returned.
* This value MUST NOT be null.
* <p>
* Implements {@link CallableRaisingIOE} and {@code java.util.function.Supplier}.
* An instance of this can therefore be used in a functional IO chain.
* As such, it can act as a delayed and caching invocator of a function:
* the supplier passed in is only ever invoked once, and only when requested.
* @param <T> type of reference
*/
public class LazyAtomicReference<T>
implements CallableRaisingIOE<T>, Supplier<T> {
/**
* Underlying reference.
*/
private final AtomicReference<T> reference = new AtomicReference<>();
/**
* Constructor for lazy creation.
*/
private final CallableRaisingIOE<? extends T> constructor;
/**
* Constructor for this instance.
* @param constructor method to invoke to actually construct the inner object.
*/
public LazyAtomicReference(final CallableRaisingIOE<? extends T> constructor) {
this.constructor = requireNonNull(constructor);
}
/**
* Getter for the constructor.
* @return the constructor class
*/
protected CallableRaisingIOE<? extends T> getConstructor() {
return constructor;
}
/**
* Get the reference.
* Subclasses working with this reference need to take care with it.
* @return the reference.
*/
protected AtomicReference<T> getReference() {
return reference;
}
/**
* Get the value, constructing it if needed.
* @return the value
* @throws IOException on any evaluation failure
* @throws NullPointerException if the evaluated function returned null.
*/
public synchronized T eval() throws IOException {
final T v = reference.get();
if (v != null) {
return v;
}
reference.set(requireNonNull(constructor.apply()));
return reference.get();
}
/**
* Implementation of {@code CallableRaisingIOE.apply()}.
* Invoke {@link #eval()}.
* @return the value
* @throws IOException on any evaluation failure
*/
@Override
public final T apply() throws IOException {
return eval();
}
/**
* Implementation of {@code Supplier.get()}.
* <p>
* Invoke {@link #eval()} and convert IOEs to
* UncheckedIOException.
* <p>
* This is the {@code Supplier.get()} implementation, which allows
* this class to be passed into anything taking a supplier.
* @return the value
* @throws UncheckedIOException if the constructor raised an IOException.
*/
@Override
public final T get() throws UncheckedIOException {
return uncheckIOExceptions(this::eval);
}
/**
* Is the reference set?
* @return true if the reference has been set.
*/
public final boolean isSet() {
return reference.get() != null;
}
@Override
public String toString() {
return "LazyAtomicReference{" +
"reference=" + reference + '}';
}
/**
* Create from a supplier.
* This is not a constructor to avoid ambiguity when a lambda-expression is
* passed in.
* @param supplier supplier implementation.
* @return a lazy reference.
* @param <T> type of reference
*/
public static <T> LazyAtomicReference<T> lazyAtomicReferenceFromSupplier(
Supplier<T> supplier) {
return new LazyAtomicReference<>(supplier::get);
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import static org.apache.hadoop.util.Preconditions.checkState;
/**
* A subclass of {@link LazyAtomicReference} which
* holds an {@code AutoCloseable} reference and calls {@code close()}
* when it itself is closed.
* @param <T> type of reference.
*/
public class LazyAutoCloseableReference<T extends AutoCloseable>
extends LazyAtomicReference<T> implements AutoCloseable {
/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Constructor for this instance.
* @param constructor method to invoke to actually construct the inner object.
*/
public LazyAutoCloseableReference(final CallableRaisingIOE<? extends T> constructor) {
super(constructor);
}
/**
* {@inheritDoc}
* @throws IllegalStateException if the reference is closed.
*/
@Override
public synchronized T eval() throws IOException {
checkState(!closed.get(), "Reference is closed");
return super.eval();
}
/**
* Is the reference closed?
* @return true if the reference is closed.
*/
public boolean isClosed() {
return closed.get();
}
/**
* Close the reference value if it is non-null.
* Sets the reference to null afterwards, even on
* a failure.
* @throws Exception failure to close.
*/
@Override
public synchronized void close() throws Exception {
if (closed.getAndSet(true)) {
// already closed
return;
}
final T v = getReference().get();
// check the state.
// A null reference means it has not yet been evaluated,
if (v != null) {
try {
v.close();
} finally {
// set the reference to null, even on a failure.
getReference().set(null);
}
}
}
/**
* Create from a supplier.
* This is not a constructor to avoid ambiguity when a lambda-expression is
* passed in.
* @param supplier supplier implementation.
* @return a lazy reference.
* @param <T> type of reference
*/
public static <T extends AutoCloseable> LazyAutoCloseableReference<T> lazyAutoCloseablefromSupplier(Supplier<T> supplier) {
return new LazyAutoCloseableReference<>(supplier::get);
}
}

View File

@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
import static org.apache.hadoop.util.Preconditions.checkState;
/**
* Test {@link LazyAtomicReference} and {@link LazyAutoCloseableReference}.
*/
public class TestLazyReferences extends AbstractHadoopTestBase {
/**
* Format of exceptions to raise.
*/
private static final String GENERATED = "generated[%d]";
/**
* Invocation counter, can be asserted on in {@link #assertCounterValue(int)}.
*/
private final AtomicInteger counter = new AtomicInteger();
/**
* Assert that {@link #counter} has a specific value.
* @param val expected value
*/
private void assertCounterValue(final int val) {
assertAtomicIntValue(counter, val);
}
/**
* Assert an atomic integer has a specific value.
* @param ai atomic integer
* @param val expected value
*/
private static void assertAtomicIntValue(
final AtomicInteger ai, final int val) {
Assertions.assertThat(ai.get())
.describedAs("value of atomic integer %s", ai)
.isEqualTo(val);
}
/**
* Test the underlying {@link LazyAtomicReference} integration with java
* Supplier API.
*/
@Test
public void testLazyAtomicReference() throws Throwable {
LazyAtomicReference<Integer> ref = new LazyAtomicReference<>(counter::incrementAndGet);
// constructor does not invoke the supplier
assertCounterValue(0);
assertSetState(ref, false);
// first eval() invokes the supplier
Assertions.assertThat(ref.eval())
.describedAs("first eval()")
.isEqualTo(1);
assertCounterValue(1);
assertSetState(ref, true);
// Callable.apply() returns the same value
Assertions.assertThat(ref.apply())
.describedAs("second get of %s", ref)
.isEqualTo(1);
// no new counter increment
assertCounterValue(1);
}
/**
* Assert that {@link LazyAtomicReference#isSet()} is in the expected state.
* @param ref reference
* @param expected expected value
*/
private static <T> void assertSetState(final LazyAtomicReference<T> ref,
final boolean expected) {
Assertions.assertThat(ref.isSet())
.describedAs("isSet() of %s", ref)
.isEqualTo(expected);
}
/**
* Test the underlying {@link LazyAtomicReference} integration with java
* Supplier API.
*/
@Test
public void testSupplierIntegration() throws Throwable {
LazyAtomicReference<Integer> ref = LazyAtomicReference.lazyAtomicReferenceFromSupplier(counter::incrementAndGet);
// constructor does not invoke the supplier
assertCounterValue(0);
assertSetState(ref, false);
// first get() invokes the supplier
Assertions.assertThat(ref.get())
.describedAs("first get()")
.isEqualTo(1);
assertCounterValue(1);
// Callable.apply() returns the same value
Assertions.assertThat(ref.apply())
.describedAs("second get of %s", ref)
.isEqualTo(1);
// no new counter increment
assertCounterValue(1);
}
/**
* Test failure handling through the supplier API.
*/
@Test
public void testSupplierIntegrationFailureHandling() throws Throwable {
LazyAtomicReference<Integer> ref = new LazyAtomicReference<>(() -> {
throw new UnknownHostException(String.format(GENERATED, counter.incrementAndGet()));
});
// the get() call will wrap the raised exception, which can be extracted
// and type checked.
verifyCause(UnknownHostException.class,
intercept(UncheckedIOException.class, "[1]", ref::get));
assertSetState(ref, false);
// counter goes up
intercept(UncheckedIOException.class, "[2]", ref::get);
}
@Test
public void testAutoCloseable() throws Throwable {
final LazyAutoCloseableReference<CloseableClass> ref =
LazyAutoCloseableReference.lazyAutoCloseablefromSupplier(CloseableClass::new);
assertSetState(ref, false);
ref.eval();
final CloseableClass closeable = ref.get();
Assertions.assertThat(closeable.isClosed())
.describedAs("closed flag of %s", closeable)
.isFalse();
// first close will close the class.
ref.close();
Assertions.assertThat(ref.isClosed())
.describedAs("closed flag of %s", ref)
.isTrue();
Assertions.assertThat(closeable.isClosed())
.describedAs("closed flag of %s", closeable)
.isTrue();
// second close will not raise an exception
ref.close();
// you cannot eval() a closed reference
intercept(IllegalStateException.class, "Reference is closed", ref::eval);
intercept(IllegalStateException.class, "Reference is closed", ref::get);
intercept(IllegalStateException.class, "Reference is closed", ref::apply);
Assertions.assertThat(ref.getReference().get())
.describedAs("inner referece of %s", ref)
.isNull();
}
/**
* Not an error to close a reference which was never evaluated.
*/
@Test
public void testCloseableUnevaluated() throws Throwable {
final LazyAutoCloseableReference<CloseableRaisingException> ref =
new LazyAutoCloseableReference<>(CloseableRaisingException::new);
ref.close();
ref.close();
}
/**
* If the close() call fails, an exception is raised only on the first attempt,
* and the reference is set to null.
*/
@Test
public void testAutoCloseableFailureHandling() throws Throwable {
final LazyAutoCloseableReference<CloseableRaisingException> ref =
new LazyAutoCloseableReference<>(CloseableRaisingException::new);
ref.eval();
// close reports the failure.
intercept(IOException.class, "raised", ref::close);
// but the reference is set to null
assertSetState(ref, false);
// second attempt does nothing, so will not raise an exception.
ref.close();
}
/**
* Closeable which sets the closed flag on close().
*/
private static final class CloseableClass implements AutoCloseable {
/** closed flag. */
private boolean closed;
/**
* Close the resource.
* @throws IllegalStateException if already closed.
*/
@Override
public void close() {
checkState(!closed, "Already closed");
closed = true;
}
/**
* Get the closed flag.
* @return the state.
*/
private boolean isClosed() {
return closed;
}
}
/**
* Closeable which raises an IOE in close().
*/
private static final class CloseableRaisingException implements AutoCloseable {
@Override
public void close() throws Exception {
throw new IOException("raised");
}
}
}

View File

@ -64,11 +64,6 @@
<Field name="futurePool"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
<Match>
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem"/>
<Field name="s3AsyncClient"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
<Match>
<Class name="org.apache.hadoop.fs.s3a.s3guard.S3GuardTool$BucketInfo"/>
<Method name="run"/>

View File

@ -54,7 +54,6 @@
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
@ -88,7 +87,6 @@
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
@ -123,6 +121,8 @@
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
@ -152,6 +152,7 @@
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
@ -305,11 +306,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private S3AStore store;
/**
* The core S3 client is created and managed by the ClientManager.
* It is copied here within {@link #initialize(URI, Configuration)}.
* Some mocking tests modify this so take care with changes.
*/
private S3Client s3Client;
/** Async client is used for transfer manager. */
private S3AsyncClient s3AsyncClient;
// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
@ -328,7 +331,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private Listing listing;
private long partSize;
private boolean enableMultiObjectsDelete;
private S3TransferManager transferManager;
private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool;
@ -548,6 +550,9 @@ public void initialize(URI name, Configuration originalConf)
// get the host; this is guaranteed to be non-null, non-empty
bucket = name.getHost();
AuditSpan span = null;
// track initialization duration; will only be set after
// statistics are set up.
Optional<DurationTracker> trackInitialization = Optional.empty();
try {
LOG.debug("Initializing S3AFileSystem for {}", bucket);
if (LOG.isTraceEnabled()) {
@ -592,6 +597,18 @@ public void initialize(URI name, Configuration originalConf)
super.initialize(uri, conf);
setConf(conf);
// initialize statistics, after which statistics
// can be collected.
instrumentation = new S3AInstrumentation(uri);
initializeStatisticsBinding();
// track initialization duration.
// this should really be done in a onceTrackingDuration() call,
// but then all methods below would need to be in the lambda and
// it would create a merge/backport headache for all.
trackInitialization = Optional.of(
instrumentation.trackDuration(FileSystemStatisticNames.FILESYSTEM_INITIALIZATION));
s3aInternals = createS3AInternals();
// look for encryption data
@ -600,8 +617,7 @@ public void initialize(URI name, Configuration originalConf)
buildEncryptionSecrets(bucket, conf));
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
instrumentation = new S3AInstrumentation(uri);
initializeStatisticsBinding();
// If CSE-KMS method is set then CSE is enabled.
isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
.equals(getS3EncryptionAlgorithm().getMethod());
@ -687,7 +703,7 @@ public void initialize(URI name, Configuration originalConf)
// the FS came with a DT
// this may do some patching of the configuration (e.g. setting
// the encryption algorithms)
bindAWSClient(name, delegationTokensEnabled);
ClientManager clientManager = createClientManager(name, delegationTokensEnabled);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
@ -762,36 +778,55 @@ public void initialize(URI name, Configuration originalConf)
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create the store
store = new S3AStoreBuilder()
.withS3Client(s3Client)
.withDurationTrackerFactory(getDurationTrackerFactory())
.withStoreContextFactory(this)
.withAuditSpanSource(getAuditManager())
.withInstrumentation(getInstrumentation())
.withStatisticsContext(statisticsContext)
.withStorageStatistics(getStorageStatistics())
.withReadRateLimiter(unlimitedRate())
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();
store = createS3AStore(clientManager, rateLimitCapacity);
// the s3 client is created through the store, rather than
// directly through the client manager.
// this is to aid mocking.
s3Client = store.getOrCreateS3Client();
// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
doBucketProbing();
initMultipartUploads(conf);
trackInitialization.ifPresent(DurationTracker::close);
} catch (SdkException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
stopAllServices();
trackInitialization.ifPresent(DurationTracker::failed);
throw translateException("initializing ", new Path(name), e);
} catch (IOException | RuntimeException e) {
// other exceptions: stop the services.
cleanupWithLogger(LOG, span);
stopAllServices();
trackInitialization.ifPresent(DurationTracker::failed);
throw e;
}
}
/**
* Create the S3AStore instance.
* This is protected so that tests can override it.
* @param clientManager client manager
* @param rateLimitCapacity rate limit
* @return a new store instance
*/
@VisibleForTesting
protected S3AStore createS3AStore(final ClientManager clientManager,
final int rateLimitCapacity) {
return new S3AStoreBuilder()
.withClientManager(clientManager)
.withDurationTrackerFactory(getDurationTrackerFactory())
.withStoreContextFactory(this)
.withAuditSpanSource(getAuditManager())
.withInstrumentation(getInstrumentation())
.withStatisticsContext(statisticsContext)
.withStorageStatistics(getStorageStatistics())
.withReadRateLimiter(unlimitedRate())
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();
}
/**
* Populates the configurations related to vectored IO operation
* in the context which has to passed down to input streams.
@ -965,7 +1000,7 @@ protected void verifyBucketExists() throws UnknownStoreException, IOException {
STORE_EXISTS_PROBE, bucket, null, () ->
invoker.retry("doesBucketExist", bucket, true, () -> {
try {
s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
getS3Client().headBucket(HeadBucketRequest.builder().bucket(bucket).build());
return true;
} catch (AwsServiceException ex) {
int statusCode = ex.statusCode();
@ -1014,14 +1049,22 @@ public Listing getListing() {
/**
* Set up the client bindings.
* If delegation tokens are enabled, the FS first looks for a DT
* ahead of any other bindings;.
* ahead of any other bindings.
* If there is a DT it uses that to do the auth
* and switches to the DT authenticator automatically (and exclusively)
* @param name URI of the FS
* and switches to the DT authenticator automatically (and exclusively).
* <p>
* Delegation tokens are configured and started, but the actual
* S3 clients are not: instead a {@link ClientManager} is created
* and returned, from which they can be created on demand.
* This is to reduce delays in FS initialization, especially
* for features (transfer manager, async client) which are not
* always used.
* @param fsURI URI of the FS
* @param dtEnabled are delegation tokens enabled?
* @return the client manager which can generate the clients.
* @throws IOException failure.
*/
private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws IOException {
Configuration conf = getConf();
credentials = null;
String uaSuffix = "";
@ -1059,7 +1102,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
uaSuffix = tokens.getUserAgentField();
} else {
// DT support is disabled, so create the normal credential chain
credentials = createAWSCredentialProviderList(name, conf);
credentials = createAWSCredentialProviderList(fsURI, conf);
}
LOG.debug("Using credential provider {}", credentials);
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
@ -1069,7 +1112,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
S3ClientFactory.S3ClientCreationParameters parameters =
new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
.withPathUri(name)
.withPathUri(fsURI)
.withEndpoint(endpoint)
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
@ -1088,22 +1131,27 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
s3Client = clientFactory.createS3Client(getUri(), parameters);
createS3AsyncClient(clientFactory, parameters);
transferManager = clientFactory.createS3TransferManager(getS3AsyncClient());
// this is where clients and the transfer manager are created on demand.
return createClientManager(clientFactory, parameters, getDurationTrackerFactory());
}
/**
* Creates and configures the S3AsyncClient.
* Uses synchronized method to suppress spotbugs error.
*
* @param clientFactory factory used to create S3AsyncClient
* @param parameters parameter object
* @throws IOException on any IO problem
* Create the Client Manager; protected to allow for mocking.
* Requires {@link #unboundedThreadPool} to be initialized.
* @param clientFactory (reflection-bonded) client factory.
* @param clientCreationParameters parameters for client creation.
* @param durationTrackerFactory factory for duration tracking.
* @return a client manager instance.
*/
private void createS3AsyncClient(S3ClientFactory clientFactory,
S3ClientFactory.S3ClientCreationParameters parameters) throws IOException {
s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
@VisibleForTesting
protected ClientManager createClientManager(
final S3ClientFactory clientFactory,
final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
final DurationTrackerFactory durationTrackerFactory) {
return new ClientManagerImpl(clientFactory,
clientCreationParameters,
durationTrackerFactory
);
}
/**
@ -1241,14 +1289,6 @@ public RequestFactory getRequestFactory() {
return requestFactory;
}
/**
* Get the S3 Async client.
* @return the async s3 client.
*/
private S3AsyncClient getS3AsyncClient() {
return s3AsyncClient;
}
/**
* Implementation of all operations used by delegation tokens.
*/
@ -1335,7 +1375,8 @@ public void abortOutstandingMultipartUploads(long seconds)
invoker.retry("Purging multipart uploads", bucket, true,
() -> {
RemoteIterator<MultipartUpload> uploadIterator =
MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys);
MultipartUtils.listMultipartUploads(createStoreContext(),
getS3Client(), null, maxKeys);
while (uploadIterator.hasNext()) {
MultipartUpload upload = uploadIterator.next();
@ -1395,12 +1436,23 @@ public int getDefaultPort() {
* Set the client; used in mocking tests to force in a different client.
* @param client client.
*/
@VisibleForTesting
protected void setAmazonS3Client(S3Client client) {
Preconditions.checkNotNull(client, "clientV2");
LOG.debug("Setting S3V2 client to {}", client);
s3Client = client;
}
/**
* Get the S3 client created in {@link #initialize(URI, Configuration)}.
* @return the s3Client
* @throws UncheckedIOException if the client could not be created.
*/
@VisibleForTesting
protected S3Client getS3Client() {
return s3Client;
}
/**
* S3AInternals method.
* {@inheritDoc}.
@ -1437,7 +1489,7 @@ private final class S3AInternalsImpl implements S3AInternals {
@Override
public S3Client getAmazonS3Client(String reason) {
LOG.debug("Access to S3 client requested, reason {}", reason);
return s3Client;
return getS3Client();
}
@Override
@ -1470,7 +1522,7 @@ public String getBucketLocation(String bucketName) throws IOException {
// If accessPoint then region is known from Arn
accessPoint != null
? accessPoint.getRegion()
: s3Client.getBucketLocation(GetBucketLocationRequest.builder()
: getS3Client().getBucketLocation(GetBucketLocationRequest.builder()
.bucket(bucketName)
.build())
.locationConstraintAsString()));
@ -1859,7 +1911,7 @@ public GetObjectRequest.Builder newGetRequestBuilder(final String key) {
public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
// active the audit span used for the operation
try (AuditSpan span = auditSpan.activate()) {
return s3Client.getObject(request);
return getS3Client().getObject(request);
}
}
@ -1888,7 +1940,7 @@ private final class WriteOperationHelperCallbacksImpl
@Override
public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
return s3Client.completeMultipartUpload(request);
return getS3Client().completeMultipartUpload(request);
}
}
@ -2926,7 +2978,8 @@ protected HeadObjectResponse getObjectMetadata(String key,
if (changeTracker != null) {
changeTracker.maybeApplyConstraint(requestBuilder);
}
HeadObjectResponse headObjectResponse = s3Client.headObject(requestBuilder.build());
HeadObjectResponse headObjectResponse = getS3Client()
.headObject(requestBuilder.build());
if (changeTracker != null) {
changeTracker.processMetadata(headObjectResponse, operation);
}
@ -2960,7 +3013,7 @@ protected HeadBucketResponse getBucketMetadata() throws IOException {
final HeadBucketResponse response = trackDurationAndSpan(STORE_EXISTS_PROBE, bucket, null,
() -> invoker.retry("getBucketMetadata()", bucket, true, () -> {
try {
return s3Client.headBucket(
return getS3Client().headBucket(
getRequestFactory().newHeadBucketRequestBuilder(bucket).build());
} catch (NoSuchBucketException e) {
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does " + "not exist");
@ -2995,9 +3048,9 @@ protected S3ListResult listObjects(S3ListRequest request,
OBJECT_LIST_REQUEST,
() -> {
if (useListV1) {
return S3ListResult.v1(s3Client.listObjects(request.getV1()));
return S3ListResult.v1(getS3Client().listObjects(request.getV1()));
} else {
return S3ListResult.v2(s3Client.listObjectsV2(request.getV2()));
return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2()));
}
}));
}
@ -3050,10 +3103,10 @@ protected S3ListResult continueListObjects(S3ListRequest request,
nextMarker = prevListResult.get(prevListResult.size() - 1).key();
}
return S3ListResult.v1(s3Client.listObjects(
return S3ListResult.v1(getS3Client().listObjects(
request.getV1().toBuilder().marker(nextMarker).build()));
} else {
return S3ListResult.v2(s3Client.listObjectsV2(request.getV2().toBuilder()
return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2().toBuilder()
.continuationToken(prevResult.getV2().nextContinuationToken()).build()));
}
}));
@ -3185,15 +3238,16 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
* @param file the file to be uploaded
* @param listener the progress listener for the request
* @return the upload initiated
* @throws IOException if transfer manager creation failed.
*/
@Retries.OnceRaw
public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
ProgressableProgressListener listener) {
ProgressableProgressListener listener) throws IOException {
long len = getPutRequestLength(putObjectRequest);
LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key());
incrementPutStartStatistics(len);
FileUpload upload = transferManager.uploadFile(
FileUpload upload = store.getOrCreateTransferManager().uploadFile(
UploadFileRequest.builder()
.putObjectRequest(putObjectRequest)
.source(file)
@ -3233,9 +3287,10 @@ PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
PutObjectResponse response =
trackDurationOfSupplier(nonNullDurationTrackerFactory(durationTrackerFactory),
OBJECT_PUT_REQUESTS.getSymbol(),
() -> isFile ?
s3Client.putObject(putObjectRequest, RequestBody.fromFile(uploadData.getFile())) :
s3Client.putObject(putObjectRequest,
() -> isFile
? getS3Client().putObject(putObjectRequest,
RequestBody.fromFile(uploadData.getFile()))
: getS3Client().putObject(putObjectRequest,
RequestBody.fromInputStream(uploadData.getUploadStream(),
putObjectRequest.contentLength())));
incrementPutCompletedStatistics(true, len);
@ -3285,7 +3340,7 @@ UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body,
UploadPartResponse uploadPartResponse = trackDurationOfSupplier(
nonNullDurationTrackerFactory(durationTrackerFactory),
MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
s3Client.uploadPart(request, body));
getS3Client().uploadPart(request, body));
incrementPutCompletedStatistics(true, len);
return uploadPartResponse;
} catch (AwsServiceException e) {
@ -4344,35 +4399,43 @@ public void close() throws IOException {
* both the expected state of this FS and of failures while being stopped.
*/
protected synchronized void stopAllServices() {
closeAutocloseables(LOG, transferManager,
s3Client,
getS3AsyncClient());
transferManager = null;
s3Client = null;
s3AsyncClient = null;
try {
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
closeAutocloseables(LOG, store);
store = null;
s3Client = null;
// At this point the S3A client is shut down,
// now the executor pools are closed
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
if (futurePool != null) {
futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
futurePool = null;
// At this point the S3A client is shut down,
// now the executor pools are closed
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
if (futurePool != null) {
futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
futurePool = null;
}
// other services are shutdown.
cleanupWithLogger(LOG,
delegationTokens.orElse(null),
signerManager,
auditManager);
closeAutocloseables(LOG, credentials);
delegationTokens = Optional.empty();
signerManager = null;
credentials = null;
return null;
});
} catch (IOException e) {
// failure during shutdown.
// this should only be from the signature of trackDurationAndSpan().
LOG.warn("Failure during service shutdown", e);
}
// and once this duration has been tracked, close the statistics
// other services are shutdown.
cleanupWithLogger(LOG,
instrumentation,
delegationTokens.orElse(null),
signerManager,
auditManager);
closeAutocloseables(LOG, credentials);
delegationTokens = Optional.empty();
signerManager = null;
credentials = null;
cleanupWithLogger(LOG, instrumentation);
}
/**
@ -4559,7 +4622,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
() -> {
incrementStatistic(OBJECT_COPY_REQUESTS);
Copy copy = transferManager.copy(
Copy copy = store.getOrCreateTransferManager().copy(
CopyRequest.builder()
.copyObjectRequest(copyRequest)
.build());
@ -4589,7 +4652,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
incrementStatistic(OBJECT_COPY_REQUESTS);
try {
return s3Client.copyObject(copyRequest);
return getS3Client().copyObject(copyRequest);
} catch (SdkException awsException) {
// if this is a 412 precondition failure, it may
// be converted to a RemoteFileChangedException
@ -4620,7 +4683,7 @@ CreateMultipartUploadResponse initiateMultipartUpload(
LOG.debug("Initiate multipart upload to {}", request.key());
return trackDurationOfSupplier(getDurationTrackerFactory(),
OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
() -> s3Client.createMultipartUpload(request));
() -> getS3Client().createMultipartUpload(request));
}
/**
@ -5343,7 +5406,7 @@ public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
p = prefix + "/";
}
// duration tracking is done in iterator.
return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys);
return MultipartUtils.listMultipartUploads(storeContext, getS3Client(), p, maxKeys);
}
/**
@ -5368,7 +5431,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
final ListMultipartUploadsRequest request = getRequestFactory()
.newListMultipartUploadsRequestBuilder(p).build();
return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
s3Client.listMultipartUploads(request).uploads());
getS3Client().listMultipartUploads(request).uploads());
});
}
@ -5383,7 +5446,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
s3Client.abortMultipartUpload(
getS3Client().abortMultipartUpload(
getRequestFactory().newAbortMultipartUploadRequestBuilder(
destKey,
uploadId).build()));

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
@ -42,10 +43,14 @@
* Interface for the S3A Store;
* S3 client interactions should be via this; mocking
* is possible for unit tests.
* <p>
* The {@link ClientManager} interface is used to create the AWS clients;
* the base implementation forwards to the implementation of this interface
* passed in at construction time.
*/
@InterfaceAudience.LimitedPrivate("Extensions")
@InterfaceStability.Unstable
public interface S3AStore extends IOStatisticsSource {
public interface S3AStore extends IOStatisticsSource, ClientManager {
/**
* Acquire write capacity for operations.
@ -71,6 +76,8 @@ public interface S3AStore extends IOStatisticsSource {
RequestFactory getRequestFactory();
/**
 * Get the client manager.
 * @return the client manager instance.
 */
ClientManager clientManager();
/**
* Perform a bulk object delete operation against S3.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write

View File

@ -47,8 +47,6 @@
* implementing only the deprecated method will work.
* See https://github.com/apache/hbase-filesystem
*
* @deprecated This interface will be replaced by one which uses the AWS SDK V2 S3 client as part of
* upgrading S3A to SDK V2. See HADOOP-18073.
*/
@InterfaceAudience.LimitedPrivate("HBoss")
@InterfaceStability.Evolving

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.audit.AuditStatisticNames;
import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
@ -65,6 +66,16 @@ public enum Statistic {
TYPE_DURATION),
/* FileSystem Level statistics */
FILESYSTEM_INITIALIZATION(
FileSystemStatisticNames.FILESYSTEM_INITIALIZATION,
"Filesystem initialization",
TYPE_DURATION),
FILESYSTEM_CLOSE(
FileSystemStatisticNames.FILESYSTEM_CLOSE,
"Filesystem close",
TYPE_DURATION),
DIRECTORIES_CREATED("directories_created",
"Total number of directories created through the object store.",
TYPE_COUNTER),
@ -532,6 +543,11 @@ public enum Statistic {
TYPE_DURATION),
/* General Store operations */
STORE_CLIENT_CREATION(
StoreStatisticNames.STORE_CLIENT_CREATION,
"Store Client Creation",
TYPE_DURATION),
STORE_EXISTS_PROBE(StoreStatisticNames.STORE_EXISTS_PROBE,
"Store Existence Probe",
TYPE_DURATION),

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.Closeable;
import java.io.IOException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
/**
* Interface for on-demand/async creation of AWS clients
* and extension services.
*/
public interface ClientManager extends Closeable {
/**
* Get the transfer manager, creating it and any dependencies if needed.
* @return a transfer manager
* @throws IOException on any failure to create the manager
*/
S3TransferManager getOrCreateTransferManager()
throws IOException;
/**
 * Get the S3 client, creating it on demand if needed.
 * @return the S3 client
 * @throws IOException on any failure to create the client
 */
S3Client getOrCreateS3Client() throws IOException;
/**
 * Get the asynchronous S3 client, creating it on demand if needed.
 * @return the async S3 client
 * @throws IOException on any failure to create the client
 */
S3AsyncClient getOrCreateAsyncClient() throws IOException;
/**
* Close operation is required to not raise exceptions.
*/
void close();
}
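
Because the interface is small it is easy to stub out in tests; a
hypothetical sketch (mockS3, mockAsync and mockTm are assumed pre-built
test doubles, not part of this patch):

  ClientManager stub = new ClientManager() {
    @Override
    public S3TransferManager getOrCreateTransferManager() { return mockTm; }
    @Override
    public S3Client getOrCreateS3Client() { return mockS3; }
    @Override
    public S3AsyncClient getOrCreateAsyncClient() { return mockAsync; }
    @Override
    public void close() { /* mock clients: nothing to release */ }
  };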

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_CLIENT_CREATION;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.util.Preconditions.checkState;
import static org.apache.hadoop.util.functional.FutureIO.awaitAllFutures;
/**
* Client manager for on-demand creation of S3 clients,
* which are closed in parallel in {@link #close()}.
* Updates {@link org.apache.hadoop.fs.s3a.Statistic#STORE_CLIENT_CREATION}
* to track count and duration of client creation.
*/
public class ClientManagerImpl implements ClientManager {
public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);
/**
* Client factory to invoke.
*/
private final S3ClientFactory clientFactory;
/**
* Closed flag.
*/
private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Parameters to create sync/async clients.
*/
private final S3ClientFactory.S3ClientCreationParameters clientCreationParameters;
/**
* Duration tracker factory for creation.
*/
private final DurationTrackerFactory durationTrackerFactory;
/**
* Core S3 client.
*/
private final LazyAutoCloseableReference<S3Client> s3Client;
/** Async client is used for transfer manager. */
private final LazyAutoCloseableReference<S3AsyncClient> s3AsyncClient;
/** Transfer manager. */
private final LazyAutoCloseableReference<S3TransferManager> transferManager;
/**
* Constructor.
* This does not create any clients.
* @param clientFactory client factory to invoke
* @param clientCreationParameters creation parameters.
* @param durationTrackerFactory duration tracker.
*/
public ClientManagerImpl(
final S3ClientFactory clientFactory,
final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
final DurationTrackerFactory durationTrackerFactory) {
this.clientFactory = requireNonNull(clientFactory);
this.clientCreationParameters = requireNonNull(clientCreationParameters);
this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
this.s3Client = new LazyAutoCloseableReference<>(createS3Client());
this.s3AsyncClient = new LazyAutoCloseableReference<>(createAsyncClient());
this.transferManager = new LazyAutoCloseableReference<>(createTransferManager());
}
/**
* Create the function to create the S3 client.
* @return a callable which will create the client.
*/
private CallableRaisingIOE<S3Client> createS3Client() {
return trackDurationOfOperation(
durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(),
() -> clientFactory.createS3Client(getUri(), clientCreationParameters));
}
/**
* Create the function to create the S3 Async client.
* @return a callable which will create the client.
*/
private CallableRaisingIOE<S3AsyncClient> createAsyncClient() {
return trackDurationOfOperation(
durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(),
() -> clientFactory.createS3AsyncClient(getUri(), clientCreationParameters));
}
/**
* Create the function to create the Transfer Manager.
* @return a callable which will create the component.
*/
private CallableRaisingIOE<S3TransferManager> createTransferManager() {
return () -> {
final S3AsyncClient asyncClient = s3AsyncClient.eval();
return trackDuration(durationTrackerFactory,
STORE_CLIENT_CREATION.getSymbol(), () ->
clientFactory.createS3TransferManager(asyncClient));
};
}
@Override
public synchronized S3Client getOrCreateS3Client() throws IOException {
checkNotClosed();
return s3Client.eval();
}
@Override
public synchronized S3AsyncClient getOrCreateAsyncClient() throws IOException {
checkNotClosed();
return s3AsyncClient.eval();
}
@Override
public synchronized S3TransferManager getOrCreateTransferManager() throws IOException {
checkNotClosed();
return transferManager.eval();
}
/**
* Check that the client manager is not closed.
* @throws IllegalStateException if it is closed.
*/
private void checkNotClosed() {
checkState(!closed.get(), "Client manager is closed");
}
/**
* Close() is synchronized to avoid race conditions between
* slow client creation and this close operation.
* <p>
* The objects are all closed in parallel.
*/
@Override
public synchronized void close() {
if (closed.getAndSet(true)) {
// re-entrant close.
return;
}
// queue the closures.
List<Future<Object>> l = new ArrayList<>();
l.add(closeAsync(transferManager));
l.add(closeAsync(s3AsyncClient));
l.add(closeAsync(s3Client));
// once all are queued, await their completion
// and swallow any exception.
try {
awaitAllFutures(l);
} catch (Exception e) {
// should never happen.
LOG.warn("Exception in close", e);
}
}
/**
* Get the URI of the filesystem.
* @return URI to use when creating clients.
*/
public URI getUri() {
return clientCreationParameters.getPathUri();
}
/**
* Queue closing a closeable, logging any exception, and returning null
* to use when awaiting a result.
* @param reference closeable.
* @param <T> type of closeable
* @return null
*/
private <T extends AutoCloseable> CompletableFuture<Object> closeAsync(
LazyAutoCloseableReference<T> reference) {
if (!reference.isSet()) {
// no-op
return completedFuture(null);
}
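    // with no executor supplied, this runs in the common ForkJoinPool;
    // any failure in close() is logged rather than rethrown.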
return supplyAsync(() -> {
try {
reference.close();
} catch (Exception e) {
LOG.warn("Failed to close {}", reference, e);
}
return null;
});
}
@Override
public String toString() {
return "ClientManagerImpl{" +
"closed=" + closed.get() +
", s3Client=" + s3Client +
", s3AsyncClient=" + s3AsyncClient +
", transferManager=" + transferManager +
'}';
}
}
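
A usage illustration (a minimal sketch: the factory, creation parameters
and surrounding wiring are assumptions for the example, not code from this
change):

  // hypothetical caller; demand-creation means nothing is built until first use
  ClientManager manager = new ClientManagerImpl(
      clientFactory,
      new S3ClientFactory.S3ClientCreationParameters().withPathUri(uri),
      durationTrackerFactory);
  try {
    S3Client s3 = manager.getOrCreateS3Client();  // creates the sync client
    S3TransferManager tm = manager.getOrCreateTransferManager();
    // the transfer manager demand-creates the async client as a side effect
  } finally {
    manager.close();  // closes only the clients that were actually created
  }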

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.fs.s3a.impl;
import software.amazon.awssdk.services.s3.S3Client;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3AStore;
@ -36,7 +34,7 @@ public class S3AStoreBuilder {
private StoreContextFactory storeContextFactory;
private S3Client s3Client;
private ClientManager clientManager;
private DurationTrackerFactory durationTrackerFactory;
@ -58,9 +56,9 @@ public S3AStoreBuilder withStoreContextFactory(
return this;
}
public S3AStoreBuilder withS3Client(
final S3Client s3ClientValue) {
this.s3Client = s3ClientValue;
public S3AStoreBuilder withClientManager(
final ClientManager manager) {
this.clientManager = manager;
return this;
}
@ -107,7 +105,14 @@ public S3AStoreBuilder withAuditSpanSource(
}
public S3AStore build() {
return new S3AStoreImpl(storeContextFactory, s3Client, durationTrackerFactory, instrumentation,
statisticsContext, storageStatistics, readRateLimiter, writeRateLimiter, auditSpanSource);
return new S3AStoreImpl(storeContextFactory,
clientManager,
durationTrackerFactory,
instrumentation,
statisticsContext,
storageStatistics,
readRateLimiter,
writeRateLimiter,
auditSpanSource);
}
}

View File

@ -38,6 +38,7 @@
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
@ -57,10 +58,16 @@
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_BULK_DELETE_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_RATE_LIMITED;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_RETRY;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLED;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.util.Preconditions.checkArgument;
@ -76,8 +83,8 @@ public class S3AStoreImpl implements S3AStore {
/** Factory to create store contexts. */
private final StoreContextFactory storeContextFactory;
/** The S3 client used to communicate with S3 bucket. */
private final S3Client s3Client;
/** Source of the S3 clients. */
private final ClientManager clientManager;
/** The S3 bucket to communicate with. */
private final String bucket;
@ -85,9 +92,6 @@ public class S3AStoreImpl implements S3AStore {
/** Request factory for creating requests. */
private final RequestFactory requestFactory;
/** Async client is used for transfer manager. */
private S3AsyncClient s3AsyncClient;
/** Duration tracker factory. */
private final DurationTrackerFactory durationTrackerFactory;
@ -117,7 +121,7 @@ public class S3AStoreImpl implements S3AStore {
/** Constructor to create S3A store. */
S3AStoreImpl(StoreContextFactory storeContextFactory,
S3Client s3Client,
ClientManager clientManager,
DurationTrackerFactory durationTrackerFactory,
S3AInstrumentation instrumentation,
S3AStatisticsContext statisticsContext,
@ -126,7 +130,7 @@ public class S3AStoreImpl implements S3AStore {
RateLimiting writeRateLimiter,
AuditSpanSource<AuditSpanS3A> auditSpanSource) {
this.storeContextFactory = requireNonNull(storeContextFactory);
this.s3Client = requireNonNull(s3Client);
this.clientManager = requireNonNull(clientManager);
this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
this.instrumentation = requireNonNull(instrumentation);
this.statisticsContext = requireNonNull(statisticsContext);
@ -140,6 +144,11 @@ public class S3AStoreImpl implements S3AStore {
this.requestFactory = storeContext.getRequestFactory();
}
@Override
public void close() {
clientManager.close();
}
/** Acquire write capacity for rate limiting {@inheritDoc}. */
@Override
public Duration acquireWriteCapacity(final int capacity) {
@ -166,8 +175,28 @@ public StoreContext getStoreContext() {
return storeContext;
}
private S3Client getS3Client() {
return s3Client;
/**
* Get the S3 client.
* @return the S3 client.
* @throws IOException on any failure to create the client.
*/
private S3Client getS3Client() throws IOException {
return clientManager.getOrCreateS3Client();
}
@Override
public S3TransferManager getOrCreateTransferManager() throws IOException {
return clientManager.getOrCreateTransferManager();
}
@Override
public S3Client getOrCreateS3Client() throws IOException {
return clientManager.getOrCreateS3Client();
}
@Override
public S3AsyncClient getOrCreateAsyncClient() throws IOException {
return clientManager.getOrCreateAsyncClient();
}
@Override
@ -193,6 +222,15 @@ public RequestFactory getRequestFactory() {
return requestFactory;
}
/**
* Get the client manager.
* @return the client manager.
*/
@Override
public ClientManager clientManager() {
return clientManager;
}
/**
* Increment a statistic by 1.
* This increments both the instrumentation and storage statistics.
@ -298,7 +336,7 @@ private void blockRootDelete(String key) throws IllegalArgumentException {
@Override
@Retries.RetryRaw
public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(
final DeleteObjectsRequest deleteRequest)
final DeleteObjectsRequest deleteRequest)
throws SdkException {
DeleteObjectsResponse response;
@ -318,22 +356,23 @@ public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(
try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", keyCount)) {
response =
invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> {
// handle the failure
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
// duration is tracked in the bulk delete counters
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
// acquire the write capacity for the number of keys to delete and record the duration.
Duration durationToAcquireWriteCapacity = acquireWriteCapacity(keyCount);
instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
true,
durationToAcquireWriteCapacity);
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3Client.deleteObjects(deleteRequest);
}));
invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> {
// handle the failure
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
// duration is tracked in the bulk delete counters
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
// acquire the write capacity for the number of keys to delete
// and record the duration.
Duration durationToAcquireWriteCapacity = acquireWriteCapacity(keyCount);
instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
true,
durationToAcquireWriteCapacity);
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return getS3Client().deleteObjects(deleteRequest);
}));
if (!response.errors().isEmpty()) {
// one or more of the keys could not be deleted.
// log and then throw
@ -361,25 +400,25 @@ public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(
@Override
@Retries.RetryRaw
public Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
final DeleteObjectRequest request)
throws SdkException {
final DeleteObjectRequest request)
throws SdkException {
String key = request.key();
blockRootDelete(key);
DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key);
try {
DeleteObjectResponse response =
invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
DELETE_CONSIDERED_IDEMPOTENT,
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_DELETE_REQUEST.getSymbol(), () -> {
incrementStatistic(OBJECT_DELETE_OBJECTS);
// We try to acquire write capacity just before delete call.
Duration durationToAcquireWriteCapacity = acquireWriteCapacity(1);
instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
true, durationToAcquireWriteCapacity);
return s3Client.deleteObject(request);
}));
invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
DELETE_CONSIDERED_IDEMPOTENT,
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_DELETE_REQUEST.getSymbol(), () -> {
incrementStatistic(OBJECT_DELETE_OBJECTS);
// We try to acquire write capacity just before delete call.
Duration durationToAcquireWriteCapacity = acquireWriteCapacity(1);
instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
true, durationToAcquireWriteCapacity);
return getS3Client().deleteObject(request);
}));
d.close();
return Tuples.pair(d.asDuration(), Optional.of(response));
} catch (AwsServiceException ase) {

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.fs.s3a.audit.AuditTestSupport;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase;
import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
@ -119,6 +120,12 @@ public MockS3AFileSystem(S3AFileSystem mock,
private static void prepareRequest(SdkRequest.Builder t) {}
@Override
protected S3AStore createS3AStore(final ClientManager clientManager,
final int rateLimitCapacity) {
return super.createS3AStore(clientManager, rateLimitCapacity);
}
@Override
public RequestFactory getRequestFactory() {
return REQUEST_FACTORY;

View File

@ -146,11 +146,18 @@ protected static S3AFileSystem createAndBindMockFSInstance(Configuration conf,
return mockFs;
}
private static S3AFileSystem mockS3AFileSystemRobustly(S3Client mockS3Client) {
private static S3AFileSystem mockS3AFileSystemRobustly(S3Client mockS3Client) throws IOException {
S3AFileSystem mockFS = mock(S3AFileSystem.class);
S3AStore store = mock(S3AStore.class);
when(store.getOrCreateS3Client())
.thenReturn(mockS3Client);
S3AInternals s3AInternals = mock(S3AInternals.class);
when(mockFS.getS3AInternals()).thenReturn(s3AInternals);
when(s3AInternals.getStore()).thenReturn(mock(S3AStore.class));
when(s3AInternals.getStore()).thenReturn(store);
when(s3AInternals.getAmazonS3Client(anyString()))
.thenReturn(mockS3Client);
doNothing().when(mockFS).incrementReadOperations();

View File

@ -0,0 +1,379 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.fs.s3a.test.StubS3ClientFactory;
import org.apache.hadoop.fs.statistics.impl.StubDurationTrackerFactory;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import org.apache.hadoop.util.functional.LazyAtomicReference;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.FunctionalIO.toUncheckedIOExceptionSupplier;
import static org.mockito.Mockito.mock;
/**
* Test the client manager.
* <p>
* The tests with "Parallel" in the title generate delays in the second thread
 * to stress the concurrency logic.
*/
public class TestClientManager extends AbstractHadoopTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TestClientManager.class);
/**
* Factory delay for the multithreaded operations.
*/
private static final Duration FACTORY_DELAY = Duration.ofSeconds(5);
/**
* How long for the second thread to sleep before it tries to get()
* the client.
*/
private static final Duration SECOND_THREAD_DELAY = Duration.ofSeconds(2);
/**
   * Format of exceptions raised.
*/
private static final String GENERATED = "generated[%d]";
private S3Client s3Client;
private S3AsyncClient asyncClient;
private S3TransferManager transferManager;
private URI uri;
@Before
public void setUp() throws Exception {
asyncClient = mock(S3AsyncClient.class);
transferManager = mock(S3TransferManager.class);
s3Client = mock(S3Client.class);
uri = new URI("https://bucket/");
}
/**
* Create a stub client factory where there is a specific delay.
* @param delay delay when creating a client.
* @return the factory
*/
private StubS3ClientFactory factory(final Duration delay) {
return factory(() -> sleep(delay));
}
/**
   * Create a stub client factory where the invocation is called before
* the operation is executed.
* @param invocationRaisingIOE invocation to call before returning a client.
* @return the factory
*/
private StubS3ClientFactory factory(final InvocationRaisingIOE invocationRaisingIOE) {
return new StubS3ClientFactory(s3Client, asyncClient, transferManager,
invocationRaisingIOE);
}
/**
* Create a manager instance using the given factory.
* @param factory factory for clients.
* @return a client manager
*/
private ClientManager manager(final StubS3ClientFactory factory) {
return new ClientManagerImpl(
factory,
new S3ClientFactory.S3ClientCreationParameters()
.withPathUri(uri),
StubDurationTrackerFactory.STUB_DURATION_TRACKER_FACTORY);
}
/**
* Create a single s3 client.
*/
@Test
public void testCreateS3Client() throws Throwable {
final StubS3ClientFactory factory = factory(Duration.ZERO);
final ClientManager manager = manager(factory);
Assertions.assertThat(manager.getOrCreateS3Client())
.describedAs("manager %s", manager)
.isSameAs(s3Client);
Assertions.assertThat(factory.clientCreationCount())
.describedAs("client creation count")
.isEqualTo(1);
// second attempt returns same instance
Assertions.assertThat(manager.getOrCreateS3Client())
.describedAs("manager %s", manager)
.isSameAs(s3Client);
// and the factory counter is not incremented.
Assertions.assertThat(factory.clientCreationCount())
.describedAs("client creation count")
.isEqualTo(1);
// now close
manager.close();
// and expect a failure
intercept(IllegalStateException.class, manager::getOrCreateS3Client);
}
/**
* Sleep for a given period; interrupts are swallowed.
* @param delay delay
*/
private static void sleep(final Duration delay) {
try {
Thread.sleep(delay.toMillis());
} catch (InterruptedException e) {
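      // deliberately swallowed; the javadoc documents this behavior.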
}
}
/**
* Get an async s3 client twice and verify it is only created once.
*/
@Test
public void testCreateAsyncS3Client() throws Throwable {
final StubS3ClientFactory factory = factory(Duration.ofMillis(100));
final ClientManager manager = manager(factory);
Assertions.assertThat(manager.getOrCreateAsyncClient())
.describedAs("manager %s", manager)
.isSameAs(asyncClient);
manager.getOrCreateAsyncClient();
// and the factory counter is not incremented.
Assertions.assertThat(factory.asyncClientCreationCount())
.describedAs("client creation count")
.isEqualTo(1);
// now close
manager.close();
// and expect a failure
intercept(IllegalStateException.class, () ->
manager.getOrCreateAsyncClient());
}
/**
   * Create a transfer manager; this will demand-create an async S3 client
   * if needed.
*/
@Test
public void testCreateTransferManagerAndAsyncClient() throws Throwable {
final StubS3ClientFactory factory = factory(Duration.ZERO);
final ClientManager manager = manager(factory);
Assertions.assertThat(manager.getOrCreateTransferManager())
.describedAs("manager %s", manager)
.isSameAs(transferManager);
// and we created an async client
Assertions.assertThat(factory.asyncClientCreationCount())
.describedAs("client creation count")
.isEqualTo(1);
Assertions.assertThat(factory.transferManagerCreationCount())
.describedAs("client creation count")
.isEqualTo(1);
// now close
manager.close();
// and expect a failure
intercept(IllegalStateException.class, manager::getOrCreateTransferManager);
}
/**
* Create a transfer manager with the async client already created.
*/
@Test
public void testCreateTransferManagerWithAsyncClientAlreadyCreated() throws Throwable {
final StubS3ClientFactory factory = factory(Duration.ZERO);
final ClientManager manager = manager(factory);
manager.getOrCreateAsyncClient();
Assertions.assertThat(manager.getOrCreateTransferManager())
.describedAs("manager %s", manager)
.isSameAs(transferManager);
// no new async client was created.
Assertions.assertThat(factory.asyncClientCreationCount())
.describedAs("client creation count")
.isEqualTo(1);
}
/**
* Create clients in parallel and verify that the first one blocks
* the others.
* There's a bit of ordering complexity which uses a semaphore and a sleep
* to block one of the acquisitions until the initial operation has started.
   * There's then an assertion that the first client was created in a
   * separate thread, and that both calls returned the same client instance.
*/
@Test
public void testParallelClientCreation() throws Throwable {
// semaphore
Semaphore sem = new Semaphore(1);
// reference of thread where the construction took place
    AtomicReference<Thread> threadRef = new AtomicReference<>();
// this factory releases the semaphore on its invocation then
// sleeps
final ClientManager manager = manager(factory(() -> {
threadRef.set(Thread.currentThread());
sem.release();
sleep(FACTORY_DELAY);
}));
// acquire that semaphore.
sem.acquire(1);
// execute the first creation in a separate thread.
final CompletableFuture<S3Client> futureClient =
supplyAsync(toUncheckedIOExceptionSupplier(() -> {
LOG.info("creating #1 s3 client");
final S3Client client = manager.getOrCreateS3Client();
LOG.info("#1 s3 client created");
return client;
}));
// wait until the async creation has started
sem.acquire();
sleep(SECOND_THREAD_DELAY);
// expect to block.
LOG.info("creating #2 s3 client");
final S3Client client2 = manager.getOrCreateS3Client();
LOG.info("created #2 s3 client");
// now assert that the #1 client has succeeded, without
// even calling futureClient.get() to evaluate the result.
Assertions.assertThat(threadRef.get())
.describedAs("Thread in which client #1 was created")
.isNotSameAs(Thread.currentThread());
final S3Client orig = futureClient.get();
Assertions.assertThat(orig)
.describedAs("second getOrCreate() call to %s", manager)
.isSameAs(client2);
}
/**
* Parallel transfer manager creation.
   * This will force creation of the async client.
*/
@Test
public void testParallelTransferManagerCreation() throws Throwable {
// semaphore
Semaphore sem = new Semaphore(1);
// reference of thread where the construction took place
    AtomicReference<Thread> threadRef = new AtomicReference<>();
// this factory releases the semaphore on its invocation, then
// sleeps
final ClientManager manager = manager(factory(() -> {
threadRef.set(Thread.currentThread());
sem.release();
sleep(FACTORY_DELAY);
}));
// acquire that semaphore.
sem.acquire(1);
sleep(SECOND_THREAD_DELAY);
// execute the first creation in a separate thread.
final CompletableFuture<S3TransferManager> futureClient =
supplyAsync(toUncheckedIOExceptionSupplier(() -> {
LOG.info("creating #1 instance");
sem.release();
final S3TransferManager r = manager.getOrCreateTransferManager();
LOG.info("#1 instance created");
return r;
}));
// wait until the async creation has started
sem.acquire();
// expect to block.
LOG.info("creating #2 s3 client");
final S3TransferManager client2 = manager.getOrCreateTransferManager();
LOG.info("created #2 s3 client");
// now assert that the #1 client has succeeded, without
// even calling futureClient.get() to evaluate the result.
Assertions.assertThat(threadRef.get())
.describedAs("Thread in which client #1 was created")
.isNotSameAs(Thread.currentThread());
futureClient.get();
}
/**
* Verify that if an exception is thrown during creation, the
   * operation will be repeated; there's no attempt to record
* that an exception was raised on the first attempt.
*/
@Test
public void testClientCreationFailure() throws Throwable {
// counter is incremented on every eval(), so can be used to assert
// the number of invocations.
final AtomicInteger counter = new AtomicInteger(0);
final ClientManager manager = manager(factory(() -> {
throw new UnknownHostException(String.format(GENERATED, counter.incrementAndGet()));
}));
// first attempt fails
intercept(UnknownHostException.class,
String.format(GENERATED, 1),
manager::getOrCreateS3Client);
// subsequent tests will also retry; the exception message changes each time,
// showing that it is regenerated rather than cached
intercept(UnknownHostException.class, "[2]", manager::getOrCreateS3Client);
intercept(UnknownHostException.class, "[3]", manager::getOrCreateAsyncClient);
intercept(UnknownHostException.class, "[4]", manager::getOrCreateTransferManager);
manager.close();
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.test;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
/**
 * Stub implementation of {@link S3ClientFactory} which returns
 * the preconfigured clients.
* No checks for null values are made.
* <p>
* The {@link #launcher} operation is invoked before creating
 * the sync and async clients, which is where failures,
* delays etc can be added.
* It is not used in {@link #createS3TransferManager(S3AsyncClient)}
* because that is normally a fast phase.
*/
public final class StubS3ClientFactory implements S3ClientFactory {
/**
* The class name of this factory.
*/
public static final String STUB_FACTORY = StubS3ClientFactory.class.getName();
private final S3Client client;
private final S3AsyncClient asyncClient;
private final S3TransferManager transferManager;
private final InvocationRaisingIOE launcher;
  private final AtomicInteger clientCreationCount = new AtomicInteger(0);
  private final AtomicInteger asyncClientCreationCount = new AtomicInteger(0);
  private final AtomicInteger transferManagerCreationCount = new AtomicInteger(0);
public StubS3ClientFactory(
final S3Client client,
final S3AsyncClient asyncClient,
final S3TransferManager transferManager,
final InvocationRaisingIOE launcher) {
this.client = client;
this.asyncClient = asyncClient;
this.transferManager = transferManager;
this.launcher = launcher;
}
@Override
public S3Client createS3Client(final URI uri, final S3ClientCreationParameters parameters)
throws IOException {
clientCreationCount.incrementAndGet();
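    // run the configured hook; tests use it to inject delays or failures.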
launcher.apply();
return client;
}
@Override
public S3AsyncClient createS3AsyncClient(final URI uri,
final S3ClientCreationParameters parameters)
throws IOException {
asyncClientCreationCount.incrementAndGet();
launcher.apply();
return asyncClient;
}
@Override
public S3TransferManager createS3TransferManager(final S3AsyncClient s3AsyncClient) {
transferManagerCreationCount.incrementAndGet();
return transferManager;
}
public int clientCreationCount() {
return clientCreationCount.get();
}
public int asyncClientCreationCount() {
return asyncClientCreationCount.get();
}
public int transferManagerCreationCount() {
return transferManagerCreationCount.get();
}
@Override
public String toString() {
return "StubS3ClientFactory{" +
"client=" + client +
", asyncClient=" + asyncClient +
", transferManager=" + transferManager +
", clientCreationCount=" + clientCreationCount.get() +
", asyncClientCreationCount=" + asyncClientCreationCount.get() +
", transferManagerCreationCount=" + transferManagerCreationCount.get() +
'}';
}
}