+ * Timeout default values are declared as integers or long values in milliseconds and + * occasionally seconds. + * There are now {@code Duration} constants for these default values; the original + * fields are retained for compatibility, and derive their value from the Duration equivalent. + *
+ * New timeout/duration constants do not get the equivalent integer/long fields. */ @InterfaceAudience.Public @InterfaceStability.Evolving @@ -144,25 +152,47 @@ private Constants() { SimpleAWSCredentialsProvider.NAME; - // the maximum number of tasks cached if all threads are already uploading + /** + * The maximum number of tasks queued (other than prefetcher tasks) if all threads are + * busy: {@value}. + */ public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks"; + /** + * Default value for {@link #MAX_TOTAL_TASKS}: {@value}. + */ public static final int DEFAULT_MAX_TOTAL_TASKS = 32; - // number of simultaneous connections to s3 + /** + * Number of simultaneous connections to S3: {@value}. + */ public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum"; - public static final int DEFAULT_MAXIMUM_CONNECTIONS = 96; + + /** + * Default value for {@link #MAXIMUM_CONNECTIONS}: {@value}. + * Future releases are likely to increase this value. + * Keep in sync with the value in {@code core-default.xml} + */ + public static final int DEFAULT_MAXIMUM_CONNECTIONS = 200; /** * Configuration option to configure expiration time of - * s3 http connection from the connection pool in milliseconds: {@value}. + * S3 http connection from the connection pool: {@value}. */ public static final String CONNECTION_TTL = "fs.s3a.connection.ttl"; /** - * Default value for {@code CONNECTION_TTL}: {@value}. + * Default duration for {@link #CONNECTION_TTL}: 5 minutes. */ - public static final long DEFAULT_CONNECTION_TTL = 5 * 60_000; + public static final Duration DEFAULT_CONNECTION_TTL_DURATION = + Duration.ofMinutes(5); + + /** + * Default value in millis for {@link #CONNECTION_TTL}: 5 minutes. + * @deprecated use {@link #DEFAULT_CONNECTION_TTL_DURATION} + */ + public static final long DEFAULT_CONNECTION_TTL = + DEFAULT_CONNECTION_TTL_DURATION.toMillis(); // connect to s3 over ssl? 
public static final String SECURE_CONNECTIONS = @@ -264,19 +294,111 @@ private Constants() { public static final boolean EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT = true; - // milliseconds until we give up trying to establish a connection to s3 + /** + * This is the minimum operation duration unless programmatically set. + * It ensures that even if a configuration has mistaken a millisecond + * option for seconds, a viable duration will actually be used. + * Value: 15s. + */ + public static final Duration MINIMUM_NETWORK_OPERATION_DURATION = Duration.ofSeconds(15); + + /** + * Milliseconds until a connection is established: {@value}. + */ public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout"; - public static final int DEFAULT_ESTABLISH_TIMEOUT = 5000; - // milliseconds until we give up on a connection to s3 + /** + * Default TCP/(and TLS?) establish timeout: 30 seconds. + */ + public static final Duration DEFAULT_ESTABLISH_TIMEOUT_DURATION = Duration.ofSeconds(30); + + /** + * Default establish timeout in millis: 30 seconds. + * @deprecated use {@link #DEFAULT_ESTABLISH_TIMEOUT_DURATION} + */ + public static final int DEFAULT_ESTABLISH_TIMEOUT = + (int)DEFAULT_ESTABLISH_TIMEOUT_DURATION.toMillis(); + + /** + * Milliseconds until we give up on a connection to s3: {@value}. + */ public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout"; - public static final int DEFAULT_SOCKET_TIMEOUT = 200000; - // milliseconds until a request is timed-out + /** + * Default socket timeout: 200 seconds. + */ + public static final Duration DEFAULT_SOCKET_TIMEOUT_DURATION = Duration.ofSeconds(200); + + /** + * Default socket timeout: {@link #DEFAULT_SOCKET_TIMEOUT_DURATION}. + * @deprecated use {@link #DEFAULT_SOCKET_TIMEOUT_DURATION} + */ + public static final int DEFAULT_SOCKET_TIMEOUT = (int)DEFAULT_SOCKET_TIMEOUT_DURATION.toMillis(); + + /** + * Time until a request is timed-out: {@value}. + * If zero, there is no timeout. 
+ */ public static final String REQUEST_TIMEOUT = "fs.s3a.connection.request.timeout"; - public static final int DEFAULT_REQUEST_TIMEOUT = 0; + + /** + * Default duration of a request before it is timed out: Zero. + */ + public static final Duration DEFAULT_REQUEST_TIMEOUT_DURATION = Duration.ZERO; + + /** + * Default duration of a request before it is timed out: Zero. + * @deprecated use {@link #DEFAULT_REQUEST_TIMEOUT_DURATION} + */ + public static final int DEFAULT_REQUEST_TIMEOUT = + (int)DEFAULT_REQUEST_TIMEOUT_DURATION.toMillis(); + + /** + * Acquisition timeout for connections from the pool: + * {@value}. + * Default unit is milliseconds for consistency with other options. + */ + public static final String CONNECTION_ACQUISITION_TIMEOUT = + "fs.s3a.connection.acquisition.timeout"; + + /** + * Default acquisition timeout: 60 seconds. + */ + public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION = + Duration.ofSeconds(60); + + /** + * Should TCP Keepalive be enabled on the socket? + * This adds some network IO, but finds failures faster. + * {@value}. + */ + public static final String CONNECTION_KEEPALIVE = + "fs.s3a.connection.keepalive"; + + /** + * Default value of {@link #CONNECTION_KEEPALIVE}: {@value}. + */ + public static final boolean DEFAULT_CONNECTION_KEEPALIVE = false; + + /** + * Maximum idle time for connections in the pool: {@value}. + *
+ * Too low: overhead of creating connections. + * Too high, risk of stale connections and inability to use the + * adaptive load balancing of the S3 front end. + *
+ * Default unit is milliseconds for consistency with other options.
+ */
+ public static final String CONNECTION_IDLE_TIME =
+ "fs.s3a.connection.idle.time";
+
+ /**
+ * Default idle time: 60 seconds.
+ */
+ public static final Duration DEFAULT_CONNECTION_IDLE_TIME_DURATION =
+ Duration.ofSeconds(60);
// socket send buffer to be used in Amazon client
public static final String SOCKET_SEND_BUFFER = "fs.s3a.socket.send.buffer";
@@ -290,13 +412,34 @@ private Constants() {
public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
- // the maximum number of threads to allow in the pool used by TransferManager
+ /**
+ * The maximum number of threads to allow in the pool used by S3A.
+ * Value: {@value}.
+ */
public static final String MAX_THREADS = "fs.s3a.threads.max";
- public static final int DEFAULT_MAX_THREADS = 10;
- // the time an idle thread waits before terminating
+ /**
+ * Default value of {@link #MAX_THREADS}: {@value}.
+ */
+ public static final int DEFAULT_MAX_THREADS = 96;
+
+ /**
+ * The time an idle thread waits before terminating: {@value}.
+ * This is in SECONDS unless the optional unit is given.
+ */
public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
- public static final int DEFAULT_KEEPALIVE_TIME = 60;
+
+ /**
+ * Default value of {@link #KEEPALIVE_TIME}: 60s.
+ */
+ public static final Duration DEFAULT_KEEPALIVE_TIME_DURATION = Duration.ofSeconds(60);
+
+ /**
+ * Default value of {@link #KEEPALIVE_TIME} in seconds: 60.
+ * @deprecated use {@link #DEFAULT_KEEPALIVE_TIME_DURATION}
+ */
+ public static final int DEFAULT_KEEPALIVE_TIME =
+ (int)DEFAULT_KEEPALIVE_TIME_DURATION.getSeconds();
// size of each of or multipart pieces in bytes
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
@@ -501,10 +644,17 @@ private Constants() {
"fs.s3a.multipart.purge";
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
- // purge any multipart uploads older than this number of seconds
+ /**
+ * Age beyond which outstanding multipart uploads are purged; default unit is seconds.
+ */
public static final String PURGE_EXISTING_MULTIPART_AGE =
"fs.s3a.multipart.purge.age";
- public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
+
+ /**
+ * Default value of {@link #PURGE_EXISTING_MULTIPART_AGE}: one day, expressed in seconds.
+ */
+ public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE =
+ Duration.ofDays(1).getSeconds();
/**
* s3 server-side encryption, see
@@ -1201,8 +1351,7 @@ private Constants() {
* Default value for create performance in an S3A FS.
* Value {@value}.
*/
- public static final boolean FS_S3A_CREATE_PERFORMANCE_DEFAULT = true;
-
+ public static final boolean FS_S3A_CREATE_PERFORMANCE_DEFAULT = false;
/**
* Capability to indicate that the FS has been instantiated with
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index b978707ce0..96c4f6268b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -28,6 +28,7 @@
import java.nio.file.AccessDeniedException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
@@ -122,6 +123,7 @@
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
@@ -828,8 +830,13 @@ private void initThreadPools(Configuration conf) {
}
int totalTasks = intOption(conf,
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
- long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
- DEFAULT_KEEPALIVE_TIME, 0);
+ // keepalive time takes a time suffix; default unit is seconds
+ long keepAliveTime = ConfigurationHelper.getDuration(conf,
+ KEEPALIVE_TIME,
+ Duration.ofSeconds(DEFAULT_KEEPALIVE_TIME),
+ TimeUnit.SECONDS,
+ Duration.ZERO).getSeconds();
+
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
int activeTasksForBoundedThreadPool = maxThreads;
@@ -1226,12 +1233,15 @@ private void initCannedAcls(Configuration conf) {
private void initMultipartUploads(Configuration conf) throws IOException {
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART);
- long purgeExistingMultipartAge = longOption(conf,
- PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0);
if (purgeExistingMultipart) {
try {
- abortOutstandingMultipartUploads(purgeExistingMultipartAge);
+ Duration purgeDuration = ConfigurationHelper.getDuration(conf,
+ PURGE_EXISTING_MULTIPART_AGE,
+ Duration.ofSeconds(DEFAULT_PURGE_EXISTING_MULTIPART_AGE),
+ TimeUnit.SECONDS,
+ Duration.ZERO);
+ abortOutstandingMultipartUploads(purgeDuration.getSeconds());
} catch (AccessDeniedException e) {
instrumentation.errorIgnored();
LOG.debug("Failed to purge multipart uploads against {}," +
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index cc30da4fbe..9438ac22bd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -201,6 +201,9 @@ protected Map
+ * See {@code software.amazon.awssdk.http.SdkHttpConfigurationOption}
+ * for the default values.
*/
public final class AWSClientConfig {
+
private static final Logger LOG = LoggerFactory.getLogger(AWSClientConfig.class);
+ /**
+ * The minimum operation duration; mutable only so that tests can change it.
+ */
+ private static Duration minimumOperationDuration = MINIMUM_NETWORK_OPERATION_DURATION;
+
+
private AWSClientConfig() {
}
+ /**
+ * Create the config for a given service...the service identifier is used
+ * to determine signature implementation.
+ * @param conf configuration
+ * @param awsServiceIdentifier service
+ * @return the builder inited with signer, timeouts and UA.
+ * @throws IOException failure.
+ */
public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Configuration conf,
String awsServiceIdentifier) throws IOException {
ClientOverrideConfiguration.Builder overrideConfigBuilder =
@@ -99,7 +128,10 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf
}
/**
- * Configures the http client.
+ * Create and configure the http client-based connector with timeouts for:
+ * connection acquisition, max idle, timeout, TTL, socket and keepalive.
+ * SSL channel mode is set up via
+ * {@link NetworkBinding#bindSSLChannelMode(Configuration, ApacheHttpClient.Builder)}.
*
* @param conf The Hadoop configuration
* @return Http client builder
@@ -107,24 +139,17 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf
*/
public static ApacheHttpClient.Builder createHttpClientBuilder(Configuration conf)
throws IOException {
+ final ConnectionSettings conn = createConnectionSettings(conf);
ApacheHttpClient.Builder httpClientBuilder =
- ApacheHttpClient.builder();
-
- httpClientBuilder.maxConnections(S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
- DEFAULT_MAXIMUM_CONNECTIONS, 1));
-
- int connectionEstablishTimeout =
- S3AUtils.intOption(conf, ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT, 0);
- int socketTimeout = S3AUtils.intOption(conf, SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, 0);
-
- httpClientBuilder.connectionTimeout(Duration.ofMillis(connectionEstablishTimeout));
- httpClientBuilder.socketTimeout(Duration.ofMillis(socketTimeout));
-
- // set the connection TTL irrespective of whether the connection is in use or not.
- // this can balance requests over different S3 servers, and avoid failed
- // connections. See HADOOP-18845.
- long ttl = longOption(conf, CONNECTION_TTL, DEFAULT_CONNECTION_TTL, -1);
- httpClientBuilder.connectionTimeToLive(Duration.ofMillis(ttl));
+ ApacheHttpClient.builder()
+ .connectionAcquisitionTimeout(conn.getAcquisitionTimeout())
+ .connectionMaxIdleTime(conn.getMaxIdleTime())
+ .connectionTimeout(conn.getEstablishTimeout())
+ .connectionTimeToLive(conn.getConnectionTTL())
+ .maxConnections(conn.getMaxConnections())
+ .socketTimeout(conn.getSocketTimeout())
+ .tcpKeepAlive(conn.isKeepAlive())
+ .useIdleConnectionReaper(true); // true by default in the SDK
NetworkBinding.bindSSLChannelMode(conf, httpClientBuilder);
@@ -132,31 +157,26 @@ public static ApacheHttpClient.Builder createHttpClientBuilder(Configuration con
}
/**
- * Configures the async http client.
- *
+ * Create and configure the async http client with timeouts for:
+ * connection acquisition, max idle, timeout, TTL, socket and keepalive.
+ * This is netty based and does not allow for the SSL channel mode to be set.
* @param conf The Hadoop configuration
- * @return Http client builder
+ * @return Async Http client builder
*/
public static NettyNioAsyncHttpClient.Builder createAsyncHttpClientBuilder(Configuration conf) {
+ final ConnectionSettings conn = createConnectionSettings(conf);
+
NettyNioAsyncHttpClient.Builder httpClientBuilder =
- NettyNioAsyncHttpClient.builder();
-
- httpClientBuilder.maxConcurrency(S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
- DEFAULT_MAXIMUM_CONNECTIONS, 1));
-
- int connectionEstablishTimeout =
- S3AUtils.intOption(conf, ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT, 0);
- int socketTimeout = S3AUtils.intOption(conf, SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, 0);
-
- httpClientBuilder.connectionTimeout(Duration.ofMillis(connectionEstablishTimeout));
- httpClientBuilder.readTimeout(Duration.ofMillis(socketTimeout));
- httpClientBuilder.writeTimeout(Duration.ofMillis(socketTimeout));
-
- // set the connection TTL irrespective of whether the connection is in use or not.
- // this can balance requests over different S3 servers, and avoid failed
- // connections. See HADOOP-18845.
- long ttl = longOption(conf, CONNECTION_TTL, DEFAULT_CONNECTION_TTL, -1);
- httpClientBuilder.connectionTimeToLive(Duration.ofMillis(ttl));
+ NettyNioAsyncHttpClient.builder()
+ .connectionAcquisitionTimeout(conn.getAcquisitionTimeout())
+ .connectionMaxIdleTime(conn.getMaxIdleTime())
+ .connectionTimeout(conn.getEstablishTimeout())
+ .connectionTimeToLive(conn.getConnectionTTL())
+ .maxConcurrency(conn.getMaxConnections())
+ .readTimeout(conn.getSocketTimeout())
+ .tcpKeepAlive(conn.isKeepAlive())
+ .useIdleConnectionReaper(true) // true by default in the SDK
+ .writeTimeout(conn.getSocketTimeout());
// TODO: Don't think you can set a socket factory for the netty client.
// NetworkBinding.bindSSLChannelMode(conf, awsConf);
@@ -166,13 +186,19 @@ public static NettyNioAsyncHttpClient.Builder createAsyncHttpClientBuilder(Confi
/**
* Configures the retry policy.
+ * Retry policy is {@code RetryMode.ADAPTIVE}, which
+ * "dynamically limits the rate of AWS requests to maximize success rate",
+ * possibly at the expense of latency.
+ * Based on the ABFS experience, it is better to limit the rate requests are
+ * made rather than have to resort to exponential backoff after failures come
+ * in -especially as that backoff is per http connection.
*
* @param conf The Hadoop configuration
* @return Retry policy builder
*/
public static RetryPolicy.Builder createRetryPolicyBuilder(Configuration conf) {
- RetryPolicy.Builder retryPolicyBuilder = RetryPolicy.builder();
+ RetryPolicy.Builder retryPolicyBuilder = RetryPolicy.builder(RetryMode.ADAPTIVE);
retryPolicyBuilder.numRetries(S3AUtils.intOption(conf, MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES, 0));
@@ -236,11 +262,14 @@ public static ProxyConfiguration createProxyConfiguration(Configuration conf,
}
return proxyConfigBuilder.build();
+
}
/**
* Configures the proxy for the async http client.
- *
+ *
+ * Although this is netty specific, it is part of the AWS SDK public API
+ * and not any shaded netty internal class.
* @param conf The Hadoop configuration
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @return Proxy configuration
@@ -248,7 +277,7 @@ public static ProxyConfiguration createProxyConfiguration(Configuration conf,
*/
public static software.amazon.awssdk.http.nio.netty.ProxyConfiguration
createAsyncProxyConfiguration(Configuration conf,
- String bucket) throws IOException {
+ String bucket) throws IOException {
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder proxyConfigBuilder =
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder();
@@ -307,7 +336,7 @@ public static ProxyConfiguration createProxyConfiguration(Configuration conf,
return proxyConfigBuilder.build();
}
- /***
+ /**
* Builds a URI, throws an IllegalArgumentException in case of errors.
*
* @param host proxy host
@@ -316,7 +345,7 @@ public static ProxyConfiguration createProxyConfiguration(Configuration conf,
*/
private static URI buildURI(String scheme, String host, int port) {
try {
- return new URIBuilder().setScheme(scheme).setHost(host).setPort(port).build();
+ return new URI(scheme, null, host, port, null, null, null);
} catch (URISyntaxException e) {
String msg =
"Proxy error: incorrect " + PROXY_HOST + " or " + PROXY_PORT;
@@ -346,6 +375,13 @@ private static void initUserAgent(Configuration conf,
clientConfig.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, userAgent);
}
+ /**
+ * Initializes the signer override for the given service.
+ * @param conf hadoop configuration
+ * @param clientConfig client configuration to update
+ * @param awsServiceIdentifier service name
+ * @throws IOException failure.
+ */
private static void initSigner(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier)
throws IOException {
@@ -371,24 +407,219 @@ private static void initSigner(Configuration conf,
}
/**
- * Configures request timeout.
+ * Configures request timeout in the client configuration.
+ * This is independent of the timeouts set in the sync and async HTTP clients;
+ * the same value is used for the API call and per-attempt timeouts.
*
* @param conf Hadoop configuration
* @param clientConfig AWS SDK configuration to update
*/
private static void initRequestTimeout(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig) {
- long requestTimeoutMillis = conf.getTimeDuration(REQUEST_TIMEOUT,
- DEFAULT_REQUEST_TIMEOUT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+ // Get the API call timeout from the client settings
+ final Duration callTimeout = createApiConnectionSettings(conf).getApiCallTimeout();
- if (requestTimeoutMillis > Integer.MAX_VALUE) {
- LOG.debug("Request timeout is too high({} ms). Setting to {} ms instead",
- requestTimeoutMillis, Integer.MAX_VALUE);
- requestTimeoutMillis = Integer.MAX_VALUE;
- }
-
- if(requestTimeoutMillis > 0) {
- clientConfig.apiCallAttemptTimeout(Duration.ofMillis(requestTimeoutMillis));
+ if (callTimeout.toMillis() > 0) {
+ clientConfig.apiCallAttemptTimeout(callTimeout);
+ clientConfig.apiCallTimeout(callTimeout);
}
}
+
+ /**
+ * Reset the minimum operation duration to the default.
+ * For test use only; logs at INFO.
+ * <p>
+ * This MUST be called in test teardown in any test suite which
+ * called {@link #setMinimumOperationDuration(Duration)}.
+ */
+ @VisibleForTesting
+ public static void resetMinimumOperationDuration() {
+ setMinimumOperationDuration(MINIMUM_NETWORK_OPERATION_DURATION);
+ }
+
+ /**
+ * Set the minimum operation duration.
+ * This is for testing; the duration is validated first, then the change is logged at INFO.
+ * <p>
+ * Test suites must call {@link #resetMinimumOperationDuration()} in their teardown
+ * to avoid breaking subsequent tests in the same process.
+ * @param duration non-negative duration; zero is accepted
+ * @throws IllegalArgumentException if the duration is negative.
+ */
+ @VisibleForTesting
+ public static void setMinimumOperationDuration(Duration duration) {
+ checkArgument(!duration.isNegative(),
+ "Duration must be non-negative: %sms", duration.toMillis());
+ LOG.info("Setting minimum operation duration to {}ms", duration.toMillis());
+ minimumOperationDuration = duration;
+ }
+
+ /**
+ * Get the minimum operation duration currently in effect.
+ * @return the current minimum; the default unless changed through the test setter.
+ */
+ public static Duration getMinimumOperationDuration() {
+ return minimumOperationDuration;
+ }
+
+ /**
+ * Settings for the AWS client, rather than the http client: holds the API call timeout.
+ */
+ static class ClientSettings {
+ private final Duration apiCallTimeout;
+
+ private ClientSettings(final Duration apiCallTimeout) {
+ this.apiCallTimeout = apiCallTimeout;
+ }
+
+ Duration getApiCallTimeout() {
+ return apiCallTimeout;
+ }
+
+ @Override
+ public String toString() {
+ return "ClientSettings{" +
+ "apiCallTimeout=" + apiCallTimeout +
+ '}';
+ }
+ }
+
+ /**
+ * All the HTTP connection settings, wrapped as an immutable class for use by
+ * both the sync and async client builders.
+ */
+ static class ConnectionSettings {
+ private final int maxConnections;
+ private final boolean keepAlive;
+ private final Duration acquisitionTimeout;
+ private final Duration connectionTTL;
+ private final Duration establishTimeout;
+ private final Duration maxIdleTime;
+ private final Duration socketTimeout;
+
+ private ConnectionSettings(
+ final int maxConnections,
+ final boolean keepAlive,
+ final Duration acquisitionTimeout,
+ final Duration connectionTTL,
+ final Duration establishTimeout,
+ final Duration maxIdleTime,
+ final Duration socketTimeout) {
+ this.maxConnections = maxConnections;
+ this.keepAlive = keepAlive;
+ this.acquisitionTimeout = acquisitionTimeout;
+ this.connectionTTL = connectionTTL;
+ this.establishTimeout = establishTimeout;
+ this.maxIdleTime = maxIdleTime;
+ this.socketTimeout = socketTimeout;
+ }
+
+ int getMaxConnections() {
+ return maxConnections;
+ }
+
+ boolean isKeepAlive() {
+ return keepAlive;
+ }
+
+ Duration getAcquisitionTimeout() {
+ return acquisitionTimeout;
+ }
+
+ Duration getConnectionTTL() {
+ return connectionTTL;
+ }
+
+ Duration getEstablishTimeout() {
+ return establishTimeout;
+ }
+
+ Duration getMaxIdleTime() {
+ return maxIdleTime;
+ }
+
+ Duration getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ @Override
+ public String toString() {
+ return "ConnectionSettings{" +
+ "maxConnections=" + maxConnections +
+ ", keepAlive=" + keepAlive +
+ ", acquisitionTimeout=" + acquisitionTimeout +
+ ", connectionTTL=" + connectionTTL +
+ ", establishTimeout=" + establishTimeout +
+ ", maxIdleTime=" + maxIdleTime +
+ ", socketTimeout=" + socketTimeout +
+ '}';
+ }
+ }
+
+ /**
+ * Build a client settings object holding the API call timeout.
+ * @param conf configuration to evaluate
+ * @return client settings.
+ */
+ static ClientSettings createApiConnectionSettings(Configuration conf) {
+
+ Duration apiCallTimeout = getDuration(conf, REQUEST_TIMEOUT,
+ DEFAULT_REQUEST_TIMEOUT_DURATION, TimeUnit.MILLISECONDS, Duration.ZERO);
+
+ // zero is left as is; any positive timeout is raised to at least the minimum duration
+ if (apiCallTimeout.compareTo(Duration.ZERO) > 0) {
+ apiCallTimeout = enforceMinimumDuration(REQUEST_TIMEOUT,
+ apiCallTimeout, minimumOperationDuration);
+ }
+ return new ClientSettings(apiCallTimeout);
+ }
+
+ /**
+ * Build the HTTP connection settings object from the configuration.
+ * The API call timeout is not part of these settings; it is evaluated separately.
+ * @param conf configuration to evaluate
+ * @return connection settings.
+ */
+ static ConnectionSettings createConnectionSettings(Configuration conf) {
+
+ int maxConnections = S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
+ DEFAULT_MAXIMUM_CONNECTIONS, 1);
+
+ final boolean keepAlive = conf.getBoolean(CONNECTION_KEEPALIVE,
+ DEFAULT_CONNECTION_KEEPALIVE);
+
+ // time to acquire a connection from the pool
+ Duration acquisitionTimeout = getDuration(conf, CONNECTION_ACQUISITION_TIMEOUT,
+ DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
+ minimumOperationDuration);
+
+ // set the connection TTL irrespective of whether the connection is in use or not.
+ // this can balance requests over different S3 servers, and avoid failed
+ // connections. See HADOOP-18845. No minimum is passed in for the TTL.
+ Duration connectionTTL = getDuration(conf, CONNECTION_TTL,
+ DEFAULT_CONNECTION_TTL_DURATION, TimeUnit.MILLISECONDS,
+ null);
+
+ Duration establishTimeout = getDuration(conf, ESTABLISH_TIMEOUT,
+ DEFAULT_ESTABLISH_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
+ minimumOperationDuration);
+
+ // limit on the time a connection can be idle in the pool
+ Duration maxIdleTime = getDuration(conf, CONNECTION_IDLE_TIME,
+ DEFAULT_CONNECTION_IDLE_TIME_DURATION, TimeUnit.MILLISECONDS, Duration.ZERO);
+
+ Duration socketTimeout = getDuration(conf, SOCKET_TIMEOUT,
+ DEFAULT_SOCKET_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
+ minimumOperationDuration);
+
+ return new ConnectionSettings(
+ maxConnections,
+ keepAlive,
+ acquisitionTimeout,
+ connectionTTL,
+ establishTimeout,
+ maxIdleTime,
+ socketTimeout);
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ConfigurationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ConfigurationHelper.java
new file mode 100644
index 0000000000..8acd20a821
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ConfigurationHelper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
+
+/**
+ * Helper class for configuration; where methods related to extracting
+ * configuration should go instead of {@code S3AUtils}.
+ */
+@InterfaceAudience.Private
+public final class ConfigurationHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigurationHelper.class);
+
+ /** Log to warn of range issues on a timeout. */
+ private static final LogExactlyOnce DURATION_WARN_LOG = new LogExactlyOnce(LOG);
+
+ private ConfigurationHelper() {
+ }
+
+ /**
+ * Get a duration. This may be negative; callers must check or set the minimum to zero.
+ * If the config option is greater than {@code Integer.MAX_VALUE} milliseconds,
+ * it is set to that max.
+ * If {@code minimumDuration} is set, and the value is less than that, then
+ * the minimum is used.
+ * Logs the resulting value at debug for diagnostics.
+ * @param conf config
+ * @param name option name
+ * @param defaultDuration default duration
+ * @param defaultUnit default unit on the config option if not declared.
+ * @param minimumDuration optional minimum duration; null means no minimum is enforced.
+ * @return duration. may be negative.
+ */
+ public static Duration getDuration(
+ final Configuration conf,
+ final String name,
+ final Duration defaultDuration,
+ final TimeUnit defaultUnit,
+ @Nullable final Duration minimumDuration) {
+ long timeMillis = conf.getTimeDuration(name,
+ defaultDuration.toMillis(), defaultUnit, TimeUnit.MILLISECONDS);
+
+ if (timeMillis > Integer.MAX_VALUE) {
+ DURATION_WARN_LOG.warn("Option {} is too high({} ms). Setting to {} ms instead",
+ name, timeMillis, Integer.MAX_VALUE);
+ LOG.debug("Option {} is too high({} ms). Setting to {} ms instead",
+ name, timeMillis, Integer.MAX_VALUE);
+ timeMillis = Integer.MAX_VALUE;
+ }
+
+ Duration duration = enforceMinimumDuration(name, Duration.ofMillis(timeMillis),
+ minimumDuration);
+ LOG.debug("Duration of {} = {}", name, duration);
+ return duration;
+ }
+
+ /**
+ * Set a duration as a time in whole seconds, with the suffix "s" added.
+ * @param conf configuration to update.
+ * @param name option name
+ * @param duration duration; any sub-second part is dropped
+ */
+ public static void setDurationAsSeconds(
+ final Configuration conf,
+ final String name,
+ final Duration duration) {
+ conf.set(name, duration.getSeconds() + "s");
+ }
+
+ /**
+ * Set a duration as a time in milliseconds, with the suffix "ms" added.
+ * @param conf configuration to update.
+ * @param name option name
+ * @param duration duration
+ */
+ public static void setDurationAsMillis(
+ final Configuration conf,
+ final String name,
+ final Duration duration) {
+ conf.set(name, duration.toMillis()+ "ms");
+ }
+
+ /**
+ * Enforce a minimum duration of a configuration option, if the supplied
+ * minimum is non-null; a value below the minimum is replaced and logged.
+ * @param name option name
+ * @param duration duration to check
+ * @param minimumDuration minimum duration; may be null
+ * @return a duration which, if the minimum duration is set, is at least that value.
+ */
+ public static Duration enforceMinimumDuration(
+ final String name,
+ final Duration duration,
+ @Nullable final Duration minimumDuration) {
+
+ if (minimumDuration != null && duration.compareTo(minimumDuration) < 0) {
+ String message = String.format("Option %s is too low (%,d ms). Setting to %,d ms instead",
+ name, duration.toMillis(), minimumDuration.toMillis());
+ DURATION_WARN_LOG.warn(message);
+ LOG.debug(message);
+ return minimumDuration;
+ }
+ return duration;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index e7fcbe0351..f17295d0f5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -172,6 +172,9 @@ private InternalConstants() {
/** 503 status code: Service Unavailable. on AWS S3: throttle response. */
public static final int SC_503_SERVICE_UNAVAILABLE = 503;
+ /** 504 Gateway Timeout. AWS SDK considers retryable. */
+ public static final int SC_504_GATEWAY_TIMEOUT = 504;
+
/** Name of the log for throttling events. Value: {@value}. */
public static final String THROTTLE_LOG_NAME =
"org.apache.hadoop.fs.s3a.throttled";
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 0730f86cd1..dcf3be2b08 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -703,8 +703,12 @@ to allow different buckets to override the shared settings. This is commonly
used to change the endpoint, encryption and authentication mechanisms of buckets.
and various minor options.
-Here are the S3A properties for use in production; some testing-related
-options are covered in [Testing](./testing.md).
+Here are some of the S3A properties for use in production.
+
+* See [Performance](./performance.html) for performance related settings including
+ thread and network pool options.
+* Testing-related options are covered in [Testing](./testing.md).
+
```xml
@@ -830,16 +834,6 @@ options are covered in [Testing](./testing.md).