HADOOP-17198. Support S3 Access Points (#3260)
Add support for S3 Access Points. This provides extra security, as it ensures applications are not working with buckets belonging to third parties.

To bind a bucket to an access point, set the access point (AP) ARN, which must be done for each specific bucket, using the pattern:

    fs.s3a.bucket.$BUCKET.accesspoint.arn = ARN

* The global/bucket option `fs.s3a.accesspoint.required` mandates that buckets must declare their access point.
* This is not compatible with S3Guard.

Consult the documentation for further details.

Contributed by Bogdan Stolojan
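
As a hedged sketch of the options described above (the bucket name and ARN below are illustrative placeholders, not values from this change), the per-bucket binding and the global flag can be set on a Hadoop `Configuration`:

```java
import org.apache.hadoop.conf.Configuration;

public class AccessPointConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Bind one bucket to its Access Point ARN (per-bucket option).
    conf.set("fs.s3a.bucket.sample-bucket.accesspoint.arn",
        "arn:aws:s3:eu-west-1:123456789012:accesspoint/sample-ap");
    // Optionally mandate that every bucket declares an Access Point.
    conf.setBoolean("fs.s3a.accesspoint.required", true);
  }
}
```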
@@ -1605,6 +1605,14 @@
    implementations can still be used</description>
</property>

<property>
  <name>fs.s3a.accesspoint.required</name>
  <value>false</value>
  <description>Require that all S3 access is made through Access Points and not through
    buckets directly. If enabled, use per-bucket overrides to allow bucket access to a specific set
    of buckets.</description>
</property>

<property>
  <name>fs.s3a.block.size</name>
  <value>32M</value>
@@ -0,0 +1,134 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import javax.annotation.Nonnull;

import com.amazonaws.arn.Arn;
import com.amazonaws.regions.RegionUtils;

/**
 * Represents an Arn Resource, this can be an accesspoint or bucket.
 */
public final class ArnResource {

  /**
   * Resource name.
   */
  private final String name;

  /**
   * Resource owner account id.
   */
  private final String ownerAccountId;

  /**
   * Resource region.
   */
  private final String region;

  /**
   * Full Arn for the resource.
   */
  private final String fullArn;

  /**
   * Partition for the resource. Allowed partitions: aws, aws-cn, aws-us-gov
   */
  private final String partition;

  /**
   * Because of the different ways an endpoint can be constructed depending on partition we're
   * relying on the AWS SDK to produce the endpoint. In this case we need a region key of the form
   * {@code String.format("accesspoint-%s", awsRegion)}
   */
  private final String accessPointRegionKey;

  private ArnResource(String name, String owner, String region, String partition, String fullArn) {
    this.name = name;
    this.ownerAccountId = owner;
    this.region = region;
    this.partition = partition;
    this.fullArn = fullArn;
    this.accessPointRegionKey = String.format("accesspoint-%s", region);
  }

  /**
   * Resource name.
   * @return resource name.
   */
  public String getName() {
    return name;
  }

  /**
   * Return owner's account id.
   * @return owner account id
   */
  public String getOwnerAccountId() {
    return ownerAccountId;
  }

  /**
   * Resource region.
   * @return resource region.
   */
  public String getRegion() {
    return region;
  }

  /**
   * Full arn for resource.
   * @return arn for resource.
   */
  public String getFullArn() {
    return fullArn;
  }

  /**
   * Formatted endpoint for the resource.
   * @return resource endpoint.
   */
  public String getEndpoint() {
    return RegionUtils.getRegion(accessPointRegionKey)
        .getServiceEndpoint("s3");
  }

  /**
   * Parses the passed `arn` string into a full ArnResource.
   * @param arn - string representing an Arn resource.
   * @return new ArnResource instance.
   * @throws IllegalArgumentException - if the Arn is malformed or any of the region, accountId and
   * resource name properties are empty.
   */
  @Nonnull
  public static ArnResource accessPointFromArn(String arn) throws IllegalArgumentException {
    Arn parsed = Arn.fromString(arn);

    if (parsed.getRegion().isEmpty() || parsed.getAccountId().isEmpty() ||
        parsed.getResourceAsString().isEmpty()) {
      throw new IllegalArgumentException(
          String.format("Access Point Arn %s has an invalid format or missing properties", arn));
    }

    String resourceName = parsed.getResource().getResource();
    return new ArnResource(resourceName, parsed.getAccountId(), parsed.getRegion(),
        parsed.getPartition(), arn);
  }
}
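
A minimal usage sketch of the class above (the account id and access point name in the ARN are illustrative; the expected endpoint follows the unit test added later in this commit):

```java
import org.apache.hadoop.fs.s3a.ArnResource;

public class ArnResourceSketch {
  public static void main(String[] args) {
    // Parse an illustrative Access Point ARN.
    ArnResource ap = ArnResource.accessPointFromArn(
        "arn:aws:s3:eu-west-1:123456789012:accesspoint/sample-ap");
    System.out.println(ap.getName());     // sample-ap
    System.out.println(ap.getRegion());   // eu-west-1
    // The endpoint is derived through the AWS SDK region metadata,
    // e.g. s3-accesspoint.eu-west-1.amazonaws.com.
    System.out.println(ap.getEndpoint());
  }
}
```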

@@ -1122,4 +1122,8 @@ private Constants() {
   */
  public static final String AWS_S3_CENTRAL_REGION = "us-east-1";

  /**
   * Require that all S3 access is made through Access Points.
   */
  public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
}
@@ -216,10 +216,14 @@
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_S3GUARD_INCOMPATIBLE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_S3GUARD_INCOMPATIBLE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;

@@ -274,6 +278,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
  private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
      Invoker.LOG_EVENT);
  private final Retried onRetry = this::operationRetried;

  /**
   * Represents bucket name for all S3 operations. If per bucket override for
   * {@link InternalConstants#ARN_BUCKET_OPTION} property is set, then the bucket is updated to
   * point to the configured Arn.
   */
  private String bucket;
  private int maxKeys;
  private Listing listing;

@@ -367,6 +377,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   */
  private boolean isCSEEnabled;

  /**
   * Bucket AccessPoint.
   */
  private ArnResource accessPoint;

  /** Add any deprecated keys. */
  @SuppressWarnings("deprecation")
  private static void addDeprecatedKeys() {

@@ -408,10 +423,20 @@ public void initialize(URI name, Configuration originalConf)
    LOG.debug("Initializing S3AFileSystem for {}", bucket);
    // clone the configuration into one with propagated bucket options
    Configuration conf = propagateBucketOptions(originalConf, bucket);

    // HADOOP-17894. remove references to s3a stores in JCEKS credentials.
    conf = ProviderUtils.excludeIncompatibleCredentialProviders(
        conf, S3AFileSystem.class);
    String arn = String.format(ARN_BUCKET_OPTION, bucket);
    String configuredArn = conf.getTrimmed(arn, "");
    if (!configuredArn.isEmpty()) {
      accessPoint = ArnResource.accessPointFromArn(configuredArn);
      LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
      bucket = accessPoint.getFullArn();
    } else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
      LOG.warn("Access Point usage is required because \"{}\" is enabled," +
          " but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
      throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
    }

    // fix up the classloader of the configuration to be whatever
    // classloader loaded this filesystem.
@@ -479,6 +504,11 @@ public void initialize(URI name, Configuration originalConf)
          "version 2", listVersion);
    }
    useListV1 = (listVersion == 1);
    if (accessPoint != null && useListV1) {
      LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported by" +
          " access points. Upgrading to V2");
      useListV1 = false;
    }

    signerManager = new SignerManager(bucket, this, conf, owner);
    signerManager.initCustomSigners();
@@ -556,6 +586,9 @@ public void initialize(URI name, Configuration originalConf)
      if (isCSEEnabled) {
        throw new PathIOException(uri.toString(), CSE_S3GUARD_INCOMPATIBLE);
      }
      if (accessPoint != null) {
        throw new PathIOException(uri.toString(), AP_S3GUARD_INCOMPATIBLE);
      }
    }

    // LOG if S3Guard is disabled on the warn level set in config

@@ -743,11 +776,24 @@ protected void verifyBucketExists()
   */
  @Retries.RetryTranslated
  protected void verifyBucketExistsV2()
      throws UnknownStoreException, IOException {
    if (!invoker.retry("doesBucketExistV2", bucket, true,
        trackDurationOfOperation(getDurationTrackerFactory(),
            STORE_EXISTS_PROBE.getSymbol(),
-            () -> s3.doesBucketExistV2(bucket)))) {
+            () -> {
+              // Bug in SDK always returns `true` for AccessPoint ARNs with `doesBucketExistV2()`
+              // expanding implementation to use ARNs and buckets correctly
+              try {
+                s3.getBucketAcl(bucket);
+              } catch (AmazonServiceException ex) {
+                int statusCode = ex.getStatusCode();
+                if (statusCode == SC_404 || (statusCode == SC_403 && accessPoint != null)) {
+                  return false;
+                }
+              }
+
+              return true;
+            }))) {
      throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does "
          + "not exist");
    }

@@ -835,10 +881,14 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
        S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
        S3ClientFactory.class);

    String endpoint = accessPoint == null
        ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
        : accessPoint.getEndpoint();

    S3ClientFactory.S3ClientCreationParameters parameters = null;
    parameters = new S3ClientFactory.S3ClientCreationParameters()
        .withCredentialSet(credentials)
-        .withEndpoint(conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT))
+        .withEndpoint(endpoint)
        .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
        .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
        .withUserAgentSuffix(uaSuffix)

@@ -1167,7 +1217,10 @@ public String getBucketLocation(String bucketName) throws IOException {
    final String region = trackDurationAndSpan(
        STORE_EXISTS_PROBE, bucketName, null, () ->
            invoker.retry("getBucketLocation()", bucketName, true, () ->
-                s3.getBucketLocation(bucketName)));
+                // If accessPoint then region is known from Arn
+                accessPoint != null
+                    ? accessPoint.getRegion()
+                    : s3.getBucketLocation(bucketName)));
    return fixBucketRegion(region);
  }

@@ -4550,6 +4603,10 @@ public String toString() {
          .append("}");
    }
    sb.append(", ClientSideEncryption=").append(isCSEEnabled);

    if (accessPoint != null) {
      sb.append(", arnForBucket=").append(accessPoint.getFullArn());
    }
    sb.append('}');
    return sb.toString();
  }
@@ -472,6 +472,7 @@ public DurationTracker trackDuration(final String key, final long count) {
  /**
   * Create an IOStatistics store which updates FS metrics
   * as well as IOStatistics.
   * @return instance of the store.
   */
  public IOStatisticsStore createMetricsUpdatingStore() {
    return new MetricsUpdatingIOStatisticsStore();
@@ -95,6 +95,9 @@ private InternalConstants() {
          Arrays.asList(Constants.INPUT_FADVISE,
              Constants.READAHEAD_RANGE)));

  /** 403 error code. */
  public static final int SC_403 = 403;

  /** 404 error code. */
  public static final int SC_404 = 404;

@@ -139,4 +142,21 @@ private InternalConstants() {
   */
  public static final String CSE_S3GUARD_INCOMPATIBLE = "S3-CSE cannot be "
      + "used with S3Guard";

  /**
   * Error message to indicate Access Points are incompatible with S3Guard.
   */
  public static final String AP_S3GUARD_INCOMPATIBLE = "Access Points cannot be used with S3Guard";

  /**
   * Error message to indicate Access Points are required to be used for S3 access.
   */
  public static final String AP_REQUIRED_EXCEPTION = "Access Points usage is required" +
      " but not configured for the bucket.";

  /**
   * AccessPoint ARN for the bucket. When set as a bucket override the requests for that bucket
   * will go through the AccessPoint.
   */
  public static final String ARN_BUCKET_OPTION = "fs.s3a.bucket.%s.accesspoint.arn";
}
@@ -1580,6 +1580,62 @@ Why explicitly declare a bucket bound to the central endpoint? It ensures
that if the default endpoint is changed to a new region, data stored in
US-east is still reachable.

## <a name="accesspoints"></a>Configuring S3 AccessPoints usage with S3A
S3A now supports [S3 Access Point](https://aws.amazon.com/s3/features/access-points/) usage, which
improves VPC integration with S3 and simplifies your data's permission model, because different
policies can now be applied at the Access Point level. For more information about why and how to
create them, make sure to read the official documentation.

Accessing data through an access point is done by using its ARN, as opposed to just the bucket name.
You can set the Access Point ARN property using the following per-bucket configuration property:
```xml
<property>
  <name>fs.s3a.bucket.sample-bucket.accesspoint.arn</name>
  <value> {ACCESSPOINT_ARN_HERE} </value>
  <description>Configure S3A traffic to use this AccessPoint</description>
</property>
```

This configures S3A access to the `sample-bucket` bucket to go through the
new Access Point ARN. So, for example, `s3a://sample-bucket/key` will now use your
configured ARN when getting data from S3 instead of your bucket.
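
As a hedged sketch (the bucket name, key, and configuration are placeholders carried over from the example above), application code does not change once the per-bucket ARN is configured; the same `s3a://` path is used:

```java
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadThroughAccessPoint {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.s3a.bucket.sample-bucket.accesspoint.arn is set as shown above.
    FileSystem fs = FileSystem.get(new URI("s3a://sample-bucket/"), conf);
    try (InputStream in = fs.open(new Path("s3a://sample-bucket/key"))) {
      // The request goes to the Access Point endpoint rather than the raw bucket.
      System.out.println(in.read());
    }
  }
}
```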

You can also use an Access Point name as a path URI such as `s3a://finance-team-access/key`, by
configuring the `.accesspoint.arn` property as a per-bucket override:
```xml
<property>
  <name>fs.s3a.bucket.finance-team-access.accesspoint.arn</name>
  <value> {ACCESSPOINT_ARN_HERE} </value>
  <description>Configure S3A traffic to use this AccessPoint</description>
</property>
```

The `fs.s3a.accesspoint.required` property can also require all access to S3 to go through Access
Points. This has the advantage of increasing security inside a VPN / VPC, as you only allow access
to known sources of data defined through Access Points. If a bucket needs to be accessed
directly (without Access Points), you can use per-bucket overrides to disable this setting on a
bucket-by-bucket basis, i.e. `fs.s3a.bucket.{YOUR-BUCKET}.accesspoint.required`.

```xml
<!-- Require access point only access -->
<property>
  <name>fs.s3a.accesspoint.required</name>
  <value>true</value>
</property>
<!-- Disable it on a per-bucket basis if needed -->
<property>
  <name>fs.s3a.bucket.example-bucket.accesspoint.required</name>
  <value>false</value>
</property>
```

Before using Access Points, make sure you're not impacted by the following:
- `ListObjectsV1` is not supported; it is also deprecated on AWS S3 for performance reasons;
- The endpoint for S3 requests will automatically change from `s3.amazonaws.com` to use
`s3-accesspoint.REGION.amazonaws.{com | com.cn}` depending on the Access Point ARN. While
considering endpoints, if you have any custom signers that use the host endpoint property, make
sure to update them if needed.

## <a name="upload"></a>How S3A writes data to S3

The original S3A client implemented file writes by
@@ -296,6 +296,15 @@ For the default test dataset, hosted in the `landsat-pds` bucket, this is:
</property>
```

### <a name="accesspoints"></a> Testing Access Point Integration
S3A supports using Access Point ARNs to access data in S3. If you think your changes affect VPC
integration, request signing, ARN manipulation, or any code path that deals with the actual
sending and retrieving of data to/from S3, make sure you run the entire integration test suite with
this feature enabled.

Check out [our documentation](./index.html#accesspoints) for steps on how to enable this feature. To
create access points for your S3 bucket, you can use the AWS Console or CLI.

## <a name="reporting"></a> Viewing Integration Test Reports

@@ -1468,6 +1477,9 @@ as it may take a couple of SDK updates before it is ready.
   in `fs.s3a.assumed.role.arn` for testing assumed roles,
   and `fs.s3a.encryption.key` for encryption, for full coverage.
   If you can, scale up the scale tests.
1. Create an Access Point for your bucket (using the AWS Console or CLI), update S3a configuration
   to use it ([docs for help](./index.html#accesspoints)) and re-run the `ITest*` integration tests from
   your IDE or via maven.
1. Run the `ILoadTest*` load tests from your IDE or via maven through
   `mvn verify -Dtest=skip -Dit.test=ILoadTest\*` ; look for regressions in performance
   as much as failures.
@@ -26,8 +26,9 @@
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;

-import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipIfS3GuardAndS3CSEIOE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfIOEContainsMessage;

/**
 * The contract of S3A: only enabled if the test bucket is provided.

@@ -77,8 +78,10 @@ public void init() throws IOException {
    try {
      super.init();
    } catch (PathIOException ioe) {
-      // Skip the tests if S3-CSE and S3-Guard are enabled.
-      maybeSkipIfS3GuardAndS3CSEIOE(ioe);
+      // Skip the tests if (S3-CSE or Access Points) and S3-Guard are enabled.
+      skipIfIOEContainsMessage(ioe,
+          InternalConstants.CSE_S3GUARD_INCOMPATIBLE,
+          InternalConstants.AP_S3GUARD_INCOMPATIBLE);
    }
  }
@@ -27,11 +27,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESSPOINT_REQUIRED;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL;

@@ -161,6 +164,35 @@ public void testBucketProbingParameterValidation() throws Exception {
        () -> FileSystem.get(uri, configuration));
  }

  @Test
  public void testAccessPointProbingV2() throws Exception {
    describe("Test V2 bucket probing using an AccessPoint ARN");
    Configuration configuration = createConfigurationWithProbe(2);
    String accessPointArn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/" + randomBucket;
    configuration.set(String.format(InternalConstants.ARN_BUCKET_OPTION, randomBucket),
        accessPointArn);

    expectUnknownStore(
        () -> FileSystem.get(uri, configuration));
  }

  @Test
  public void testAccessPointRequired() throws Exception {
    describe("Test V2 bucket probing with 'fs.s3a.accesspoint.required' property.");
    Configuration configuration = createConfigurationWithProbe(2);
    configuration.set(AWS_S3_ACCESSPOINT_REQUIRED, "true");
    intercept(PathIOException.class,
        InternalConstants.AP_REQUIRED_EXCEPTION,
        "Should throw IOException if Access Points are required but not configured.",
        () -> FileSystem.get(uri, configuration));

    String accessPointArn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/" + randomBucket;
    configuration.set(String.format(InternalConstants.ARN_BUCKET_OPTION, randomBucket),
        accessPointArn);
    expectUnknownStore(
        () -> FileSystem.get(uri, configuration));
  }

  @Override
  protected Configuration getConfiguration() {
    Configuration configuration = super.getConfiguration();
@@ -66,6 +66,8 @@ public class ITestS3AConfiguration {
  private static final String EXAMPLE_ID = "AKASOMEACCESSKEY";
  private static final String EXAMPLE_KEY =
      "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE";
  private static final String AP_ILLEGAL_ACCESS =
      "ARN of type accesspoint cannot be passed as a bucket";

  private Configuration conf;
  private S3AFileSystem fs;

@@ -361,6 +363,14 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
      // isn't in the same region as the s3 client default. See
      // http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
      assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode());
    } catch (final IllegalArgumentException e) {
      // Path style addressing does not work with AP ARNs
      if (!fs.getBucket().contains("arn:")) {
        LOG.error("Caught unexpected exception: ", e);
        throw e;
      }

      GenericTestUtils.assertExceptionContains(AP_ILLEGAL_ACCESS, e);
    }
  }
@@ -38,7 +38,6 @@

import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
-import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;

@@ -257,17 +256,19 @@ private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf) {
  }

  /**
-   * Either skip if PathIOE occurred due to S3CSE and S3Guard
-   * incompatibility or throw the PathIOE.
+   * Skip if PathIOE occurred due to exception which contains a message which signals
+   * an incompatibility or throw the PathIOE.
   *
   * @param ioe PathIOE being parsed.
-   * @throws PathIOException Throws PathIOE if it doesn't relate to S3CSE
-   * and S3Guard incompatibility.
+   * @param messages messages found in the PathIOE that trigger a test to skip
+   * @throws PathIOException Throws PathIOE if it doesn't relate to any message in {@code messages}.
   */
-  public static void maybeSkipIfS3GuardAndS3CSEIOE(PathIOException ioe)
+  public static void skipIfIOEContainsMessage(PathIOException ioe, String...messages)
      throws PathIOException {
-    if (ioe.toString().contains(InternalConstants.CSE_S3GUARD_INCOMPATIBLE)) {
-      skip("Skipping since CSE is enabled with S3Guard.");
+    for (String message: messages) {
+      if (ioe.toString().contains(message)) {
+        skip("Skipping because: " + message);
+      }
    }
    throw ioe;
  }

@@ -1549,5 +1550,4 @@ public static void skipIfEncryptionNotSet(Configuration configuration,
          + s3AEncryptionMethod.getMethod());
    }
  }

}
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import com.amazonaws.regions.Regions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.test.HadoopTestBase;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

public class TestArnResource extends HadoopTestBase {
  private final static Logger LOG = LoggerFactory.getLogger(TestArnResource.class);

  @Test
  public void parseAccessPointFromArn() throws IllegalArgumentException {
    describe("Parse AccessPoint ArnResource from arn string");

    String accessPoint = "testAp";
    String accountId = "123456789101";
    String[][] regionPartitionEndpoints = new String[][] {
        {Regions.EU_WEST_1.getName(), "aws", "s3-accesspoint.eu-west-1.amazonaws.com"},
        {Regions.US_GOV_EAST_1.getName(), "aws-us-gov",
            "s3-accesspoint.us-gov-east-1.amazonaws.com"},
        {Regions.CN_NORTH_1.getName(), "aws-cn", "s3-accesspoint.cn-north-1.amazonaws.com.cn"},
    };

    for (String[] testPair : regionPartitionEndpoints) {
      String region = testPair[0];
      String partition = testPair[1];
      String endpoint = testPair[2];

      // arn:partition:service:region:account-id:resource-type/resource-id
      String arn = String.format("arn:%s:s3:%s:%s:accesspoint/%s", partition, region, accountId,
          accessPoint);

      ArnResource resource = ArnResource.accessPointFromArn(arn);
      assertEquals("Arn does not match", arn, resource.getFullArn());
      assertEquals("Access Point name does not match", accessPoint, resource.getName());
      assertEquals("Account Id does not match", accountId, resource.getOwnerAccountId());
      assertEquals("Region does not match", region, resource.getRegion());
      assertEquals("Endpoint does not match", endpoint, resource.getEndpoint());
    }
  }

  @Test
  public void invalidARNsMustThrow() throws Exception {
    describe("Using an invalid ARN format must throw when initializing an ArnResource.");

    intercept(IllegalArgumentException.class, () ->
        ArnResource.accessPointFromArn("invalid:arn:resource"));
  }

  private void describe(String message) {
    LOG.info(message);
  }
}
@@ -121,8 +121,8 @@ public void testRequestHandlerBinding() throws Throwable {
    final S3AFileSystem fs = getFileSystem();
    final long exec0 = lookupCounterStatistic(iostats(),
        AUDIT_REQUEST_EXECUTION.getSymbol());
-    // API call
-    fs.getBucketLocation();
+    // API call to a known path, `getBucketLocation()` does not always result in an API call.
+    fs.listStatus(path("/"));
    // which MUST have ended up calling the extension request handler
    Assertions.assertThat(SimpleAWSRequestHandler.getInvocationCount())
        .describedAs("Invocation count of plugged in request handler")
@@ -20,6 +20,7 @@

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

@@ -27,6 +28,7 @@

import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.arn.Arn;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.Signer;
import com.amazonaws.services.s3.internal.AWSS3V4Signer;

@@ -143,7 +145,7 @@ private Configuration createTestConfig(String identifier) {

    conf.set(TEST_ID_KEY, identifier);
    conf.set(TEST_REGION_KEY, regionName);
    conf.set(Constants.ENDPOINT, endpoint);

    // make absolutely sure there is no caching.
    disableFilesystemCaching(conf);

@@ -190,7 +192,7 @@ public void sign(SignableRequest<?> request, AWSCredentials credentials) {
      LOG.info("Signing request #{}", c);

      String host = request.getEndpoint().getHost();
-      String bucketName = host.split("\\.")[0];
+      String bucketName = parseBucketFromHost(host);
      try {
        lastStoreValue = CustomSignerInitializer
            .getStoreValue(bucketName, UserGroupInformation.getCurrentUser());

@@ -214,6 +216,35 @@ public void sign(SignableRequest<?> request, AWSCredentials credentials) {
      }
    }

    private String parseBucketFromHost(String host) {
      String[] hostBits = host.split("\\.");
      String bucketName = hostBits[0];
      String service = hostBits[1];

      if (bucketName.equals("kms")) {
        return bucketName;
      }

      if (service.contains("s3-accesspoint") || service.contains("s3-outposts")
          || service.contains("s3-object-lambda")) {
        // If AccessPoint then bucketName is of format `accessPoint-accountId`;
        String[] accessPointBits = hostBits[0].split("-");
        int lastElem = accessPointBits.length - 1;
        String accountId = accessPointBits[lastElem];
        String accessPointName = String.join("", Arrays.copyOf(accessPointBits, lastElem));
        Arn arn = Arn.builder()
            .withAccountId(accountId)
            .withPartition("aws")
            .withRegion(hostBits[2])
            .withResource("accesspoint" + "/" + accessPointName)
            .withService("s3").build();

        bucketName = arn.toString();
      }

      return bucketName;
    }

    public static int getInstantiationCount() {
      return INSTANTIATION_COUNT.get();
    }
@@ -38,6 +38,7 @@
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
import org.apache.hadoop.fs.store.audit.AuditSpan;

@@ -125,6 +126,14 @@ public AbstractS3ACostTest(
  public Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    String bucketName = getTestBucketName(conf);
    // If AccessPoint ARN is set guarded tests are skipped
    String arnKey = String.format(InternalConstants.ARN_BUCKET_OPTION, bucketName);
    String arn = conf.getTrimmed(arnKey, "");
    if (isGuarded() && !arn.isEmpty()) {
      ContractTestUtils.skip(
          "Skipping test since AccessPoint ARN is set and is incompatible with S3Guard.");
    }

    removeBucketOverrides(bucketName, conf,
        S3_METADATA_STORE_IMPL);
    if (!isGuarded()) {

@@ -146,6 +155,12 @@ public Configuration createConfiguration() {
      conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
    }
    disableFilesystemCaching(conf);

    // AccessPoint ARN is the only per bucket configuration that must be kept.
    if (!arn.isEmpty()) {
      conf.set(arnKey, arn);
    }

    return conf;
  }