HADOOP-18915. Tune/extend S3A http connection and thread pool settings (#6180)

Increases existing pool sizes: with server scale and vector
IO, larger pools are needed.

  fs.s3a.connection.maximum 200
  fs.s3a.threads.max 96

Adds new configuration options for v2 SDK internal timeouts,
both with a default of 60s:

  fs.s3a.connection.acquisition.timeout
  fs.s3a.connection.idle.time

All the pool/timeout options are covered in performance.md

Moves all timeout/duration options in the s3a FS to taking
temporal units (h, m, s, ms, ...); retaining the previous default
unit (normally milliseconds)
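
For example, with suffix support, all of the following now set the
same establish timeout (an illustrative 30 second value):

  fs.s3a.connection.establish.timeout 30s
  fs.s3a.connection.establish.timeout 30000ms
  fs.s3a.connection.establish.timeout 30000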

Adds a minimum duration for most of these, in order to recover from
deployments where a timeout has been set on the assumption the unit
was seconds, not millis.

Uses java.time.Duration throughout the codebase;
retaining the older numeric constants in
org.apache.hadoop.fs.s3a.Constants for backwards compatibility;
these are now deprecated.

Adds new class AWSApiCallTimeoutException to be raised on
sdk-related methods and also gateway timeouts. This is a subclass
of org.apache.hadoop.net.ConnectTimeoutException to support
existing retry logic.

+ reverted default value of fs.s3a.create.performance to false;
it was inadvertently set to true during testing.

Contributed by Steve Loughran.
Steve Loughran 2023-11-29 15:12:44 +00:00 committed by GitHub
parent d25cba7e85
commit 5cda162a80
15 changed files with 1198 additions and 230 deletions

View File

@ -1530,7 +1530,7 @@
<property>
<name>fs.s3a.connection.maximum</name>
<value>96</value>
<value>200</value>
<description>Controls the maximum number of simultaneous connections to S3.
This must be bigger than the value of fs.s3a.threads.max so as to stop
threads being blocked waiting for new HTTPS connections.
@ -1608,14 +1608,21 @@
<property>
<name>fs.s3a.connection.establish.timeout</name>
<value>5000</value>
<description>Socket connection setup timeout in milliseconds.</description>
<value>5s</value>
<description>Socket connection setup timeout in milliseconds; this will be retried
more than once.</description>
</property>
<property>
<name>fs.s3a.connection.timeout</name>
<value>200000</value>
<description>Socket connection timeout in milliseconds.</description>
<value>200s</value>
<description>Socket connection timeout.</description>
</property>
<property>
<name>fs.s3a.connection.ttl</name>
<value>5m</value>
<description>Expiry time for any active connection.</description>
</property>
<property>
@ -1639,14 +1646,14 @@
<property>
<name>fs.s3a.threads.max</name>
<value>64</value>
<value>96</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>
<property>
<name>fs.s3a.threads.keepalivetime</name>
<value>60</value>
<value>60s</value>
<description>Number of seconds a thread can be idle before being
terminated.</description>
</property>
@ -1726,7 +1733,7 @@
<property>
<name>fs.s3a.multipart.purge.age</name>
<value>86400</value>
<value>24h</value>
<description>Minimum age in seconds of multipart uploads to purge
on startup if "fs.s3a.multipart.purge" is true
</description>
@ -2091,18 +2098,15 @@
<property>
<name>fs.s3a.connection.request.timeout</name>
<value>0</value>
<value>0s</value>
<description>
Time out on HTTP requests to the AWS service; 0 means no timeout.
Measured in seconds; the usual time suffixes are all supported
Important: this is the maximum duration of any AWS service call,
including upload and copy operations. If non-zero, it must be larger
than the time to upload multi-megabyte blocks to S3 from the client,
and to rename many-GB files. Use with care.
Values that are larger than Integer.MAX_VALUE milliseconds are
converted to Integer.MAX_VALUE milliseconds
</description>
</property>

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.net.ConnectTimeoutException;
/**
* IOException equivalent of an {@code ApiCallTimeoutException}.
* Declared as a subclass of {@link ConnectTimeoutException} to allow
* for existing code to catch it.
*/
public class AWSApiCallTimeoutException extends ConnectTimeoutException {
/**
* Constructor.
* @param operation operation in progress
* @param cause cause.
*/
public AWSApiCallTimeoutException(
final String operation,
final Exception cause) {
super(operation);
initCause(cause);
}
}
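
Because the new class subclasses ConnectTimeoutException, existing catch blocks keep
working unchanged. A minimal sketch (the handler method is illustrative, not part of
this patch):

```java
import java.io.IOException;
import org.apache.hadoop.net.ConnectTimeoutException;

public class TimeoutHandlingSketch {
  /** Existing handlers for ConnectTimeoutException also see the new exception. */
  static void handle(IOException raised) {
    try {
      throw raised;
    } catch (ConnectTimeoutException e) {
      // AWSApiCallTimeoutException lands here too, so existing
      // connect-timeout recovery paths are reused without change.
    } catch (IOException e) {
      // all other IO failures take the generic path
    }
  }
}
```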

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@ -33,7 +34,14 @@
* as deprecated and simply ignored.
*
* All S3Guard related constants are marked as Deprecated and either ignored (ddb config)
* or rejected (setting the metastore to anything other than the null store)
* or rejected (setting the metastore to anything other than the null store).
* <p>
* Timeout default values are declared as integers or long values in milliseconds and
* occasionally seconds.
* There are now {@code Duration} constants for these default values; the original
* fields are retained for compatibility, and derive their value from the Duration equivalent.
* <p>
* New timeout/duration constants do not get the equivalent integer/long fields.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@ -144,25 +152,47 @@ private Constants() {
SimpleAWSCredentialsProvider.NAME;
// the maximum number of tasks cached if all threads are already uploading
/**
* The maximum number of tasks queued (other than prefetcher tasks) if all threads are
* busy: {@value}.
*/
public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
/**
* Default value for {@link #MAX_TOTAL_TASKS}: {@value}.
*/
public static final int DEFAULT_MAX_TOTAL_TASKS = 32;
// number of simultaneous connections to s3
/**
* Number of simultaneous connections to S3: {@value}.
*/
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 96;
/**
* Default value for {@link #MAXIMUM_CONNECTIONS}: {@value}.
* Future releases are likely to increase this value.
* Keep in sync with the value in {@code core-default.xml}
*/
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 200;
/**
* Configuration option to configure expiration time of
* s3 http connection from the connection pool in milliseconds: {@value}.
* S3 http connection from the connection pool: {@value}.
*/
public static final String CONNECTION_TTL = "fs.s3a.connection.ttl";
/**
* Default value for {@code CONNECTION_TTL}: {@value}.
* Default duration for {@link #CONNECTION_TTL}: 5 minutes.
*/
public static final long DEFAULT_CONNECTION_TTL = 5 * 60_000;
public static final Duration DEFAULT_CONNECTION_TTL_DURATION =
Duration.ofMinutes(5);
/**
* Default value in millis for {@link #CONNECTION_TTL}: 5 minutes.
* @deprecated use {@link #DEFAULT_CONNECTION_TTL_DURATION}
*/
public static final long DEFAULT_CONNECTION_TTL =
DEFAULT_CONNECTION_TTL_DURATION.toMillis();
// connect to s3 over ssl?
public static final String SECURE_CONNECTIONS =
@ -264,19 +294,111 @@ private Constants() {
public static final boolean EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT =
true;
// milliseconds until we give up trying to establish a connection to s3
/**
* This is the minimum operation duration unless programmatically set.
* It ensures that even if a configuration has mistaken a millisecond
* option for seconds, a viable duration will actually be used.
* Value: 15s.
*/
public static final Duration MINIMUM_NETWORK_OPERATION_DURATION = Duration.ofSeconds(15);
/**
* Milliseconds until a connection is established: {@value}.
*/
public static final String ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
public static final int DEFAULT_ESTABLISH_TIMEOUT = 5000;
// milliseconds until we give up on a connection to s3
/**
* Default TCP/(and TLS?) establish timeout: 30 seconds.
*/
public static final Duration DEFAULT_ESTABLISH_TIMEOUT_DURATION = Duration.ofSeconds(30);
/**
* Default establish timeout in millis: 30 seconds.
* @deprecated use {@link #DEFAULT_ESTABLISH_TIMEOUT_DURATION}
*/
public static final int DEFAULT_ESTABLISH_TIMEOUT =
(int)DEFAULT_ESTABLISH_TIMEOUT_DURATION.toMillis();
/**
* Milliseconds until we give up on a connection to s3: {@value}.
*/
public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
public static final int DEFAULT_SOCKET_TIMEOUT = 200000;
// milliseconds until a request is timed-out
/**
* Default socket timeout: 200 seconds.
*/
public static final Duration DEFAULT_SOCKET_TIMEOUT_DURATION = Duration.ofSeconds(200);
/**
* Default socket timeout: {@link #DEFAULT_SOCKET_TIMEOUT_DURATION}.
* @deprecated use {@link #DEFAULT_SOCKET_TIMEOUT_DURATION}
*/
public static final int DEFAULT_SOCKET_TIMEOUT = (int)DEFAULT_SOCKET_TIMEOUT_DURATION.toMillis();
/**
* Time until a request is timed-out: {@value}.
* If zero, there is no timeout.
*/
public static final String REQUEST_TIMEOUT =
"fs.s3a.connection.request.timeout";
public static final int DEFAULT_REQUEST_TIMEOUT = 0;
/**
* Default duration of a request before it is timed out: Zero.
*/
public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ZERO;
/**
* Default duration of a request before it is timed out: Zero.
* @deprecated use {@link #DEFAULT_REQUEST_TIMEOUT_DURATION}
*/
public static final int DEFAULT_REQUEST_TIMEOUT =
(int)DEFAULT_REQUEST_TIMEOUT_DURATION.toMillis();
/**
* Acquisition timeout for connections from the pool:
* {@value}.
* Default unit is milliseconds for consistency with other options.
*/
public static final String CONNECTION_ACQUISITION_TIMEOUT =
"fs.s3a.connection.acquisition.timeout";
/**
* Default acquisition timeout: 60 seconds.
*/
public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
Duration.ofSeconds(60);
/**
* Should TCP Keepalive be enabled on the socket?
* This adds some network IO, but finds failures faster.
* {@value}.
*/
public static final String CONNECTION_KEEPALIVE =
"fs.s3a.connection.keepalive";
/**
* Default value of {@link #CONNECTION_KEEPALIVE}: {@value}.
*/
public static final boolean DEFAULT_CONNECTION_KEEPALIVE = false;
/**
* Maximum idle time for connections in the pool: {@value}.
* <p>
* Too low: overhead of creating connections.
* Too high: risk of stale connections and inability to use the
* adaptive load balancing of the S3 front end.
* <p>
* Default unit is milliseconds for consistency with other options.
*/
public static final String CONNECTION_IDLE_TIME =
"fs.s3a.connection.idle.time";
/**
* Default idle time: 60 seconds.
*/
public static final Duration DEFAULT_CONNECTION_IDLE_TIME_DURATION =
Duration.ofSeconds(60);
// socket send buffer to be used in Amazon client
public static final String SOCKET_SEND_BUFFER = "fs.s3a.socket.send.buffer";
@ -290,13 +412,34 @@ private Constants() {
public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
// the maximum number of threads to allow in the pool used by TransferManager
/**
* The maximum number of threads to allow in the pool used by S3A.
* Value: {@value}.
*/
public static final String MAX_THREADS = "fs.s3a.threads.max";
public static final int DEFAULT_MAX_THREADS = 10;
// the time an idle thread waits before terminating
/**
* Default value of {@link #MAX_THREADS}: {@value}.
*/
public static final int DEFAULT_MAX_THREADS = 96;
/**
* The time an idle thread waits before terminating: {@value}.
* This is in SECONDS unless the optional unit is given.
*/
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
public static final int DEFAULT_KEEPALIVE_TIME = 60;
/**
* Default value of {@link #KEEPALIVE_TIME}: 60s.
*/
public static final Duration DEFAULT_KEEPALIVE_TIME_DURATION = Duration.ofSeconds(60);
/**
* Default value of {@link #KEEPALIVE_TIME}: 60s.
* @deprecated use {@link #DEFAULT_KEEPALIVE_TIME_DURATION}
*/
public static final int DEFAULT_KEEPALIVE_TIME =
(int)DEFAULT_KEEPALIVE_TIME_DURATION.getSeconds();
// size of each of or multipart pieces in bytes
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
@ -501,10 +644,17 @@ private Constants() {
"fs.s3a.multipart.purge";
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
// purge any multipart uploads older than this number of seconds
/**
* Purge any multipart uploads older than this number of seconds.
*/
public static final String PURGE_EXISTING_MULTIPART_AGE =
"fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
/**
* Default age: 1 day, in seconds.
*/
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE =
Duration.ofDays(1).getSeconds();
/**
* s3 server-side encryption, see
@ -1201,8 +1351,7 @@ private Constants() {
* Default value for create performance in an S3A FS.
* Value {@value}.
*/
public static final boolean FS_S3A_CREATE_PERFORMANCE_DEFAULT = true;
public static final boolean FS_S3A_CREATE_PERFORMANCE_DEFAULT = false;
/**
* Capability to indicate that the FS has been instantiated with

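The older numeric constants remain usable; a sketch of the compatibility pattern
(the constants are real, the usage is illustrative):

```java
// old code keeps compiling, now with a deprecation warning
long ttlMillis = Constants.DEFAULT_CONNECTION_TTL;
// new code should prefer the Duration form
java.time.Duration ttl = Constants.DEFAULT_CONNECTION_TTL_DURATION;
```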
View File

@ -28,6 +28,7 @@
import java.nio.file.AccessDeniedException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
@ -122,6 +123,7 @@
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
@ -828,8 +830,13 @@ private void initThreadPools(Configuration conf) {
}
int totalTasks = intOption(conf,
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);
// keepalive time takes a time suffix; default unit is seconds
long keepAliveTime = ConfigurationHelper.getDuration(conf,
KEEPALIVE_TIME,
Duration.ofSeconds(DEFAULT_KEEPALIVE_TIME),
TimeUnit.SECONDS,
Duration.ZERO).getSeconds();
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
int activeTasksForBoundedThreadPool = maxThreads;
@ -1226,12 +1233,15 @@ private void initCannedAcls(Configuration conf) {
private void initMultipartUploads(Configuration conf) throws IOException {
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = longOption(conf,
PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0);
if (purgeExistingMultipart) {
try {
abortOutstandingMultipartUploads(purgeExistingMultipartAge);
Duration purgeDuration = ConfigurationHelper.getDuration(conf,
PURGE_EXISTING_MULTIPART_AGE,
Duration.ofSeconds(DEFAULT_PURGE_EXISTING_MULTIPART_AGE),
TimeUnit.SECONDS,
Duration.ZERO);
abortOutstandingMultipartUploads(purgeDuration.getSeconds());
} catch (AccessDeniedException e) {
instrumentation.errorIgnored();
LOG.debug("Failed to purge multipart uploads against {}," +

View File

@ -201,6 +201,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// Treated as an immediate failure
policyMap.put(AWSBadRequestException.class, fail);
// API call timeout.
policyMap.put(AWSApiCallTimeoutException.class, retryAwsClientExceptions);
// use specific retry policy for aws client exceptions
// nested IOExceptions will already have been extracted and used
// in this map.
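
The new mapping can be probed directly; a sketch mirroring the test added later in
this patch (default configuration, idempotent call):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSApiCallTimeoutException;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy;

public class RetryDecisionSketch {
  public static void main(String[] args) throws Exception {
    // build the policy from an empty configuration: defaults apply
    S3ARetryPolicy policy = new S3ARetryPolicy(new Configuration(false));
    AWSApiCallTimeoutException ex =
        new AWSApiCallTimeoutException("PUT", new Exception("timeout"));
    RetryPolicy.RetryAction action = policy.shouldRetry(ex, 0, 0, true);
    System.out.println(action.action);  // expected: RETRY
  }
}
```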

View File

@ -20,6 +20,8 @@
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.retry.RetryUtils;
import software.amazon.awssdk.services.s3.model.S3Exception;
@ -171,9 +173,23 @@ public static IOException translateException(@Nullable String operation,
operation,
StringUtils.isNotEmpty(path)? (" on " + path) : "",
exception);
// timeout issues
// ApiCallAttemptTimeoutException: a single HTTP request attempt failed.
// ApiCallTimeoutException: a request with any configured retries failed.
// ApiCallTimeoutException should be the only one seen in
// the S3A code, but for due diligence both are handled and mapped to
// our own AWSApiCallTimeoutException.
if (exception instanceof ApiCallTimeoutException
|| exception instanceof ApiCallAttemptTimeoutException) {
// An API call to an AWS service timed out.
// This is a subclass of ConnectTimeoutException so
// all retry logic for that exception is handled without
// having to look down the stack for a nested cause.
return new AWSApiCallTimeoutException(message, exception);
}
if (!(exception instanceof AwsServiceException)) {
// exceptions raised client-side: connectivity, auth, network problems...
Exception innerCause = containsInterruptedException(exception);
if (innerCause != null) {
// interrupted IO, or a socket exception underneath that class
@ -293,6 +309,11 @@ public static IOException translateException(@Nullable String operation,
ioe = new AWSServiceThrottledException(message, ase);
break;
// gateway timeout
case SC_504_GATEWAY_TIMEOUT:
ioe = new AWSApiCallTimeoutException(message, ase);
break;
// internal error
case SC_500_INTERNAL_SERVER_ERROR:
ioe = new AWSStatus500Exception(message, ase);

View File

@ -28,29 +28,37 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.auth.SignerFactory;
import org.apache.hadoop.util.VersionInfo;
import org.apache.http.client.utils.URIBuilder;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_TTL_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ESTABLISH_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_REQUEST_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SOCKET_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.MINIMUM_NETWORK_OPERATION_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_DOMAIN;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_HOST;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_PASSWORD;
@ -64,19 +72,40 @@
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.longOption;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* Methods for configuring the S3 client.
* These methods are used when creating and configuring
* {@link software.amazon.awssdk.services.s3.S3Client} which communicates with the S3 service.
* the HTTP clients which communicate with the S3 service.
* <p>
* See {@code software.amazon.awssdk.http.SdkHttpConfigurationOption}
* for the default values.
*/
public final class AWSClientConfig {
private static final Logger LOG = LoggerFactory.getLogger(AWSClientConfig.class);
/**
* The minimum operation duration.
*/
private static Duration minimumOperationDuration = MINIMUM_NETWORK_OPERATION_DURATION;
private AWSClientConfig() {
}
/**
* Create the config for a given service; the service identifier is used
* to determine the signature implementation.
* @param conf configuration
* @param awsServiceIdentifier service
* @return the builder initialized with signer, timeouts and UA.
* @throws IOException failure.
*/
public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Configuration conf,
String awsServiceIdentifier) throws IOException {
ClientOverrideConfiguration.Builder overrideConfigBuilder =
@ -99,7 +128,10 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf
}
/**
* Configures the http client.
* Create and configure the http client-based connector with timeouts for:
* connection acquisition, max idle, timeout, TTL, socket and keepalive.
* SSL channel mode is set up via
* {@link NetworkBinding#bindSSLChannelMode(Configuration, ApacheHttpClient.Builder)}.
*
* @param conf The Hadoop configuration
* @return Http client builder
@ -107,24 +139,17 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf
*/
public static ApacheHttpClient.Builder createHttpClientBuilder(Configuration conf)
throws IOException {
final ConnectionSettings conn = createConnectionSettings(conf);
ApacheHttpClient.Builder httpClientBuilder =
ApacheHttpClient.builder();
httpClientBuilder.maxConnections(S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
int connectionEstablishTimeout =
S3AUtils.intOption(conf, ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT, 0);
int socketTimeout = S3AUtils.intOption(conf, SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, 0);
httpClientBuilder.connectionTimeout(Duration.ofMillis(connectionEstablishTimeout));
httpClientBuilder.socketTimeout(Duration.ofMillis(socketTimeout));
// set the connection TTL irrespective of whether the connection is in use or not.
// this can balance requests over different S3 servers, and avoid failed
// connections. See HADOOP-18845.
long ttl = longOption(conf, CONNECTION_TTL, DEFAULT_CONNECTION_TTL, -1);
httpClientBuilder.connectionTimeToLive(Duration.ofMillis(ttl));
ApacheHttpClient.builder()
.connectionAcquisitionTimeout(conn.getAcquisitionTimeout())
.connectionMaxIdleTime(conn.getMaxIdleTime())
.connectionTimeout(conn.getEstablishTimeout())
.connectionTimeToLive(conn.getConnectionTTL())
.maxConnections(conn.getMaxConnections())
.socketTimeout(conn.getSocketTimeout())
.tcpKeepAlive(conn.isKeepAlive())
.useIdleConnectionReaper(true); // true by default in the SDK
NetworkBinding.bindSSLChannelMode(conf, httpClientBuilder);
@ -132,31 +157,26 @@ public static ApacheHttpClient.Builder createHttpClientBuilder(Configuration con
}
/**
* Configures the async http client.
*
* Create and configure the async http client with timeouts for:
* connection acquisition, max idle, timeout, TTL, socket and keepalive.
* This is netty based and does not allow for the SSL channel mode to be set.
* @param conf The Hadoop configuration
* @return Http client builder
* @return Async Http client builder
*/
public static NettyNioAsyncHttpClient.Builder createAsyncHttpClientBuilder(Configuration conf) {
final ConnectionSettings conn = createConnectionSettings(conf);
NettyNioAsyncHttpClient.Builder httpClientBuilder =
NettyNioAsyncHttpClient.builder();
httpClientBuilder.maxConcurrency(S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1));
int connectionEstablishTimeout =
S3AUtils.intOption(conf, ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT, 0);
int socketTimeout = S3AUtils.intOption(conf, SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, 0);
httpClientBuilder.connectionTimeout(Duration.ofMillis(connectionEstablishTimeout));
httpClientBuilder.readTimeout(Duration.ofMillis(socketTimeout));
httpClientBuilder.writeTimeout(Duration.ofMillis(socketTimeout));
// set the connection TTL irrespective of whether the connection is in use or not.
// this can balance requests over different S3 servers, and avoid failed
// connections. See HADOOP-18845.
long ttl = longOption(conf, CONNECTION_TTL, DEFAULT_CONNECTION_TTL, -1);
httpClientBuilder.connectionTimeToLive(Duration.ofMillis(ttl));
NettyNioAsyncHttpClient.builder()
.connectionAcquisitionTimeout(conn.getAcquisitionTimeout())
.connectionMaxIdleTime(conn.getMaxIdleTime())
.connectionTimeout(conn.getEstablishTimeout())
.connectionTimeToLive(conn.getConnectionTTL())
.maxConcurrency(conn.getMaxConnections())
.readTimeout(conn.getSocketTimeout())
.tcpKeepAlive(conn.isKeepAlive())
.useIdleConnectionReaper(true) // true by default in the SDK
.writeTimeout(conn.getSocketTimeout());
// TODO: Don't think you can set a socket factory for the netty client.
// NetworkBinding.bindSSLChannelMode(conf, awsConf);
@ -166,13 +186,19 @@ public static NettyNioAsyncHttpClient.Builder createAsyncHttpClientBuilder(Confi
/**
* Configures the retry policy.
* Retry policy is {@code RetryMode.ADAPTIVE}, which
* "dynamically limits the rate of AWS requests to maximize success rate",
* possibly at the expense of latency.
* Based on the ABFS experience, it is better to limit the rate requests are
* made rather than have to resort to exponential backoff after failures come
* in, especially as that backoff is per HTTP connection.
*
* @param conf The Hadoop configuration
* @return Retry policy builder
*/
public static RetryPolicy.Builder createRetryPolicyBuilder(Configuration conf) {
RetryPolicy.Builder retryPolicyBuilder = RetryPolicy.builder();
RetryPolicy.Builder retryPolicyBuilder = RetryPolicy.builder(RetryMode.ADAPTIVE);
retryPolicyBuilder.numRetries(S3AUtils.intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
@ -236,11 +262,14 @@ public static ProxyConfiguration createProxyConfiguration(Configuration conf,
}
return proxyConfigBuilder.build();
}
/**
* Configures the proxy for the async http client.
*
* <p>
* Although this is netty specific, it is part of the AWS SDK public API
* and not any shaded netty internal class.
* @param conf The Hadoop configuration
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @return Proxy configuration
@ -307,7 +336,7 @@ public static ProxyConfiguration createProxyConfiguration(Configuration conf,
return proxyConfigBuilder.build();
}
/***
/**
* Builds a URI, throws an IllegalArgumentException in case of errors.
*
* @param host proxy host
@ -316,7 +345,7 @@ public static ProxyConfiguration createProxyConfiguration(Configuration conf,
*/
private static URI buildURI(String scheme, String host, int port) {
try {
return new URIBuilder().setScheme(scheme).setHost(host).setPort(port).build();
return new URI(scheme, null, host, port, null, null, null);
} catch (URISyntaxException e) {
String msg =
"Proxy error: incorrect " + PROXY_HOST + " or " + PROXY_PORT;
@ -346,6 +375,13 @@ private static void initUserAgent(Configuration conf,
clientConfig.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, userAgent);
}
/**
* Initializes the signer override for the given service.
* @param conf hadoop configuration
* @param clientConfig client configuration to update
* @param awsServiceIdentifier service name
* @throws IOException failure.
*/
private static void initSigner(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier)
throws IOException {
@ -371,24 +407,219 @@ private static void initSigner(Configuration conf,
}
/**
* Configures request timeout.
* Configures request timeout in the client configuration.
* This is independent of the timeouts set in the sync and async HTTP clients;
* the same value is used for both the API call and API call attempt timeouts.
*
* @param conf Hadoop configuration
* @param clientConfig AWS SDK configuration to update
*/
private static void initRequestTimeout(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig) {
long requestTimeoutMillis = conf.getTimeDuration(REQUEST_TIMEOUT,
DEFAULT_REQUEST_TIMEOUT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
// Get the connection settings
final Duration callTimeout = createApiConnectionSettings(conf).getApiCallTimeout();
if (requestTimeoutMillis > Integer.MAX_VALUE) {
LOG.debug("Request timeout is too high({} ms). Setting to {} ms instead",
requestTimeoutMillis, Integer.MAX_VALUE);
requestTimeoutMillis = Integer.MAX_VALUE;
if (callTimeout.toMillis() > 0) {
clientConfig.apiCallAttemptTimeout(callTimeout);
clientConfig.apiCallTimeout(callTimeout);
}
}
if(requestTimeoutMillis > 0) {
clientConfig.apiCallAttemptTimeout(Duration.ofMillis(requestTimeoutMillis));
/**
* Reset the minimum operation duration to the default.
* For test use only; logs at INFO.
* <p>
* This MUST be called in test teardown in any test suite which
* called {@link #setMinimumOperationDuration(Duration)}.
*/
@VisibleForTesting
public static void resetMinimumOperationDuration() {
setMinimumOperationDuration(MINIMUM_NETWORK_OPERATION_DURATION);
}
/**
* Set the minimum operation duration.
* This is for testing; it logs at INFO and requires a non-negative duration.
* <p>
* Test suites must call {@link #resetMinimumOperationDuration()} in their teardown
* to avoid breaking subsequent tests in the same process.
* @param duration non-negative duration
* @throws IllegalArgumentException if the duration is negative.
*/
@VisibleForTesting
public static void setMinimumOperationDuration(Duration duration) {
LOG.info("Setting minimum operation duration to {}ms", duration.toMillis());
checkArgument(duration.compareTo(Duration.ZERO) >= 0,
"Duration must be positive: %sms", duration.toMillis());
minimumOperationDuration = duration;
}
/**
* Get the current minimum operation duration.
* @return current duration.
*/
public static Duration getMinimumOperationDuration() {
return minimumOperationDuration;
}
/**
* Settings for the AWS client, rather than the http client.
*/
static class ClientSettings {
private final Duration apiCallTimeout;
private ClientSettings(final Duration apiCallTimeout) {
this.apiCallTimeout = apiCallTimeout;
}
Duration getApiCallTimeout() {
return apiCallTimeout;
}
@Override
public String toString() {
return "ClientSettings{" +
"apiCallTimeout=" + apiCallTimeout +
'}';
}
}
/**
* All the connection settings, wrapped as a class for use by
* both the sync and async client.
*/
static class ConnectionSettings {
private final int maxConnections;
private final boolean keepAlive;
private final Duration acquisitionTimeout;
private final Duration connectionTTL;
private final Duration establishTimeout;
private final Duration maxIdleTime;
private final Duration socketTimeout;
private ConnectionSettings(
final int maxConnections,
final boolean keepAlive,
final Duration acquisitionTimeout,
final Duration connectionTTL,
final Duration establishTimeout,
final Duration maxIdleTime,
final Duration socketTimeout) {
this.maxConnections = maxConnections;
this.keepAlive = keepAlive;
this.acquisitionTimeout = acquisitionTimeout;
this.connectionTTL = connectionTTL;
this.establishTimeout = establishTimeout;
this.maxIdleTime = maxIdleTime;
this.socketTimeout = socketTimeout;
}
int getMaxConnections() {
return maxConnections;
}
boolean isKeepAlive() {
return keepAlive;
}
Duration getAcquisitionTimeout() {
return acquisitionTimeout;
}
Duration getConnectionTTL() {
return connectionTTL;
}
Duration getEstablishTimeout() {
return establishTimeout;
}
Duration getMaxIdleTime() {
return maxIdleTime;
}
Duration getSocketTimeout() {
return socketTimeout;
}
@Override
public String toString() {
return "ConnectionSettings{" +
"maxConnections=" + maxConnections +
", keepAlive=" + keepAlive +
", acquisitionTimeout=" + acquisitionTimeout +
", connectionTTL=" + connectionTTL +
", establishTimeout=" + establishTimeout +
", maxIdleTime=" + maxIdleTime +
", socketTimeout=" + socketTimeout +
'}';
}
}
/**
* Build a client settings object.
* @param conf configuration to evaluate
* @return client settings.
*/
static ClientSettings createApiConnectionSettings(Configuration conf) {
Duration apiCallTimeout = getDuration(conf, REQUEST_TIMEOUT,
DEFAULT_REQUEST_TIMEOUT_DURATION, TimeUnit.MILLISECONDS, Duration.ZERO);
// if the API call timeout is set, it must be at least the minimum duration
if (apiCallTimeout.compareTo(Duration.ZERO) > 0) {
apiCallTimeout = enforceMinimumDuration(REQUEST_TIMEOUT,
apiCallTimeout, minimumOperationDuration);
}
return new ClientSettings(apiCallTimeout);
}
/**
* Build the HTTP connection settings object from the configuration.
* All settings are calculated, including the api call timeout.
* @param conf configuration to evaluate
* @return connection settings.
*/
static ConnectionSettings createConnectionSettings(Configuration conf) {
int maxConnections = S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS, 1);
final boolean keepAlive = conf.getBoolean(CONNECTION_KEEPALIVE,
DEFAULT_CONNECTION_KEEPALIVE);
// time to acquire a connection from the pool
Duration acquisitionTimeout = getDuration(conf, CONNECTION_ACQUISITION_TIMEOUT,
DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
minimumOperationDuration);
// set the connection TTL irrespective of whether the connection is in use or not.
// this can balance requests over different S3 servers, and avoid failed
// connections. See HADOOP-18845.
Duration connectionTTL = getDuration(conf, CONNECTION_TTL,
DEFAULT_CONNECTION_TTL_DURATION, TimeUnit.MILLISECONDS,
null);
Duration establishTimeout = getDuration(conf, ESTABLISH_TIMEOUT,
DEFAULT_ESTABLISH_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
minimumOperationDuration);
// limit on the time a connection can be idle in the pool
Duration maxIdleTime = getDuration(conf, CONNECTION_IDLE_TIME,
DEFAULT_CONNECTION_IDLE_TIME_DURATION, TimeUnit.MILLISECONDS, Duration.ZERO);
Duration socketTimeout = getDuration(conf, SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
minimumOperationDuration);
return new ConnectionSettings(
maxConnections,
keepAlive,
acquisitionTimeout,
connectionTTL,
establishTimeout,
maxIdleTime,
socketTimeout);
}
}
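
Tests that need sub-second timeouts must lower the minimum first and restore it
afterwards; a fragment following the pattern of the tests in this patch (option
value illustrative):

```java
// test-only: permit a 10ms acquisition timeout, then restore the default
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
try {
  Configuration conf = new Configuration();
  conf.set("fs.s3a.connection.acquisition.timeout", "10ms");
  // ... instantiate the filesystem and exercise the timeout path ...
} finally {
  AWSClientConfig.resetMinimumOperationDuration();
}
```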

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.store.LogExactlyOnce;
/**
* Helper class for configuration; where methods related to extracting
* configuration should go instead of {@code S3AUtils}.
*/
@InterfaceAudience.Private
public final class ConfigurationHelper {
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationHelper.class);
/** Log to warn of range issues on a timeout. */
private static final LogExactlyOnce DURATION_WARN_LOG = new LogExactlyOnce(LOG);
private ConfigurationHelper() {
}
/**
* Get a duration. This may be negative; callers must check or set the minimum to zero.
* If the config option is greater than {@code Integer.MAX_VALUE} milliseconds,
* it is set to that max.
* If {@code minimumDuration} is set, and the value is less than that, then
* the minimum is used.
* Logs the value for diagnostics.
* @param conf config
* @param name option name
* @param defaultDuration default duration
* @param defaultUnit default unit on the config option if not declared.
* @param minimumDuration optional minimum duration.
* @return duration; may be negative.
*/
public static Duration getDuration(
final Configuration conf,
final String name,
final Duration defaultDuration,
final TimeUnit defaultUnit,
@Nullable final Duration minimumDuration) {
long timeMillis = conf.getTimeDuration(name,
defaultDuration.toMillis(), defaultUnit, TimeUnit.MILLISECONDS);
if (timeMillis > Integer.MAX_VALUE) {
DURATION_WARN_LOG.warn("Option {} is too high({} ms). Setting to {} ms instead",
name, timeMillis, Integer.MAX_VALUE);
LOG.debug("Option {} is too high({} ms). Setting to {} ms instead",
name, timeMillis, Integer.MAX_VALUE);
timeMillis = Integer.MAX_VALUE;
}
Duration duration = enforceMinimumDuration(name, Duration.ofMillis(timeMillis),
minimumDuration);
LOG.debug("Duration of {} = {}", name, duration);
return duration;
}
/**
* Set a duration as a time in seconds, with the suffix "s" added.
* @param conf configuration to update.
* @param name option name
* @param duration duration
*/
public static void setDurationAsSeconds(
final Configuration conf,
final String name,
final Duration duration) {
conf.set(name, duration.getSeconds() + "s");
}
/**
* Set a duration as a time in milliseconds, with the suffix "ms" added.
* @param conf configuration to update.
* @param name option name
* @param duration duration
*/
public static void setDurationAsMillis(
final Configuration conf,
final String name,
final Duration duration) {
conf.set(name, duration.toMillis()+ "ms");
}
/**
* Enforce a minimum duration of a configuration option, if the supplied
* value is non-null.
* @param name option name
* @param duration duration to check
* @param minimumDuration minimum duration; may be null
* @return a duration which, if the minimum duration is set, is at least that value.
*/
public static Duration enforceMinimumDuration(
final String name,
final Duration duration,
@Nullable final Duration minimumDuration) {
if (minimumDuration != null && duration.compareTo(minimumDuration) < 0) {
String message = String.format("Option %s is too low (%,d ms). Setting to %,d ms instead",
name, duration.toMillis(), minimumDuration.toMillis());
DURATION_WARN_LOG.warn(message);
LOG.debug(message);
return minimumDuration;
}
return duration;
}
}
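
A usage sketch of the helper (the option name is real, the values illustrative):

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;

public class DurationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.connection.idle.time", "45s");
    Duration idle = ConfigurationHelper.getDuration(conf,
        "fs.s3a.connection.idle.time",
        Duration.ofSeconds(60),    // default when unset
        TimeUnit.MILLISECONDS,     // unit applied when no suffix is given
        Duration.ofSeconds(15));   // optional minimum, enforced here
    System.out.println(idle);     // PT45S; a bare "45" would be 45ms, raised to 15s
  }
}
```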

View File

@ -172,6 +172,9 @@ private InternalConstants() {
/** 503 status code: Service Unavailable. on AWS S3: throttle response. */
public static final int SC_503_SERVICE_UNAVAILABLE = 503;
/** 504 Gateway Timeout. AWS SDK considers retryable. */
public static final int SC_504_GATEWAY_TIMEOUT = 504;
/** Name of the log for throttling events. Value: {@value}. */
public static final String THROTTLE_LOG_NAME =
"org.apache.hadoop.fs.s3a.throttled";

View File

@ -703,8 +703,12 @@ to allow different buckets to override the shared settings. This is commonly
used to change the endpoint, encryption and authentication mechanisms of buckets.
and various minor options.
Here are the S3A properties for use in production; some testing-related
options are covered in [Testing](./testing.md).
Here are some of the S3A properties for use in production.
* See [Performance](./performance.html) for performance related settings including
thread and network pool options.
* Testing-related options are covered in [Testing](./testing.md).
```xml
@ -830,16 +834,6 @@ options are covered in [Testing](./testing.md).
</description>
</property>
<property>
<name>fs.s3a.connection.maximum</name>
<value>96</value>
<description>Controls the maximum number of simultaneous connections to S3.
This must be bigger than the value of fs.s3a.threads.max so as to stop
threads being blocked waiting for new HTTPS connections.
Why not equal? The AWS SDK transfer manager also uses these connections.
</description>
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
@ -908,18 +902,6 @@ options are covered in [Testing](./testing.md).
</description>
</property>
<property>
<name>fs.s3a.connection.establish.timeout</name>
<value>5000</value>
<description>Socket connection setup timeout in milliseconds.</description>
</property>
<property>
<name>fs.s3a.connection.timeout</name>
<value>200000</value>
<description>Socket connection timeout in milliseconds.</description>
</property>
<property>
<name>fs.s3a.socket.send.buffer</name>
<value>8192</value>
@ -939,43 +921,6 @@ options are covered in [Testing](./testing.md).
directory listings at a time.</description>
</property>
<property>
<name>fs.s3a.threads.max</name>
<value>64</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>
<property>
<name>fs.s3a.threads.keepalivetime</name>
<value>60</value>
<description>Number of seconds a thread can be idle before being
terminated.</description>
</property>
<property>
<name>fs.s3a.max.total.tasks</name>
<value>32</value>
<description>The number of operations which can be queued for execution.
This is in addition to the number of active threads in fs.s3a.threads.max.
</description>
</property>
<property>
<name>fs.s3a.executor.capacity</name>
<value>16</value>
<description>The maximum number of submitted tasks which is a single
operation (e.g. rename(), delete()) may submit simultaneously for
execution -excluding the IO-heavy block uploads, whose capacity
is set in "fs.s3a.fast.upload.active.blocks"
All tasks are submitted to the shared thread pool whose size is
set in "fs.s3a.threads.max"; the value of capacity should be less than that
of the thread pool itself, as the goal is to stop a single operation
from overloading that thread pool.
</description>
</property>
<property>
<name>fs.s3a.multipart.size</name>
<value>64M</value>
@ -2232,43 +2177,33 @@ from VMs running on EC2.
</description>
</property>
<property>
<name>fs.s3a.threads.max</name>
<value>10</value>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>
<property>
<name>fs.s3a.max.total.tasks</name>
<value>5</value>
<description>The number of operations which can be queued for execution</description>
</property>
<property>
<name>fs.s3a.threads.keepalivetime</name>
<value>60</value>
<description>Number of seconds a thread can be idle before being
terminated.</description>
</property>
```
### <a name="multipart_purge"></a>Cleaning up after partial Upload Failures
There are two mechanisms for cleaning up after leftover multipart
There are four mechanisms for cleaning up after leftover multipart
uploads:
- AWS Lifecycle rules. This is the simplest and SHOULD be used unless there
are good reasons, such as the store not supporting lifecycle rules.
- Hadoop s3guard CLI commands for listing and deleting uploads by their
age. Documented in the [S3Guard](./s3guard.html) section.
- Setting `fs.s3a.directory.operations.purge.uploads` to `true` for automatic
scan and delete during directory rename and delete
- The configuration parameter `fs.s3a.multipart.purge`, covered below.
If a large stream write operation is interrupted, there may be
intermediate partitions uploaded to S3, data which will be billed for.
If an S3A committer job is halted partway through, again, there may be
many incomplete multipart uploads in the output directory.
These charges can be reduced by enabling `fs.s3a.multipart.purge`,
and setting a purge time in seconds, such as 86400 seconds —24 hours.
and setting a purge age, such as 24 hours (`24h`).
When an S3A FileSystem instance is instantiated with the purge time greater
than zero, it will, on startup, delete all outstanding partition requests
older than this time.
older than this time. However, this makes filesystem instantiation slow, especially
against very large buckets, as a full scan is made.
Consider avoiding this in future.
```xml
<property>
@ -2280,7 +2215,7 @@ older than this time.
<property>
<name>fs.s3a.multipart.purge.age</name>
<value>86400</value>
<value>24h</value>
<description>Minimum age in seconds of multipart uploads to purge</description>
</property>
```

View File

@ -196,40 +196,93 @@ Fix: Use one of the dedicated [S3A Committers](committers.md).
## <a name="tuning"></a> Options to Tune
### <a name="pooling"></a> Thread and connection pool sizes.
### <a name="pooling"></a> Thread and connection pool settings.
Each S3A client interacting with a single bucket, as a single user, has its
own dedicated pool of open HTTP 1.1 connections alongside a pool of threads used
for upload and copy operations.
own dedicated pool of open HTTP connections alongside a pool of threads used
for background/parallel operations in addition to the worker threads of the
actual application.
The default pool sizes are intended to strike a balance between performance
and memory/thread use.
You can have a larger pool of (reused) HTTP connections and threads
for parallel IO (especially uploads) by setting the properties
for parallel IO (especially uploads, prefetching and vector reads) by setting the appropriate
properties. Note: the S3A committers have their own thread pools for job commit, but
everything uses the same HTTP connection pool.
| Property | Default | Meaning |
|--------------------------------|---------|------------------------------------------------------------------|
| `fs.s3a.threads.max` | `96` | Threads in the thread pool |
| `fs.s3a.threads.keepalivetime` | `60s` | Expiry time for idle threads in the thread pool |
| `fs.s3a.executor.capacity` | `16` | Maximum threads for any single operation |
| `fs.s3a.max.total.tasks` | `32` | Extra tasks which can be queued excluding prefetching operations |
| property | meaning | default |
|----------|---------|---------|
| `fs.s3a.threads.max`| Threads in the AWS transfer manager| 10 |
| `fs.s3a.connection.maximum`| Maximum number of HTTP connections | 10|
Network timeout options can be tuned to make the client fail faster *or* retry more.
The choice is yours. Generally recovery is better, but sometimes fail-fast is more useful.
We recommend using larger values for processes which perform
a lot of IO: `DistCp`, Spark Workers and similar.
```xml
<property>
<name>fs.s3a.threads.max</name>
<value>20</value>
</property>
<property>
<name>fs.s3a.connection.maximum</name>
<value>20</value>
</property>
```
| Property | Default | V2 | Meaning |
|-----------------------------------------|---------|:----|-------------------------------------------------------|
| `fs.s3a.connection.maximum` | `200` | | Connection pool size |
| `fs.s3a.connection.keepalive` | `false` | `*` | Use TCP keepalive on open channels |
| `fs.s3a.connection.acquisition.timeout` | `60s` | `*` | Timeout for waiting for a connection from the pool. |
| `fs.s3a.connection.establish.timeout` | `30s` | | Time to establish the TCP/TLS connection |
| `fs.s3a.connection.idle.time` | `60s` | `*` | Maximum time for idle HTTP connections in the pool |
| `fs.s3a.connection.request.timeout` | `0` | | If greater than zero, maximum duration of any request |
| `fs.s3a.connection.timeout` | `200s` | | Timeout for socket problems on a TCP channel |
| `fs.s3a.connection.ttl` | `5m` | | Lifetime of HTTP connections from the pool |
Units:
1. The default unit for all these options except for `fs.s3a.threads.keepalivetime` is milliseconds, unless a time suffix is declared.
2. Versions of Hadoop built with the AWS V1 SDK *only* support milliseconds rather than suffix values.
If configurations are intended to apply across hadoop releases, you MUST use milliseconds without a suffix.
3. `fs.s3a.threads.keepalivetime` has a default unit of seconds on all hadoop releases.
4. Options flagged as "V2" are new with the AWS V2 SDK; they are ignored on V1 releases.
---
There are some hard tuning decisions related to pool size and expiry.
As servers add more cores and services add many more worker threads, a larger pool size is more and more important:
the default values in `core-default.xml` have been slowly increased over time but should not be treated as
"the best", simply as a good starting point.
With Vectored IO adding multiple GET requests per Spark/Hive worker thread,
and stream prefetching performing background block prefetch, larger pool and thread sizes are even more important.
In large Hive deployments, thread and connection pools of thousands have been known to be set.
Small pool (a small value of `fs.s3a.connection.maximum`):
* Keeps the network/memory cost of having many S3A instances in the same process low.
* But: limits how many connections can be open at a time.

Large pool:
* More HTTP connections can be created and kept, but the cost of keeping network connections
open increases unless idle time is reduced through `fs.s3a.connection.idle.time`.
If exceptions are raised about timeouts acquiring connections from the pool, this can be a symptom of:
* Heavy load: increase the pool size and the acquisition timeout `fs.s3a.connection.acquisition.timeout`.
* The process failing to close input streams opened against the S3 store.
  Fix: find uses of `open()`/`openFile()` and make sure the streams are `close()`d, as in the sketch below.
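
A minimal sketch of the fix, assuming `fs` and `path` already exist; try-with-resources
guarantees the stream is closed and its connection returned to the pool:

```java
// close() is called even if reading fails, releasing the pooled connection
try (FSDataInputStream in = fs.open(path)) {
  byte[] buffer = new byte[8192];
  for (int n = in.read(buffer); n >= 0; n = in.read(buffer)) {
    // process buffer[0..n)
  }
}
```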
*Retirement of HTTP Connections.*
Connections are retired from the pool by `fs.s3a.connection.idle.time`, the maximum time for idle connections,
and `fs.s3a.connection.ttl`, the maximum life of any connection in the pool, even if it is repeatedly reused.
Limiting idle time saves on network connections, at the cost of requiring new connections on subsequent S3 operations.
Limiting connection TTL is useful to spread across load balancers and recover from some network
connection problems, including those caused by proxies.
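
For example, to retire idle connections faster while keeping the default five minute
lifetime (a fragment; the values are illustrative):

```java
Configuration conf = new Configuration();
// drop idle connections after 30s; recycle every connection after at most 5m
conf.set("fs.s3a.connection.idle.time", "30s");
conf.set("fs.s3a.connection.ttl", "5m");
```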
*Request timeout*: `fs.s3a.connection.request.timeout`
If set, this sets an upper limit on any non-streaming API call (i.e. everything but `GET`).
A timeout is good to detect and recover from failures.
However, it also sets a limit on the duration of a POST/PUT of data,
which, if it times out, will only be repeated, ultimately to failure.
Be aware, however, that processes which perform many parallel queries
may consume large amounts of resources if each query is working with
a different set of S3 buckets, or is acting on behalf of different users.
### For large data uploads, tune the block size: `fs.s3a.block.size`
@ -327,18 +380,6 @@ efficient in terms of HTTP connection use, and reduce the IOP rate against
the S3 bucket/shard.
```xml
<property>
<name>fs.s3a.threads.max</name>
<value>20</value>
</property>
<property>
<name>fs.s3a.connection.maximum</name>
<value>30</value>
<descriptiom>
Make greater than both fs.s3a.threads.max and -numListstatusThreads
</descriptiom>
</property>
<property>
<name>fs.s3a.experimental.input.fadvise</name>

View File

@ -24,6 +24,7 @@
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.maybeTranslateAuditException;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
import static org.junit.Assert.*;
import java.io.EOFException;
@ -37,6 +38,8 @@
import org.assertj.core.api.Assertions;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.http.SdkHttpResponse;
@ -44,11 +47,12 @@
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
import org.apache.hadoop.fs.s3a.audit.AuditFailureException;
import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException;
import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;
import org.apache.hadoop.io.retry.RetryPolicy;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
@ -330,4 +334,51 @@ public void testTranslateNonAuditException() throws Throwable {
.isNull();
}
/**
* 504 gateway timeout is translated to a {@link AWSApiCallTimeoutException}.
*/
@Test
public void test504ToTimeout() throws Throwable {
AWSApiCallTimeoutException ex =
verifyExceptionClass(AWSApiCallTimeoutException.class,
translateException("test", "/", createS3Exception(504)));
verifyCause(S3Exception.class, ex);
}
/**
* SDK ApiCallTimeoutException is translated to a
* {@link AWSApiCallTimeoutException}.
*/
@Test
public void testApiCallTimeoutExceptionToTimeout() throws Throwable {
AWSApiCallTimeoutException ex =
verifyExceptionClass(AWSApiCallTimeoutException.class,
translateException("test", "/",
ApiCallTimeoutException.builder()
.message("timeout")
.build()));
verifyCause(ApiCallTimeoutException.class, ex);
}
/**
* SDK ApiCallAttemptTimeoutException is translated to a
* {@link AWSApiCallTimeoutException}.
*/
@Test
public void testApiCallAttemptTimeoutExceptionToTimeout() throws Throwable {
AWSApiCallTimeoutException ex =
verifyExceptionClass(AWSApiCallTimeoutException.class,
translateException("test", "/",
ApiCallAttemptTimeoutException.builder()
.message("timeout")
.build()));
verifyCause(ApiCallAttemptTimeoutException.class, ex);
// and confirm these timeouts are retried.
final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false));
Assertions.assertThat(retryPolicy.shouldRetry(ex, 0, 0, true).action)
.describedAs("retry policy for exception %s", ex)
.isEqualTo(RetryPolicy.RetryAction.RetryDecision.RETRY);
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.ConnectTimeoutException;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test for timeout generation/handling, especially those related to connection
* pools.
* This test has proven a bit brittle to parallel test runs, so test cases should
* create their own FS instances.
 * The likely cause is -Dprefetch test runs, as these return connections to
 * the pool.
 * However, it is also important to have a non-brittle FS for creating the test file
 * and for teardown; using the brittle FS for those phases makes for a flaky test.
*/
public class ITestConnectionTimeouts extends AbstractS3ATestBase {
/**
* How big a file to create?
*/
public static final int FILE_SIZE = 1024;
/**
* Create a configuration for an FS which has timeouts set to very low values
* and no retries.
* @return a configuration to use for the brittle FS.
*/
private Configuration timingOutConfiguration() {
Configuration conf = new Configuration(getConfiguration());
removeBaseAndBucketOverrides(conf,
CONNECTION_TTL,
CONNECTION_ACQUISITION_TIMEOUT,
CONNECTION_IDLE_TIME,
ESTABLISH_TIMEOUT,
MAX_ERROR_RETRIES,
MAXIMUM_CONNECTIONS,
PREFETCH_ENABLED_KEY,
REQUEST_TIMEOUT,
SOCKET_TIMEOUT,
FS_S3A_CREATE_PERFORMANCE
);
// only one connection is allowed, and the establish timeout is low
conf.setInt(MAXIMUM_CONNECTIONS, 1);
conf.setInt(MAX_ERROR_RETRIES, 0);
// needed to ensure that streams are kept open.
// without this, the test is unreliable in batch runs.
conf.setBoolean(PREFETCH_ENABLED_KEY, false);
conf.setInt(RETRY_LIMIT, 0);
conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, true);
final Duration ms10 = Duration.ofMillis(10);
setDurationAsMillis(conf, CONNECTION_ACQUISITION_TIMEOUT, ms10);
setDurationAsMillis(conf, ESTABLISH_TIMEOUT, ms10);
return conf;
}
@Override
public void teardown() throws Exception {
AWSClientConfig.resetMinimumOperationDuration();
super.teardown();
}
/**
* Use up the connection pool and expect the failure to be handled.
*/
@Test
public void testGeneratePoolTimeouts() throws Throwable {
byte[] data = dataset(FILE_SIZE, '0', 10);
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
Configuration conf = timingOutConfiguration();
Path path = methodPath();
int streamsToCreate = 100;
List<FSDataInputStream> streams = new ArrayList<>(streamsToCreate);
final S3AFileSystem fs = getFileSystem();
// create the test file using the good fs, to avoid connection timeouts
// during setup.
ContractTestUtils.createFile(fs, path, true, data);
final FileStatus st = fs.getFileStatus(path);
try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
intercept(ConnectTimeoutException.class, () -> {
for (int i = 0; i < streamsToCreate; i++) {
FutureDataInputStreamBuilder b = brittleFS.openFile(path);
b.withFileStatus(st);
b.opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE);
final FSDataInputStream in = b.build().get();
streams.add(in);
// kick off the read, forcing a GET.
in.read();
}
});
} finally {
streams.forEach(s -> {
IOUtils.cleanupWithLogger(LOG, s);
});
}
}
}
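
The test above relies on two levers: AWSClientConfig.setMinimumOperationDuration(Duration.ZERO) to drop the usual floor on network timeouts, and ConfigurationHelper.setDurationAsMillis() to store sub-second values. A sketch of what such a setter plausibly does, inferred from its use here rather than from the committed implementation:

import java.time.Duration;

import org.apache.hadoop.conf.Configuration;

// Sketch only: persist the Duration with an explicit "ms" suffix so the
// stored value is unit-safe regardless of the reader's default unit.
static void setDurationAsMillis(Configuration conf, String key, Duration value) {
  conf.set(key, value.toMillis() + "ms");
}

The split between a healthy FS for setup/teardown and a brittle FS inside the intercept() clause is the key design choice: pool exhaustion is provoked only where the test expects it, so file creation and cleanup never compete for the single connection.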

View File

@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.time.Duration;
import java.util.Arrays;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_TTL_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ESTABLISH_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SOCKET_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MINIMUM_NETWORK_OPERATION_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createApiConnectionSettings;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createConnectionSettings;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
/**
* Unit tests for {@link AWSClientConfig}.
* These may play with the config timeout settings, so reset the timeouts
* during teardown.
* For isolation from any site settings, the tests create configurations
 * without loading the default/site XML files.
*/
public class TestAwsClientConfig extends AbstractHadoopTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TestAwsClientConfig.class);
@After
public void teardown() throws Exception {
AWSClientConfig.resetMinimumOperationDuration();
}
/**
* Create a new empty configuration.
* @return configuration.
*/
private Configuration conf() {
return new Configuration(false);
}
/**
* Innermost duration enforcement, which is not applied if
* the minimum value is null.
*/
@Test
public void testEnforceMinDuration() {
final Duration s10 = Duration.ofSeconds(10);
final Duration s1 = Duration.ofSeconds(1);
Assertions.assertThat(enforceMinimumDuration("key", s1, s10))
.describedAs("10s")
.isEqualTo(s10);
// and a null check
Assertions.assertThat(enforceMinimumDuration("key",
s1, null))
.describedAs("10s")
.isEqualTo(s1);
}
/**
 * When loading connection settings from an empty configuration, the
* correct default values are loaded.
*/
@Test
public void testLoadUnsetValues() {
final AWSClientConfig.ConnectionSettings conn = createConnectionSettings(conf());
assertDuration(CONNECTION_ACQUISITION_TIMEOUT, DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION,
conn.getAcquisitionTimeout());
assertDuration(CONNECTION_TTL, DEFAULT_CONNECTION_TTL_DURATION,
conn.getConnectionTTL());
assertDuration(CONNECTION_IDLE_TIME, DEFAULT_CONNECTION_IDLE_TIME_DURATION,
conn.getMaxIdleTime());
assertDuration(ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT_DURATION,
conn.getEstablishTimeout());
assertDuration(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT_DURATION,
conn.getSocketTimeout());
Assertions.assertThat(conn.getMaxConnections())
.describedAs(MAXIMUM_CONNECTIONS)
.isEqualTo(DEFAULT_MAXIMUM_CONNECTIONS);
Assertions.assertThat(conn.isKeepAlive())
.describedAs(CONNECTION_KEEPALIVE)
.isEqualTo(DEFAULT_CONNECTION_KEEPALIVE);
}
/**
* If we set a minimum duration that is bigger than the configured value,
* the minimum value wins.
* Some options have a minimum value of zero.
*/
@Test
public void testMinimumDurationWins() {
final Configuration conf = conf();
setOptionsToValue("1s", conf,
CONNECTION_ACQUISITION_TIMEOUT,
CONNECTION_TTL,
CONNECTION_IDLE_TIME,
ESTABLISH_TIMEOUT,
SOCKET_TIMEOUT);
final AWSClientConfig.ConnectionSettings conn = createConnectionSettings(conf);
LOG.info("Connection settings: {}", conn);
assertDuration(CONNECTION_ACQUISITION_TIMEOUT, MINIMUM_NETWORK_OPERATION_DURATION,
conn.getAcquisitionTimeout());
assertDuration(ESTABLISH_TIMEOUT, MINIMUM_NETWORK_OPERATION_DURATION,
conn.getEstablishTimeout());
assertDuration(SOCKET_TIMEOUT, MINIMUM_NETWORK_OPERATION_DURATION,
conn.getSocketTimeout());
// those options with a minimum of zero
final Duration s1 = Duration.ofSeconds(1);
assertDuration(CONNECTION_TTL, s1, conn.getConnectionTTL());
assertDuration(CONNECTION_IDLE_TIME, s1, conn.getMaxIdleTime());
}
/**
 * Assert that a duration has the expected value.
* @param name option name for assertion text
* @param expected expected duration
* @param actual actual duration
*/
private void assertDuration(String name, Duration expected, Duration actual) {
Assertions.assertThat(actual)
.describedAs("Duration of %s", name)
.isEqualTo(expected);
}
/**
* Test {@link AWSClientConfig#createApiConnectionSettings(Configuration)}.
*/
@Test
public void testCreateApiConnectionSettings() {
final Configuration conf = conf();
conf.set(REQUEST_TIMEOUT, "1h");
final AWSClientConfig.ClientSettings settings =
createApiConnectionSettings(conf);
Assertions.assertThat(settings.getApiCallTimeout())
.describedAs("%s in %s", REQUEST_TIMEOUT, settings)
.isEqualTo(Duration.ofHours(1));
}
/**
* Set a list of keys to the same value.
* @param value value to set
* @param conf configuration to patch
* @param keys keys
*/
private void setOptionsToValue(String value, Configuration conf, String... keys) {
Arrays.stream(keys).forEach(key -> conf.set(key, value));
}
}
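
testEnforceMinDuration pins down the enforcement contract completely: a non-null minimum wins over any smaller configured value, and a null minimum disables enforcement. A few lines capture it; this is a sketch inferred from the assertions above, not the committed code:

import java.time.Duration;

static Duration enforceMinimumDuration(String key, Duration value, Duration minimum) {
  // a null minimum means "no floor": the configured value passes through
  if (minimum == null || value.compareTo(minimum) >= 0) {
    return value;
  }
  // below the floor: substitute the minimum (the real code presumably
  // warns here, which is why the key name is passed in)
  return minimum;
}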

View File

@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.performance;
import java.io.IOException;
import java.time.Duration;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -34,6 +35,7 @@
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.io.IOUtils;
@@ -51,6 +53,7 @@
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsSeconds;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ABORTED;
@@ -119,16 +122,23 @@ public void setup() throws Exception {
// now create a new FS with minimal http capacity and recovery
// a separate instance is used so that test teardown does not suffer
// from the lack of http connections and the short timeouts.
try {
// allow small durations.
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
Configuration conf = getConfiguration();
// kick off async drain for any data
conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
conf.setInt(MAXIMUM_CONNECTIONS, 2);
conf.setInt(MAX_ERROR_RETRIES, 1);
conf.setInt(ESTABLISH_TIMEOUT, 1000);
conf.setInt(READAHEAD_RANGE, READAHEAD);
conf.setInt(RETRY_LIMIT, 1);
setDurationAsSeconds(conf, ESTABLISH_TIMEOUT,
Duration.ofSeconds(1));
brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
} finally {
AWSClientConfig.resetMinimumOperationDuration();
}
}
@Override