HADOOP-19044. S3A: AWS SDK V2 - Update region logic (#6479)
Improves region handling in the S3A connector, including enabling cross-region support when that is considered necessary. Consult the documentation in connecting.md/connecting.html for the current resolution process. Contributed by Viraj Jasani
This commit is contained in:
parent
7504b8505f
commit
d278b349f6
@ -267,9 +267,10 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
|
||||
*/
|
||||
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
|
||||
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
|
||||
URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
|
||||
final String endpointStr = parameters.getEndpoint();
|
||||
final URI endpoint = getS3Endpoint(endpointStr, conf);
|
||||
|
||||
String configuredRegion = parameters.getRegion();
|
||||
final String configuredRegion = parameters.getRegion();
|
||||
Region region = null;
|
||||
String origin = "";
|
||||
|
||||
@ -291,15 +292,33 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
|
||||
if (endpoint != null) {
|
||||
checkArgument(!fipsEnabled,
|
||||
"%s : %s", ERROR_ENDPOINT_WITH_FIPS, endpoint);
|
||||
builder.endpointOverride(endpoint);
|
||||
// No region was configured, try to determine it from the endpoint.
|
||||
boolean endpointEndsWithCentral =
|
||||
endpointStr.endsWith(CENTRAL_ENDPOINT);
|
||||
|
||||
// No region was configured,
|
||||
// determine the region from the endpoint.
|
||||
if (region == null) {
|
||||
region = getS3RegionFromEndpoint(parameters.getEndpoint());
|
||||
region = getS3RegionFromEndpoint(endpointStr,
|
||||
endpointEndsWithCentral);
|
||||
if (region != null) {
|
||||
origin = "endpoint";
|
||||
}
|
||||
}
|
||||
LOG.debug("Setting endpoint to {}", endpoint);
|
||||
|
||||
// No need to override endpoint with "s3.amazonaws.com".
|
||||
// Let the client take care of endpoint resolution. Overriding
|
||||
// the endpoint with "s3.amazonaws.com" causes 400 Bad Request
|
||||
// errors for non-existent buckets and objects.
|
||||
// ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
|
||||
if (!endpointEndsWithCentral) {
|
||||
builder.endpointOverride(endpoint);
|
||||
LOG.debug("Setting endpoint to {}", endpoint);
|
||||
} else {
|
||||
builder.crossRegionAccessEnabled(true);
|
||||
origin = "central endpoint with cross region access";
|
||||
LOG.debug("Enabling cross region access for endpoint {}",
|
||||
endpointStr);
|
||||
}
|
||||
}
|
||||
|
||||
if (region != null) {
|
||||
@ -354,20 +373,32 @@ private static URI getS3Endpoint(String endpoint, final Configuration conf) {
|
||||
|
||||
/**
|
||||
* Parses the endpoint to get the region.
|
||||
* If endpoint is the central one, use US_EAST_1.
|
||||
* If endpoint is the central one, use US_EAST_2.
|
||||
*
|
||||
* @param endpoint the configured endpoint.
|
||||
* @param endpointEndsWithCentral true if the endpoint is configured as central.
|
||||
* @return the S3 region, null if unable to resolve from endpoint.
|
||||
*/
|
||||
private static Region getS3RegionFromEndpoint(String endpoint) {
|
||||
private static Region getS3RegionFromEndpoint(final String endpoint,
|
||||
final boolean endpointEndsWithCentral) {
|
||||
|
||||
if(!endpoint.endsWith(CENTRAL_ENDPOINT)) {
|
||||
if (!endpointEndsWithCentral) {
|
||||
LOG.debug("Endpoint {} is not the default; parsing", endpoint);
|
||||
return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
|
||||
}
|
||||
|
||||
// endpoint is for US_EAST_1;
|
||||
return Region.US_EAST_1;
|
||||
// Select default region here to enable cross-region access.
|
||||
// If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
|
||||
// Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
|
||||
// This applies to Spark versions with the changes of SPARK-35878.
|
||||
// ref:
|
||||
// https://github.com/apache/spark/blob/v3.5.0/core/
|
||||
// src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
|
||||
// If we do not allow cross region access, Spark would not be able to
|
||||
// access any bucket that is not present in the given region.
|
||||
// Hence, we should use default region us-east-2 to allow cross-region
|
||||
// access.
|
||||
return Region.of(AWS_S3_DEFAULT_REGION);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -83,8 +83,8 @@ The table below lists the configurations S3A was using and what they now map to.
|
||||
|
||||
Previously, if neither an endpoint nor a region was configured, S3A fell back to using us-east-1 and set
|
||||
withForceGlobalBucketAccessEnabled(true), which allowed access to buckets not in this region too.
|
||||
Since the SDK V2 no longer supports cross region access, we need to set the region and endpoint of
|
||||
the bucket. The behaviour has now been changed to:
|
||||
Since the SDK V2 no longer supports cross region access, we need to set the region and
|
||||
endpoint of the bucket. The behaviour has now been changed to:
|
||||
|
||||
* If no endpoint is specified, use s3.amazonaws.com.
|
||||
* When setting the endpoint, also set the protocol (HTTP or HTTPS)
|
||||
|
@ -100,6 +100,42 @@ With the move to the AWS V2 SDK, there is more emphasis on the region, set by th
|
||||
|
||||
Normally, declaring the region in `fs.s3a.endpoint.region` should be sufficient to set up the network connection to correctly connect to an AWS-hosted S3 store.
|
||||
|
||||
### <a name="s3_endpoint_region_details"></a> S3 endpoint and region settings in detail
|
||||
|
||||
* Configs `fs.s3a.endpoint` and `fs.s3a.endpoint.region` are used to set values
|
||||
for S3 endpoint and region respectively.
|
||||
* If `fs.s3a.endpoint.region` is configured with a valid AWS region value, S3A will
|
||||
configure the S3 client to use this value. If this is set to a region that does
|
||||
not match your bucket, you will receive a 301 redirect response.
|
||||
* If `fs.s3a.endpoint.region` is not set and `fs.s3a.endpoint` is set to a valid
|
||||
endpoint value, S3A will attempt to parse the region from the endpoint and
|
||||
configure S3 client to use the region value.
|
||||
* If both `fs.s3a.endpoint` and `fs.s3a.endpoint.region` are not set, S3A will
|
||||
use `us-east-2` as default region and enable cross region access. In this case,
|
||||
S3A does not attempt to override the endpoint while configuring the S3 client.
|
||||
* If `fs.s3a.endpoint` is not set and `fs.s3a.endpoint.region` is set to an empty
|
||||
string, S3A will configure S3 client without any region or endpoint override.
|
||||
This will allow fallback to S3 SDK region resolution chain. More details
|
||||
[here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
|
||||
* If `fs.s3a.endpoint` is set to central endpoint `s3.amazonaws.com` and
|
||||
`fs.s3a.endpoint.region` is not set, S3A will use `us-east-2` as default region
|
||||
and enable cross region access. In this case, S3A does not attempt to override
|
||||
the endpoint while configuring the S3 client.
|
||||
* If `fs.s3a.endpoint` is set to central endpoint `s3.amazonaws.com` and
|
||||
`fs.s3a.endpoint.region` is also set to some region, S3A will use that region
|
||||
value and enable cross region access. In this case, S3A does not attempt to
|
||||
override the endpoint while configuring the S3 client.
|
||||
|
||||
When cross region access is enabled while configuring the S3 client, the S3 SDK
|
||||
determines the bucket's region even if the configured region is incorrect. It makes the
|
||||
request, and if it receives a 301 redirect response, it determines the region at
|
||||
the cost of a HEAD request, and caches it.
|
||||
|
||||
Please note that some endpoint and region settings that require cross region access
|
||||
are complex and improving over time. Hence, they may be considered unstable.
|
||||
|
||||
If you are working with third party stores, please check [third party stores in detail](third_party_stores.html).
|
||||
|
||||
### <a name="timeouts"></a> Network timeouts
|
||||
|
||||
See [Timeouts](performance.html#timeouts).
|
||||
|
@ -226,7 +226,9 @@ If you do any of these: change your credentials immediately!
|
||||
|
||||
## Connecting to Amazon S3 or a third-party store
|
||||
|
||||
See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.md).
|
||||
See [Connecting to an Amazon S3 Bucket through the S3A Connector](connecting.html).
|
||||
|
||||
Also, please check [S3 endpoint and region settings in detail](connecting.html#s3_endpoint_region_details).
|
||||
|
||||
## <a name="authenticating"></a> Authenticating with S3
|
||||
|
||||
|
@ -38,13 +38,20 @@
|
||||
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
|
||||
import org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
|
||||
import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
|
||||
import static org.apache.hadoop.io.IOUtils.closeStream;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
@ -146,11 +153,28 @@ public void testCentralEndpoint() throws Throwable {
|
||||
describe("Create a client with the central endpoint");
|
||||
Configuration conf = getConfiguration();
|
||||
|
||||
S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_1, false);
|
||||
S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_2, false);
|
||||
|
||||
expectInterceptorException(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCentralEndpointWithRegion() throws Throwable {
|
||||
describe("Create a client with the central endpoint but also specify region");
|
||||
Configuration conf = getConfiguration();
|
||||
|
||||
S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, US_WEST_2,
|
||||
US_WEST_2, false);
|
||||
|
||||
expectInterceptorException(client);
|
||||
|
||||
client = createS3Client(conf, CENTRAL_ENDPOINT, US_EAST_1,
|
||||
US_EAST_1, false);
|
||||
|
||||
expectInterceptorException(client);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithRegionConfig() throws Throwable {
|
||||
describe("Create a client with a configured region");
|
||||
@ -257,6 +281,141 @@ public void testWithVPCE() throws Throwable {
|
||||
expectInterceptorException(client);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable {
|
||||
describe("Access public bucket using central endpoint and region "
|
||||
+ "different than that of the public bucket");
|
||||
final Configuration conf = getConfiguration();
|
||||
final Configuration newConf = new Configuration(conf);
|
||||
|
||||
removeBaseAndBucketOverrides(
|
||||
newConf,
|
||||
ENDPOINT,
|
||||
AWS_REGION,
|
||||
ALLOW_REQUESTER_PAYS,
|
||||
KEY_REQUESTER_PAYS_FILE);
|
||||
|
||||
removeBaseAndBucketOverrides(
|
||||
DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
|
||||
newConf,
|
||||
ENDPOINT,
|
||||
AWS_REGION,
|
||||
ALLOW_REQUESTER_PAYS,
|
||||
KEY_REQUESTER_PAYS_FILE);
|
||||
|
||||
newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
|
||||
newConf.set(AWS_REGION, EU_WEST_1);
|
||||
newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
|
||||
|
||||
Path filePath = new Path(PublicDatasetTestUtils
|
||||
.getRequesterPaysObject(newConf));
|
||||
newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
|
||||
|
||||
Assertions
|
||||
.assertThat(newFS.exists(filePath))
|
||||
.describedAs("Existence of path: " + filePath)
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCentralEndpointAndSameRegionAsBucket() throws Throwable {
|
||||
describe("Access public bucket using central endpoint and region "
|
||||
+ "same as that of the public bucket");
|
||||
final Configuration conf = getConfiguration();
|
||||
final Configuration newConf = new Configuration(conf);
|
||||
|
||||
removeBaseAndBucketOverrides(
|
||||
newConf,
|
||||
ENDPOINT,
|
||||
AWS_REGION,
|
||||
ALLOW_REQUESTER_PAYS,
|
||||
KEY_REQUESTER_PAYS_FILE);
|
||||
|
||||
removeBaseAndBucketOverrides(
|
||||
DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
|
||||
newConf,
|
||||
ENDPOINT,
|
||||
AWS_REGION,
|
||||
ALLOW_REQUESTER_PAYS,
|
||||
KEY_REQUESTER_PAYS_FILE);
|
||||
|
||||
newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
|
||||
newConf.set(AWS_REGION, US_WEST_2);
|
||||
newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
|
||||
|
||||
Path filePath = new Path(PublicDatasetTestUtils
|
||||
.getRequesterPaysObject(newConf));
|
||||
newFS = (S3AFileSystem) filePath.getFileSystem(newConf);
|
||||
|
||||
Assertions
|
||||
.assertThat(newFS.exists(filePath))
|
||||
.describedAs("Existence of path: " + filePath)
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
|
||||
describe("Access the test bucket using central endpoint and"
|
||||
+ " null region, perform file system CRUD operations");
|
||||
final Configuration conf = getConfiguration();
|
||||
|
||||
final Configuration newConf = new Configuration(conf);
|
||||
|
||||
removeBaseAndBucketOverrides(
|
||||
newConf,
|
||||
ENDPOINT,
|
||||
AWS_REGION);
|
||||
|
||||
newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
|
||||
|
||||
newFS = new S3AFileSystem();
|
||||
newFS.initialize(getFileSystem().getUri(), newConf);
|
||||
|
||||
assertOpsUsingNewFs();
|
||||
}
|
||||
|
||||
private void assertOpsUsingNewFs() throws IOException {
|
||||
final String file = getMethodName();
|
||||
final Path basePath = methodPath();
|
||||
final Path srcDir = new Path(basePath, "srcdir");
|
||||
newFS.mkdirs(srcDir);
|
||||
Path srcFilePath = new Path(srcDir, file);
|
||||
|
||||
try (FSDataOutputStream out = newFS.create(srcFilePath)) {
|
||||
out.write(new byte[] {1, 2, 3});
|
||||
}
|
||||
|
||||
Assertions
|
||||
.assertThat(newFS.exists(srcFilePath))
|
||||
.describedAs("Existence of file: " + srcFilePath)
|
||||
.isTrue();
|
||||
Assertions
|
||||
.assertThat(getFileSystem().exists(srcFilePath))
|
||||
.describedAs("Existence of file: " + srcFilePath)
|
||||
.isTrue();
|
||||
|
||||
byte[] buffer = new byte[3];
|
||||
|
||||
try (FSDataInputStream in = newFS.open(srcFilePath)) {
|
||||
in.readFully(buffer);
|
||||
Assertions
|
||||
.assertThat(buffer)
|
||||
.describedAs("Contents read from " + srcFilePath)
|
||||
.containsExactly(1, 2, 3);
|
||||
}
|
||||
|
||||
newFS.delete(srcDir, true);
|
||||
|
||||
Assertions
|
||||
.assertThat(newFS.exists(srcFilePath))
|
||||
.describedAs("Existence of file: " + srcFilePath + " using new FS")
|
||||
.isFalse();
|
||||
Assertions
|
||||
.assertThat(getFileSystem().exists(srcFilePath))
|
||||
.describedAs("Existence of file: " + srcFilePath + " using original FS")
|
||||
.isFalse();
|
||||
}
|
||||
|
||||
private final class RegionInterceptor implements ExecutionInterceptor {
|
||||
private final String endpoint;
|
||||
private final String region;
|
||||
@ -272,7 +431,7 @@ private final class RegionInterceptor implements ExecutionInterceptor {
|
||||
public void beforeExecution(Context.BeforeExecution context,
|
||||
ExecutionAttributes executionAttributes) {
|
||||
|
||||
if (endpoint != null) {
|
||||
if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
|
||||
Assertions.assertThat(
|
||||
executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
|
||||
.describedAs("Endpoint not overridden").isTrue();
|
||||
|
@ -53,6 +53,13 @@ private PublicDatasetTestUtils() {}
|
||||
private static final String DEFAULT_REQUESTER_PAYS_FILE
|
||||
= "s3a://usgs-landsat/collection02/catalog.json";
|
||||
|
||||
/**
|
||||
* Default bucket name for the requester pays bucket.
|
||||
* Value = {@value}.
|
||||
*/
|
||||
public static final String DEFAULT_REQUESTER_PAYS_BUCKET_NAME =
|
||||
"usgs-landsat";
|
||||
|
||||
/**
|
||||
* Default bucket for an S3A file system with many objects: {@value}.
|
||||
*
|
||||
|
Loading…
Reference in New Issue
Block a user