HADOOP-18845. Add ability to configure s3 connection ttl using fs.s3a.connection.ttl (#5948)
Contributed By: Mukund Thakur
This commit is contained in:
parent
11404c57cb
commit
28d190b904
@ -154,6 +154,17 @@ private Constants() {
|
|||||||
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
|
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
|
||||||
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 96;
|
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 96;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration option to configure expiration time of
|
||||||
|
* s3 http connection from the connection pool in milliseconds: {@value}.
|
||||||
|
*/
|
||||||
|
public static final String CONNECTION_TTL = "fs.s3a.connection.ttl";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default value for {@code CONNECTION_TTL}: {@value}.
|
||||||
|
*/
|
||||||
|
public static final long DEFAULT_CONNECTION_TTL = 5 * 60_000;
|
||||||
|
|
||||||
// connect to s3 over ssl?
|
// connect to s3 over ssl?
|
||||||
public static final String SECURE_CONNECTIONS =
|
public static final String SECURE_CONNECTIONS =
|
||||||
"fs.s3a.connection.ssl.enabled";
|
"fs.s3a.connection.ssl.enabled";
|
||||||
|
@ -462,7 +462,7 @@ public <T> T retryUntranslated(
|
|||||||
do {
|
do {
|
||||||
try {
|
try {
|
||||||
if (retryCount > 0) {
|
if (retryCount > 0) {
|
||||||
LOG.debug("retry #{}", retryCount);
|
LOG.debug("{} retry #{}", text, retryCount);
|
||||||
}
|
}
|
||||||
// execute the operation, returning if successful
|
// execute the operation, returning if successful
|
||||||
return operation.apply();
|
return operation.apply();
|
||||||
@ -471,7 +471,8 @@ public <T> T retryUntranslated(
|
|||||||
}
|
}
|
||||||
// you only get here if the operation didn't complete
|
// you only get here if the operation didn't complete
|
||||||
// normally, hence caught != null
|
// normally, hence caught != null
|
||||||
|
LOG.debug("{} ; {}, ", text, caught.toString());
|
||||||
|
LOG.trace("", caught);
|
||||||
// translate the exception into an IOE for the retry logic
|
// translate the exception into an IOE for the retry logic
|
||||||
IOException translated;
|
IOException translated;
|
||||||
if (caught instanceof IOException) {
|
if (caught instanceof IOException) {
|
||||||
|
@ -1299,6 +1299,8 @@ public static void initConnectionSettings(Configuration conf,
|
|||||||
ClientConfiguration awsConf) throws IOException {
|
ClientConfiguration awsConf) throws IOException {
|
||||||
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
|
awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
|
||||||
DEFAULT_MAXIMUM_CONNECTIONS, 1));
|
DEFAULT_MAXIMUM_CONNECTIONS, 1));
|
||||||
|
awsConf.setConnectionTTL(longOption(conf, CONNECTION_TTL,
|
||||||
|
DEFAULT_CONNECTION_TTL, -1));
|
||||||
initProtocolSettings(conf, awsConf);
|
initProtocolSettings(conf, awsConf);
|
||||||
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
|
awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
|
||||||
DEFAULT_MAX_ERROR_RETRIES, 0));
|
DEFAULT_MAX_ERROR_RETRIES, 0));
|
||||||
|
@ -1782,6 +1782,26 @@ will attempt to retry the operation; it may just be a transient event. If there
|
|||||||
are many such exceptions in logs, it may be a symptom of connectivity or network
|
are many such exceptions in logs, it may be a symptom of connectivity or network
|
||||||
problems.
|
problems.
|
||||||
|
|
||||||
|
The above error could be because of a stale http connections. The default value in AWS
|
||||||
|
SDK is set to -1 (infinite) which means the connection will be reused indefinitely.
|
||||||
|
We have introduced a new config `fs.s3a.connection.ttl` to configure this.
|
||||||
|
Tuning this setting down (together with an appropriately-low setting for Java's DNS cache TTL)
|
||||||
|
ensures that your application will quickly rotate over to new IP addresses when the
|
||||||
|
service begins announcing them through DNS, at the cost of having to re-establish new
|
||||||
|
connections more frequently.
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.connection.ttl</name>
|
||||||
|
<value>300000</value>
|
||||||
|
<description>
|
||||||
|
Expiration time for a connection in the connection pool in milliseconds.
|
||||||
|
When a connection is retrieved from the connection pool,
|
||||||
|
this parameter is checked to see if the connection can be reused.
|
||||||
|
Default value is 5 minutes.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
```
|
||||||
### `AWSBadRequestException` IllegalLocationConstraintException/The unspecified location constraint is incompatible
|
### `AWSBadRequestException` IllegalLocationConstraintException/The unspecified location constraint is incompatible
|
||||||
|
|
||||||
```
|
```
|
||||||
|
@ -31,6 +31,8 @@
|
|||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -511,6 +513,34 @@ public void testConfOptionPropagationToFS() throws Exception {
|
|||||||
assertOptionEquals(updated, "fs.s3a.propagation", "propagated");
|
assertOptionEquals(updated, "fs.s3a.propagation", "propagated");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10_000L)
|
||||||
|
public void testConnectTtlPropagation() throws Exception {
|
||||||
|
Configuration config = new Configuration(false);
|
||||||
|
ClientConfiguration awsConf = new ClientConfiguration();
|
||||||
|
initConnectionSettings(config, awsConf);
|
||||||
|
Assertions.assertThat(awsConf.getConnectionTTL())
|
||||||
|
.describedAs("connection ttl should be set to default value as" +
|
||||||
|
" %s is not set", CONNECTION_TTL)
|
||||||
|
.isEqualTo(DEFAULT_CONNECTION_TTL);
|
||||||
|
long connectionTtlTestVal = 1000;
|
||||||
|
config.setLong(CONNECTION_TTL, connectionTtlTestVal);
|
||||||
|
initConnectionSettings(config, awsConf);
|
||||||
|
Assertions.assertThat(awsConf.getConnectionTTL())
|
||||||
|
.describedAs("%s not propagated to aws conf", CONNECTION_TTL)
|
||||||
|
.isEqualTo(connectionTtlTestVal);
|
||||||
|
|
||||||
|
long connectionTtlTestVal1 = -1;
|
||||||
|
config.setLong(CONNECTION_TTL, connectionTtlTestVal1);
|
||||||
|
initConnectionSettings(config, awsConf);
|
||||||
|
Assertions.assertThat(awsConf.getConnectionTTL())
|
||||||
|
.describedAs("%s not propagated to aws conf", CONNECTION_TTL)
|
||||||
|
.isEqualTo(connectionTtlTestVal1);
|
||||||
|
|
||||||
|
long connectionTtlTestVal2 = -100;
|
||||||
|
config.setLong(CONNECTION_TTL, connectionTtlTestVal2);
|
||||||
|
intercept(IllegalArgumentException.class, () -> initConnectionSettings(config, awsConf));
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 10_000L)
|
@Test(timeout = 10_000L)
|
||||||
public void testS3SpecificSignerOverride() throws IOException {
|
public void testS3SpecificSignerOverride() throws IOException {
|
||||||
ClientConfiguration clientConfiguration = null;
|
ClientConfiguration clientConfiguration = null;
|
||||||
|
Loading…
Reference in New Issue
Block a user