HADOOP-18908. Improve S3A region handling. (#6187)
S3A region logic improved for better inference and to be compatible with previous releases 1. If you are using an AWS S3 AccessPoint, its region is determined from the ARN itself. 2. If fs.s3a.endpoint.region is set and non-empty, it is used. 3. If fs.s3a.endpoint is an s3.*.amazonaws.com url, the region is determined by by parsing the URL Note: vpce endpoints are not handled by this. 4. If fs.s3a.endpoint.region==null, and none could be determined from the endpoint, use us-east-2 as default. 5. If fs.s3a.endpoint.region=="" then it is handed off to The default AWS SDK resolution process. Consult the AWS SDK documentation for the details on its resolution process, knowing that it is complicated and may use environment variables, entries in ~/.aws/config, IAM instance information within EC2 deployments and possibly even JSON resources on the classpath. Put differently: it is somewhat brittle across deployments. Contributed by Ahmar Suhail
This commit is contained in:
parent
e5eb404bb3
commit
e0563fed50
@ -407,10 +407,6 @@ public final class StoreStatisticNames {
|
|||||||
public static final String MULTIPART_UPLOAD_LIST
|
public static final String MULTIPART_UPLOAD_LIST
|
||||||
= "multipart_upload_list";
|
= "multipart_upload_list";
|
||||||
|
|
||||||
/** Probe for store region: {@value}. */
|
|
||||||
public static final String STORE_REGION_PROBE
|
|
||||||
= "store_region_probe";
|
|
||||||
|
|
||||||
private StoreStatisticNames() {
|
private StoreStatisticNames() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1179,6 +1179,12 @@ private Constants() {
|
|||||||
*/
|
*/
|
||||||
public static final String AWS_S3_CENTRAL_REGION = "us-east-1";
|
public static final String AWS_S3_CENTRAL_REGION = "us-east-1";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default S3 region when using cross region client.
|
||||||
|
* Value {@value}.
|
||||||
|
*/
|
||||||
|
public static final String AWS_S3_DEFAULT_REGION = "us-east-2";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Require that all S3 access is made through Access Points.
|
* Require that all S3 access is made through Access Points.
|
||||||
*/
|
*/
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
|
||||||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
||||||
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
|
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
|
||||||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
|
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
|
||||||
@ -48,6 +49,9 @@
|
|||||||
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
|
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
|
||||||
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
|
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
|
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
|
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
|
||||||
@ -66,12 +70,27 @@ public class DefaultS3ClientFactory extends Configured
|
|||||||
|
|
||||||
private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";
|
private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";
|
||||||
|
|
||||||
|
private static final String S3_SERVICE_NAME = "s3";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subclasses refer to this.
|
* Subclasses refer to this.
|
||||||
*/
|
*/
|
||||||
protected static final Logger LOG =
|
protected static final Logger LOG =
|
||||||
LoggerFactory.getLogger(DefaultS3ClientFactory.class);
|
LoggerFactory.getLogger(DefaultS3ClientFactory.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A one-off warning of default region chains in use.
|
||||||
|
*/
|
||||||
|
private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
|
||||||
|
new LogExactlyOnce(LOG);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Warning message printed when the SDK Region chain is in use.
|
||||||
|
*/
|
||||||
|
private static final String SDK_REGION_CHAIN_IN_USE =
|
||||||
|
"S3A filesystem client is using"
|
||||||
|
+ " the SDK region resolution chain.";
|
||||||
|
|
||||||
|
|
||||||
/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
|
/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
|
||||||
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
|
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
|
||||||
@ -138,15 +157,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
|
|||||||
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket)
|
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
Region region = parameters.getRegion();
|
configureEndpointAndRegion(builder, parameters, conf);
|
||||||
LOG.debug("Using region {}", region);
|
|
||||||
|
|
||||||
URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
|
|
||||||
|
|
||||||
if (endpoint != null) {
|
|
||||||
builder.endpointOverride(endpoint);
|
|
||||||
LOG.debug("Using endpoint {}", endpoint);
|
|
||||||
}
|
|
||||||
|
|
||||||
S3Configuration serviceConfiguration = S3Configuration.builder()
|
S3Configuration serviceConfiguration = S3Configuration.builder()
|
||||||
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
|
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
|
||||||
@ -155,7 +166,6 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
|
|||||||
return builder
|
return builder
|
||||||
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
|
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
|
||||||
.credentialsProvider(parameters.getCredentialSet())
|
.credentialsProvider(parameters.getCredentialSet())
|
||||||
.region(region)
|
|
||||||
.serviceConfiguration(serviceConfiguration);
|
.serviceConfiguration(serviceConfiguration);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -201,6 +211,72 @@ protected ClientOverrideConfiguration createClientOverrideConfiguration(
|
|||||||
return clientOverrideConfigBuilder.build();
|
return clientOverrideConfigBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method configures the endpoint and region for a S3 client.
|
||||||
|
* The order of configuration is:
|
||||||
|
*
|
||||||
|
* <ol>
|
||||||
|
* <li>If region is configured via fs.s3a.endpoint.region, use it.</li>
|
||||||
|
* <li>If endpoint is configured via via fs.s3a.endpoint, set it.
|
||||||
|
* If no region is configured, try to parse region from endpoint. </li>
|
||||||
|
* <li> If no region is configured, and it could not be parsed from the endpoint,
|
||||||
|
* set the default region as US_EAST_2 and enable cross region access. </li>
|
||||||
|
* <li> If configured region is empty, fallback to SDK resolution chain. </li>
|
||||||
|
* </ol>
|
||||||
|
*
|
||||||
|
* @param builder S3 client builder.
|
||||||
|
* @param parameters parameter object
|
||||||
|
* @param conf conf configuration object
|
||||||
|
* @param <BuilderT> S3 client builder type
|
||||||
|
* @param <ClientT> S3 client type
|
||||||
|
*/
|
||||||
|
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
|
||||||
|
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
|
||||||
|
URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
|
||||||
|
|
||||||
|
String configuredRegion = parameters.getRegion();
|
||||||
|
Region region = null;
|
||||||
|
String origin = "";
|
||||||
|
|
||||||
|
// If the region was configured, set it.
|
||||||
|
if (configuredRegion != null && !configuredRegion.isEmpty()) {
|
||||||
|
origin = AWS_REGION;
|
||||||
|
region = Region.of(configuredRegion);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (endpoint != null) {
|
||||||
|
builder.endpointOverride(endpoint);
|
||||||
|
// No region was configured, try to determine it from the endpoint.
|
||||||
|
if (region == null) {
|
||||||
|
region = getS3RegionFromEndpoint(parameters.getEndpoint());
|
||||||
|
if (region != null) {
|
||||||
|
origin = "endpoint";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.debug("Setting endpoint to {}", endpoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (region != null) {
|
||||||
|
builder.region(region);
|
||||||
|
} else if (configuredRegion == null) {
|
||||||
|
// no region is configured, and none could be determined from the endpoint.
|
||||||
|
// Use US_EAST_2 as default.
|
||||||
|
region = Region.of(AWS_S3_DEFAULT_REGION);
|
||||||
|
builder.crossRegionAccessEnabled(true);
|
||||||
|
builder.region(region);
|
||||||
|
origin = "cross region access fallback";
|
||||||
|
} else if (configuredRegion.isEmpty()) {
|
||||||
|
// region configuration was set to empty string.
|
||||||
|
// allow this if people really want it; it is OK to rely on this
|
||||||
|
// when deployed in EC2.
|
||||||
|
WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE);
|
||||||
|
LOG.debug(SDK_REGION_CHAIN_IN_USE);
|
||||||
|
origin = "SDK region chain";
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("Setting region to {} from {}", region, origin);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a endpoint string, create the endpoint URI.
|
* Given a endpoint string, create the endpoint URI.
|
||||||
*
|
*
|
||||||
@ -229,4 +305,23 @@ private static URI getS3Endpoint(String endpoint, final Configuration conf) {
|
|||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parses the endpoint to get the region.
|
||||||
|
* If endpoint is the central one, use US_EAST_1.
|
||||||
|
*
|
||||||
|
* @param endpoint the configure endpoint.
|
||||||
|
* @return the S3 region, null if unable to resolve from endpoint.
|
||||||
|
*/
|
||||||
|
private static Region getS3RegionFromEndpoint(String endpoint) {
|
||||||
|
|
||||||
|
if(!endpoint.endsWith(CENTRAL_ENDPOINT)) {
|
||||||
|
LOG.debug("Endpoint {} is not the default; parsing", endpoint);
|
||||||
|
return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// endpoint is for US_EAST_1;
|
||||||
|
return Region.US_EAST_1;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,6 @@
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
@ -54,7 +53,6 @@
|
|||||||
|
|
||||||
import software.amazon.awssdk.core.ResponseInputStream;
|
import software.amazon.awssdk.core.ResponseInputStream;
|
||||||
import software.amazon.awssdk.core.exception.SdkException;
|
import software.amazon.awssdk.core.exception.SdkException;
|
||||||
import software.amazon.awssdk.regions.Region;
|
|
||||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||||
import software.amazon.awssdk.services.s3.S3Client;
|
import software.amazon.awssdk.services.s3.S3Client;
|
||||||
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
|
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
|
||||||
@ -83,7 +81,6 @@
|
|||||||
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
|
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
|
||||||
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
|
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
|
||||||
import software.amazon.awssdk.services.s3.model.S3Error;
|
import software.amazon.awssdk.services.s3.model.S3Error;
|
||||||
import software.amazon.awssdk.services.s3.model.S3Exception;
|
|
||||||
import software.amazon.awssdk.services.s3.model.S3Object;
|
import software.amazon.awssdk.services.s3.model.S3Object;
|
||||||
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
|
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
|
||||||
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
|
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
|
||||||
@ -98,7 +95,6 @@
|
|||||||
import software.amazon.awssdk.transfer.s3.model.FileUpload;
|
import software.amazon.awssdk.transfer.s3.model.FileUpload;
|
||||||
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
|
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
|
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -246,7 +242,6 @@
|
|||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_301_MOVED_PERMANENTLY;
|
|
||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
|
||||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
|
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
|
||||||
@ -332,8 +327,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||||||
private int executorCapacity;
|
private int executorCapacity;
|
||||||
private long multiPartThreshold;
|
private long multiPartThreshold;
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
||||||
/** Exactly once log to warn about setting the region in config to avoid probe. */
|
|
||||||
private static final LogExactlyOnce SET_REGION_WARNING = new LogExactlyOnce(LOG);
|
|
||||||
|
|
||||||
/** Log to warn of storage class configuration problems. */
|
/** Log to warn of storage class configuration problems. */
|
||||||
private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);
|
private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);
|
||||||
@ -461,8 +454,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||||||
*/
|
*/
|
||||||
private String scheme = FS_S3A;
|
private String scheme = FS_S3A;
|
||||||
|
|
||||||
private final static Map<String, Region> BUCKET_REGIONS = new HashMap<>();
|
|
||||||
|
|
||||||
/** Add any deprecated keys. */
|
/** Add any deprecated keys. */
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private static void addDeprecatedKeys() {
|
private static void addDeprecatedKeys() {
|
||||||
@ -870,9 +861,6 @@ protected void verifyBucketExists() throws UnknownStoreException, IOException {
|
|||||||
STORE_EXISTS_PROBE, bucket, null, () ->
|
STORE_EXISTS_PROBE, bucket, null, () ->
|
||||||
invoker.retry("doesBucketExist", bucket, true, () -> {
|
invoker.retry("doesBucketExist", bucket, true, () -> {
|
||||||
try {
|
try {
|
||||||
if (BUCKET_REGIONS.containsKey(bucket)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
|
s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
|
||||||
return true;
|
return true;
|
||||||
} catch (AwsServiceException ex) {
|
} catch (AwsServiceException ex) {
|
||||||
@ -982,8 +970,6 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
|
|||||||
? conf.getTrimmed(AWS_REGION)
|
? conf.getTrimmed(AWS_REGION)
|
||||||
: accessPoint.getRegion();
|
: accessPoint.getRegion();
|
||||||
|
|
||||||
Region region = getS3Region(configuredRegion);
|
|
||||||
|
|
||||||
S3ClientFactory.S3ClientCreationParameters parameters =
|
S3ClientFactory.S3ClientCreationParameters parameters =
|
||||||
new S3ClientFactory.S3ClientCreationParameters()
|
new S3ClientFactory.S3ClientCreationParameters()
|
||||||
.withCredentialSet(credentials)
|
.withCredentialSet(credentials)
|
||||||
@ -998,7 +984,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
|
|||||||
.withMultipartCopyEnabled(isMultipartCopyEnabled)
|
.withMultipartCopyEnabled(isMultipartCopyEnabled)
|
||||||
.withMultipartThreshold(multiPartThreshold)
|
.withMultipartThreshold(multiPartThreshold)
|
||||||
.withTransferManagerExecutor(unboundedThreadPool)
|
.withTransferManagerExecutor(unboundedThreadPool)
|
||||||
.withRegion(region);
|
.withRegion(configuredRegion);
|
||||||
|
|
||||||
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
|
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
|
||||||
s3Client = clientFactory.createS3Client(getUri(), parameters);
|
s3Client = clientFactory.createS3Client(getUri(), parameters);
|
||||||
@ -1019,75 +1005,6 @@ private void createS3AsyncClient(S3ClientFactory clientFactory,
|
|||||||
s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
|
s3AsyncClient = clientFactory.createS3AsyncClient(getUri(), parameters);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the bucket region.
|
|
||||||
*
|
|
||||||
* @param region AWS S3 Region set in the config. This property may not be set, in which case
|
|
||||||
* ask S3 for the region.
|
|
||||||
* @return region of the bucket.
|
|
||||||
*/
|
|
||||||
private Region getS3Region(String region) throws IOException {
|
|
||||||
|
|
||||||
if (!StringUtils.isBlank(region)) {
|
|
||||||
return Region.of(region);
|
|
||||||
}
|
|
||||||
|
|
||||||
Region cachedRegion = BUCKET_REGIONS.get(bucket);
|
|
||||||
|
|
||||||
if (cachedRegion != null) {
|
|
||||||
LOG.debug("Got region {} for bucket {} from cache", cachedRegion, bucket);
|
|
||||||
return cachedRegion;
|
|
||||||
}
|
|
||||||
|
|
||||||
Region s3Region = trackDurationAndSpan(STORE_REGION_PROBE, bucket, null,
|
|
||||||
() -> invoker.retry("getS3Region", bucket, true, () -> {
|
|
||||||
try {
|
|
||||||
|
|
||||||
SET_REGION_WARNING.warn(
|
|
||||||
"Getting region for bucket {} from S3, this will slow down FS initialisation. "
|
|
||||||
+ "To avoid this, set the region using property {}", bucket,
|
|
||||||
FS_S3A_BUCKET_PREFIX + bucket + ".endpoint.region");
|
|
||||||
|
|
||||||
// build a s3 client with region eu-west-1 that can be used to get the region of the
|
|
||||||
// bucket. Using eu-west-1, as headBucket() doesn't work with us-east-1. This is because
|
|
||||||
// us-east-1 uses the endpoint s3.amazonaws.com, which resolves bucket.s3.amazonaws.com
|
|
||||||
// to the actual region the bucket is in. As the request is signed with us-east-1 and
|
|
||||||
// not the bucket's region, it fails.
|
|
||||||
S3Client getRegionS3Client =
|
|
||||||
S3Client.builder().region(Region.EU_WEST_1).credentialsProvider(credentials)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
HeadBucketResponse headBucketResponse =
|
|
||||||
getRegionS3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
|
|
||||||
|
|
||||||
Region bucketRegion = Region.of(
|
|
||||||
headBucketResponse.sdkHttpResponse().headers().get(BUCKET_REGION_HEADER).get(0));
|
|
||||||
BUCKET_REGIONS.put(bucket, bucketRegion);
|
|
||||||
|
|
||||||
return bucketRegion;
|
|
||||||
} catch (S3Exception exception) {
|
|
||||||
if (exception.statusCode() == SC_301_MOVED_PERMANENTLY) {
|
|
||||||
Region bucketRegion = Region.of(
|
|
||||||
exception.awsErrorDetails().sdkHttpResponse().headers().get(BUCKET_REGION_HEADER)
|
|
||||||
.get(0));
|
|
||||||
BUCKET_REGIONS.put(bucket, bucketRegion);
|
|
||||||
|
|
||||||
return bucketRegion;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (exception.statusCode() == SC_404_NOT_FOUND) {
|
|
||||||
throw new UnknownStoreException("s3a://" + bucket + "/",
|
|
||||||
" Bucket does not exist: " + exception,
|
|
||||||
exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
throw exception;
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
return s3Region;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize and launch the audit manager and service.
|
* Initialize and launch the audit manager and service.
|
||||||
* As this takes the FS IOStatistics store, it must be invoked
|
* As this takes the FS IOStatistics store, it must be invoked
|
||||||
|
@ -28,7 +28,6 @@
|
|||||||
|
|
||||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
|
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
|
||||||
import software.amazon.awssdk.regions.Region;
|
|
||||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||||
import software.amazon.awssdk.services.s3.S3Client;
|
import software.amazon.awssdk.services.s3.S3Client;
|
||||||
import software.amazon.awssdk.transfer.s3.S3TransferManager;
|
import software.amazon.awssdk.transfer.s3.S3TransferManager;
|
||||||
@ -169,7 +168,7 @@ final class S3ClientCreationParameters {
|
|||||||
/**
|
/**
|
||||||
* Region of the S3 bucket.
|
* Region of the S3 bucket.
|
||||||
*/
|
*/
|
||||||
private Region region;
|
private String region;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -385,26 +384,6 @@ public S3ClientCreationParameters withTransferManagerExecutor(
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set region.
|
|
||||||
*
|
|
||||||
* @param value new value
|
|
||||||
* @return the builder
|
|
||||||
*/
|
|
||||||
public S3ClientCreationParameters withRegion(
|
|
||||||
final Region value) {
|
|
||||||
region = value;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the region.
|
|
||||||
* @return invoker
|
|
||||||
*/
|
|
||||||
public Region getRegion() {
|
|
||||||
return region;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the multipart flag..
|
* Set the multipart flag..
|
||||||
*
|
*
|
||||||
@ -423,5 +402,25 @@ public S3ClientCreationParameters withMultipartCopyEnabled(final boolean value)
|
|||||||
public boolean isMultipartCopy() {
|
public boolean isMultipartCopy() {
|
||||||
return multipartCopy;
|
return multipartCopy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set region.
|
||||||
|
*
|
||||||
|
* @param value new value
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public S3ClientCreationParameters withRegion(
|
||||||
|
final String value) {
|
||||||
|
region = value;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the region.
|
||||||
|
* @return invoker
|
||||||
|
*/
|
||||||
|
public String getRegion() {
|
||||||
|
return region;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -548,11 +548,6 @@ public enum Statistic {
|
|||||||
StoreStatisticNames.STORE_IO_THROTTLE_RATE,
|
StoreStatisticNames.STORE_IO_THROTTLE_RATE,
|
||||||
"Rate of S3 request throttling",
|
"Rate of S3 request throttling",
|
||||||
TYPE_QUANTILE),
|
TYPE_QUANTILE),
|
||||||
STORE_REGION_PROBE(
|
|
||||||
StoreStatisticNames.STORE_REGION_PROBE,
|
|
||||||
"Store Region Probe",
|
|
||||||
TYPE_DURATION
|
|
||||||
),
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Delegation Token Operations.
|
* Delegation Token Operations.
|
||||||
|
@ -406,6 +406,13 @@ public int run(String[] args, PrintStream out)
|
|||||||
// Note and continue.
|
// Note and continue.
|
||||||
LOG.debug("failed to get bucket location", e);
|
LOG.debug("failed to get bucket location", e);
|
||||||
println(out, LOCATION_UNKNOWN);
|
println(out, LOCATION_UNKNOWN);
|
||||||
|
|
||||||
|
// it may be the bucket is not found; we can't differentiate
|
||||||
|
// that and handle third party store issues where the API may
|
||||||
|
// not work.
|
||||||
|
// Fallback to looking for bucket root attributes.
|
||||||
|
println(out, "Probing for bucket existence");
|
||||||
|
fs.listXAttrs(new Path("/"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// print any auth paths for directory marker info
|
// print any auth paths for directory marker info
|
||||||
|
@ -59,6 +59,7 @@
|
|||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
|
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestConstants.EU_WEST_1;
|
import static org.apache.hadoop.fs.s3a.S3ATestConstants.EU_WEST_1;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||||
@ -366,6 +367,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
|
|||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
skipIfCrossRegionClient(conf);
|
||||||
conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true));
|
conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true));
|
||||||
assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false));
|
assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false));
|
||||||
|
|
||||||
@ -404,6 +406,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
|
|||||||
@Test
|
@Test
|
||||||
public void testDefaultUserAgent() throws Exception {
|
public void testDefaultUserAgent() throws Exception {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
skipIfCrossRegionClient(conf);
|
||||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
assertNotNull(fs);
|
assertNotNull(fs);
|
||||||
S3Client s3 = getS3Client("User Agent");
|
S3Client s3 = getS3Client("User Agent");
|
||||||
@ -417,6 +420,7 @@ public void testDefaultUserAgent() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testCustomUserAgent() throws Exception {
|
public void testCustomUserAgent() throws Exception {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
skipIfCrossRegionClient(conf);
|
||||||
conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
|
conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
|
||||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
assertNotNull(fs);
|
assertNotNull(fs);
|
||||||
@ -431,6 +435,7 @@ public void testCustomUserAgent() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testRequestTimeout() throws Exception {
|
public void testRequestTimeout() throws Exception {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
|
skipIfCrossRegionClient(conf);
|
||||||
conf.set(REQUEST_TIMEOUT, "120");
|
conf.set(REQUEST_TIMEOUT, "120");
|
||||||
fs = S3ATestUtils.createTestFileSystem(conf);
|
fs = S3ATestUtils.createTestFileSystem(conf);
|
||||||
S3Client s3 = getS3Client("Request timeout (ms)");
|
S3Client s3 = getS3Client("Request timeout (ms)");
|
||||||
@ -610,4 +615,16 @@ public static boolean isSTSSignerCalled() {
|
|||||||
return stsSignerCalled;
|
return stsSignerCalled;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skip a test if client created is cross region.
|
||||||
|
* @param configuration configuration to probe
|
||||||
|
*/
|
||||||
|
private static void skipIfCrossRegionClient(
|
||||||
|
Configuration configuration) {
|
||||||
|
if (configuration.get(ENDPOINT, null) == null
|
||||||
|
&& configuration.get(AWS_REGION, null) == null) {
|
||||||
|
skip("Skipping test as cross region client is in use ");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.assertj.core.api.Assertions;
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import software.amazon.awssdk.awscore.AwsExecutionAttribute;
|
import software.amazon.awssdk.awscore.AwsExecutionAttribute;
|
||||||
import software.amazon.awssdk.awscore.exception.AwsServiceException;
|
import software.amazon.awssdk.awscore.exception.AwsServiceException;
|
||||||
@ -42,8 +43,9 @@
|
|||||||
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
|
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.fs.s3a.Statistic.STORE_REGION_PROBE;
|
|
||||||
import static org.apache.hadoop.io.IOUtils.closeStream;
|
import static org.apache.hadoop.io.IOUtils.closeStream;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -54,9 +56,17 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
|
|||||||
|
|
||||||
private static final String AWS_ENDPOINT_TEST = "test-endpoint";
|
private static final String AWS_ENDPOINT_TEST = "test-endpoint";
|
||||||
|
|
||||||
private static final String USW_2_BUCKET = "landsat-pds";
|
private static final String US_EAST_1 = "us-east-1";
|
||||||
|
|
||||||
public static final String USW_2_STORE = "s3a://" + USW_2_BUCKET;
|
private static final String US_EAST_2 = "us-east-2";
|
||||||
|
|
||||||
|
private static final String US_WEST_2 = "us-west-2";
|
||||||
|
|
||||||
|
private static final String EU_WEST_2 = "eu-west-2";
|
||||||
|
|
||||||
|
private static final String CN_NORTHWEST_1 = "cn-northwest-1";
|
||||||
|
|
||||||
|
private static final String US_GOV_EAST_1 = "us-gov-east-1";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If anyone were ever to create a bucket with this UUID pair it would break the tests.
|
* If anyone were ever to create a bucket with this UUID pair it would break the tests.
|
||||||
@ -64,6 +74,14 @@ public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
|
|||||||
public static final String UNKNOWN_BUCKET = "23FA76D4-5F17-48B8-9D7D-9050269D0E40"
|
public static final String UNKNOWN_BUCKET = "23FA76D4-5F17-48B8-9D7D-9050269D0E40"
|
||||||
+ "-8281BAF2-DBCF-47AA-8A27-F2FA3589656A";
|
+ "-8281BAF2-DBCF-47AA-8A27-F2FA3589656A";
|
||||||
|
|
||||||
|
private static final String EU_WEST_2_ENDPOINT = "s3.eu-west-2.amazonaws.com";
|
||||||
|
|
||||||
|
private static final String CN_ENDPOINT = "s3.cn-northwest-1.amazonaws.com.cn";
|
||||||
|
|
||||||
|
private static final String GOV_ENDPOINT = "s3-fips.us-gov-east-1.amazonaws.com";
|
||||||
|
|
||||||
|
private static final String VPC_ENDPOINT = "vpce-1a2b3c4d-5e6f.s3.us-west-2.vpce.amazonaws.com";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* New FS instance which will be closed in teardown.
|
* New FS instance which will be closed in teardown.
|
||||||
*/
|
*/
|
||||||
@ -75,11 +93,6 @@ public void teardown() throws Exception {
|
|||||||
super.teardown();
|
super.teardown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test to verify that not setting the region config, will lead to the client factory making
|
|
||||||
* a HEAD bucket call to configure the correct region. If an incorrect region is set, the HEAD
|
|
||||||
* bucket call in this test will raise an exception.
|
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithoutRegionConfig() throws IOException {
|
public void testWithoutRegionConfig() throws IOException {
|
||||||
describe("Verify that region lookup takes place");
|
describe("Verify that region lookup takes place");
|
||||||
@ -96,7 +109,6 @@ public void testWithoutRegionConfig() throws IOException {
|
|||||||
} catch (UnknownHostException | UnknownStoreException | AccessDeniedException allowed) {
|
} catch (UnknownHostException | UnknownStoreException | AccessDeniedException allowed) {
|
||||||
// these are all valid failure modes from different test environments.
|
// these are all valid failure modes from different test environments.
|
||||||
}
|
}
|
||||||
assertRegionProbeCount(1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -115,82 +127,128 @@ public void testUnknownBucket() throws Exception {
|
|||||||
} catch (UnknownHostException | UnknownStoreException expected) {
|
} catch (UnknownHostException | UnknownStoreException expected) {
|
||||||
// this is good.
|
// this is good.
|
||||||
}
|
}
|
||||||
assertRegionProbeCount(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWithRegionConfig() throws IOException, URISyntaxException {
|
|
||||||
describe("Verify that region lookup is skipped if the region property is set");
|
|
||||||
Configuration conf = getConfiguration();
|
|
||||||
removeBaseAndBucketOverrides(conf, AWS_REGION, PATH_STYLE_ACCESS);
|
|
||||||
|
|
||||||
conf.set(AWS_REGION, "us-east-2");
|
|
||||||
|
|
||||||
newFS = new S3AFileSystem();
|
|
||||||
newFS.initialize(new URI(USW_2_STORE), conf);
|
|
||||||
assertRegionProbeCount(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRegionCache() throws IOException, URISyntaxException {
|
|
||||||
describe("Verify that region lookup is cached on the second attempt");
|
|
||||||
Configuration conf = getConfiguration();
|
|
||||||
removeBaseAndBucketOverrides(USW_2_BUCKET, conf, AWS_REGION, PATH_STYLE_ACCESS);
|
|
||||||
|
|
||||||
newFS = new S3AFileSystem();
|
|
||||||
|
|
||||||
newFS.initialize(new URI(USW_2_STORE), conf);
|
|
||||||
|
|
||||||
assertRegionProbeCount(1);
|
|
||||||
closeStream(newFS);
|
|
||||||
|
|
||||||
// create a new instance
|
|
||||||
newFS = new S3AFileSystem();
|
|
||||||
newFS.initialize(new URI(USW_2_STORE), conf);
|
|
||||||
|
|
||||||
// value should already be cached.
|
|
||||||
assertRegionProbeCount(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertRegionProbeCount(final int expected) {
|
|
||||||
Assertions.assertThat(newFS.getInstrumentation().getCounterValue(STORE_REGION_PROBE))
|
|
||||||
.describedAs("Incorrect number of calls made to get bucket region").isEqualTo(expected);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEndpointOverride() throws Throwable {
|
public void testEndpointOverride() throws Throwable {
|
||||||
describe("Create a client with no region and the default endpoint");
|
describe("Create a client with a configured endpoint");
|
||||||
Configuration conf = getConfiguration();
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
S3Client client = createS3Client(conf, AWS_ENDPOINT_TEST);
|
S3Client client = createS3Client(conf, AWS_ENDPOINT_TEST, null, US_EAST_2);
|
||||||
|
|
||||||
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCentralEndpoint() throws Throwable {
|
||||||
|
describe("Create a client with the central endpoint");
|
||||||
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
|
S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_1);
|
||||||
|
|
||||||
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithRegionConfig() throws Throwable {
|
||||||
|
describe("Create a client with a configured region");
|
||||||
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
|
S3Client client = createS3Client(conf, null, EU_WEST_2, EU_WEST_2);
|
||||||
|
|
||||||
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class RegionInterceptor implements ExecutionInterceptor {
|
public void testEUWest2Endpoint() throws Throwable {
|
||||||
private boolean endpointOverridden;
|
describe("Create a client with the eu west 2 endpoint");
|
||||||
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
RegionInterceptor(boolean endpointOverridden) {
|
S3Client client = createS3Client(conf, EU_WEST_2_ENDPOINT, null, EU_WEST_2);
|
||||||
this.endpointOverridden = endpointOverridden;
|
|
||||||
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithRegionAndEndpointConfig() throws Throwable {
|
||||||
|
describe("Test that when both region and endpoint are configured, region takes precedence");
|
||||||
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
|
S3Client client = createS3Client(conf, EU_WEST_2_ENDPOINT, US_WEST_2, US_WEST_2);
|
||||||
|
|
||||||
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithChinaEndpoint() throws Throwable {
|
||||||
|
describe("Test with a china endpoint");
|
||||||
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
|
S3Client client = createS3Client(conf, CN_ENDPOINT, null, CN_NORTHWEST_1);
|
||||||
|
|
||||||
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithGovCloudEndpoint() throws Throwable {
|
||||||
|
describe("Test with a gov cloud endpoint");
|
||||||
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
|
S3Client client = createS3Client(conf, GOV_ENDPOINT, null, US_GOV_EAST_1);
|
||||||
|
|
||||||
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore("Pending HADOOP-18938. S3A region logic to handle vpce and non standard endpoints")
|
||||||
|
public void testWithVPCE() throws Throwable {
|
||||||
|
describe("Test with vpc endpoint");
|
||||||
|
Configuration conf = getConfiguration();
|
||||||
|
|
||||||
|
S3Client client = createS3Client(conf, VPC_ENDPOINT, null, US_WEST_2);
|
||||||
|
|
||||||
|
intercept(AwsServiceException.class, "Exception thrown by interceptor", () -> client.headBucket(
|
||||||
|
HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
class RegionInterceptor implements ExecutionInterceptor {
|
||||||
|
private String endpoint;
|
||||||
|
private String region;
|
||||||
|
|
||||||
|
RegionInterceptor(String endpoint, String region) {
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
this.region = region;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void beforeExecution(Context.BeforeExecution context,
|
public void beforeExecution(Context.BeforeExecution context,
|
||||||
ExecutionAttributes executionAttributes) {
|
ExecutionAttributes executionAttributes) {
|
||||||
|
|
||||||
if (endpointOverridden) {
|
if (endpoint != null) {
|
||||||
Assertions.assertThat(
|
Assertions.assertThat(
|
||||||
executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
|
executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
|
||||||
.describedAs("Endpoint not overridden").isTrue();
|
.describedAs("Endpoint not overridden").isTrue();
|
||||||
|
|
||||||
Assertions.assertThat(
|
Assertions.assertThat(
|
||||||
executionAttributes.getAttribute(AwsExecutionAttribute.CLIENT_ENDPOINT).toString())
|
executionAttributes.getAttribute(AwsExecutionAttribute.CLIENT_ENDPOINT).toString())
|
||||||
.describedAs("There is an endpoint mismatch").isEqualTo("https://" + AWS_ENDPOINT_TEST);
|
.describedAs("There is an endpoint mismatch").isEqualTo("https://" + endpoint);
|
||||||
|
} else {
|
||||||
|
Assertions.assertThat(
|
||||||
|
executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN))
|
||||||
|
.describedAs("Endpoint is overridden").isEqualTo(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assertions.assertThat(
|
||||||
|
executionAttributes.getAttribute(AwsExecutionAttribute.AWS_REGION).toString())
|
||||||
|
.describedAs("Incorrect region set").isEqualTo(region);
|
||||||
|
|
||||||
// We don't actually want to make a request, so exit early.
|
// We don't actually want to make a request, so exit early.
|
||||||
throw AwsServiceException.builder().message("Exception thrown by interceptor").build();
|
throw AwsServiceException.builder().message("Exception thrown by interceptor").build();
|
||||||
}
|
}
|
||||||
@ -202,23 +260,18 @@ public void beforeExecution(Context.BeforeExecution context,
|
|||||||
* value.
|
* value.
|
||||||
* @param conf configuration to use.
|
* @param conf configuration to use.
|
||||||
* @param endpoint endpoint.
|
* @param endpoint endpoint.
|
||||||
|
* @param expectedRegion the region that should be set in the client.
|
||||||
* @return the client.
|
* @return the client.
|
||||||
* @throws URISyntaxException parse problems.
|
* @throws URISyntaxException parse problems.
|
||||||
* @throws IOException IO problems
|
* @throws IOException IO problems
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private S3Client createS3Client(Configuration conf,
|
private S3Client createS3Client(Configuration conf,
|
||||||
String endpoint)
|
String endpoint, String configuredRegion, String expectedRegion)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
boolean endpointOverridden = false;
|
|
||||||
|
|
||||||
if (endpoint != null && !endpoint.isEmpty()) {
|
|
||||||
endpointOverridden = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<ExecutionInterceptor> interceptors = new ArrayList<>();
|
List<ExecutionInterceptor> interceptors = new ArrayList<>();
|
||||||
interceptors.add(new RegionInterceptor(endpointOverridden));
|
interceptors.add(new RegionInterceptor(endpoint, expectedRegion));
|
||||||
|
|
||||||
DefaultS3ClientFactory factory
|
DefaultS3ClientFactory factory
|
||||||
= new DefaultS3ClientFactory();
|
= new DefaultS3ClientFactory();
|
||||||
@ -229,7 +282,9 @@ private S3Client createS3Client(Configuration conf,
|
|||||||
.withEndpoint(endpoint)
|
.withEndpoint(endpoint)
|
||||||
.withMetrics(new EmptyS3AStatisticsContext()
|
.withMetrics(new EmptyS3AStatisticsContext()
|
||||||
.newStatisticsFromAwsSdk())
|
.newStatisticsFromAwsSdk())
|
||||||
.withExecutionInterceptors(interceptors);
|
.withExecutionInterceptors(interceptors)
|
||||||
|
.withRegion(configuredRegion);
|
||||||
|
|
||||||
|
|
||||||
S3Client client = factory.createS3Client(
|
S3Client client = factory.createS3Client(
|
||||||
getFileSystem().getUri(),
|
getFileSystem().getUri(),
|
||||||
|
@ -26,7 +26,6 @@
|
|||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.file.AccessDeniedException;
|
import java.nio.file.AccessDeniedException;
|
||||||
|
|
||||||
import software.amazon.awssdk.regions.Region;
|
|
||||||
import software.amazon.awssdk.services.s3.S3Client;
|
import software.amazon.awssdk.services.s3.S3Client;
|
||||||
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
|
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
@ -596,8 +595,8 @@ protected HeadBucketResponse readLandsatMetadata(final S3AFileSystem delegatedFS
|
|||||||
.withPathUri(new URI("s3a://localhost/"))
|
.withPathUri(new URI("s3a://localhost/"))
|
||||||
.withMetrics(new EmptyS3AStatisticsContext()
|
.withMetrics(new EmptyS3AStatisticsContext()
|
||||||
.newStatisticsFromAwsSdk())
|
.newStatisticsFromAwsSdk())
|
||||||
.withUserAgentSuffix("ITestSessionDelegationInFilesystem")
|
.withUserAgentSuffix("ITestSessionDelegationInFilesystem");
|
||||||
.withRegion(Region.US_WEST_2);
|
|
||||||
S3Client s3 = factory.createS3Client(landsat, parameters);
|
S3Client s3 = factory.createS3Client(landsat, parameters);
|
||||||
|
|
||||||
return Invoker.once("HEAD", host,
|
return Invoker.once("HEAD", host,
|
||||||
|
@ -31,11 +31,10 @@
|
|||||||
</property>
|
</property>
|
||||||
|
|
||||||
<!-- Per-bucket configurations: landsat-pds -->
|
<!-- Per-bucket configurations: landsat-pds -->
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.bucket.landsat-pds.endpoint</name>
|
<name>fs.s3a.bucket.landsat-pds.endpoint.region</name>
|
||||||
<value>${central.endpoint}</value>
|
<value>us-west-2</value>
|
||||||
<description>The endpoint for s3a://landsat-pds URLs</description>
|
<description>The region for s3a://landsat-pds</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
@ -58,10 +57,9 @@
|
|||||||
</property>
|
</property>
|
||||||
|
|
||||||
<!-- Per-bucket configurations: usgs-landsat -->
|
<!-- Per-bucket configurations: usgs-landsat -->
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.bucket.usgs-landsat.endpoint</name>
|
<name>fs.s3a.bucket.usgs-landsat.endpoint.region</name>
|
||||||
<value>${central.endpoint}</value>
|
<value>us-west-2</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
@ -82,6 +80,12 @@
|
|||||||
<value>false</value>
|
<value>false</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<!-- Per-bucket configurations: osm-pds -->
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.bucket.osm-pds.endpoint.region</name>
|
||||||
|
<value>us-east-1</value>
|
||||||
|
<description>The region for s3a://osm-pds</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
This is the default endpoint, which can be used to interact
|
This is the default endpoint, which can be used to interact
|
||||||
|
Loading…
Reference in New Issue
Block a user