diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 9d6f32f291..a4224a0ec0 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1590,6 +1590,14 @@
     implementations can still be used</description>
 </property>
 
+<property>
+  <name>fs.s3a.accesspoint.required</name>
+  <value>false</value>
+  <description>Require that all S3 access is made through Access Points and not through
+    buckets directly. If enabled, use per-bucket overrides to allow bucket access to a specific set
+    of buckets.</description>
+</property>
+
 <property>
   <name>fs.s3a.block.size</name>
   <value>32M</value>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ArnResource.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ArnResource.java
new file mode 100644
index 0000000000..7c866ac967
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ArnResource.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import javax.annotation.Nonnull;
+
+import com.amazonaws.arn.Arn;
+import com.amazonaws.regions.RegionUtils;
+
+/**
+ * Represents an Arn Resource; this can be an access point or bucket.
+ */
+public final class ArnResource {
+
+  /**
+   * Resource name.
+   */
+  private final String name;
+
+  /**
+   * Resource owner account id.
+   */
+  private final String ownerAccountId;
+
+  /**
+   * Resource region.
+   */
+  private final String region;
+
+  /**
+   * Full Arn for the resource.
+   */
+  private final String fullArn;
+
+  /**
+   * Partition for the resource. Allowed partitions: aws, aws-cn, aws-us-gov
+   */
+  private final String partition;
+
+  /**
+   * Because of the different ways an endpoint can be constructed depending on partition we're
+   * relying on the AWS SDK to produce the endpoint. In this case we need a region key of the form
+   * {@code String.format("accesspoint-%s", awsRegion)}
+   */
+  private final String accessPointRegionKey;
+
+  private ArnResource(String name, String owner, String region, String partition, String fullArn) {
+    this.name = name;
+    this.ownerAccountId = owner;
+    this.region = region;
+    this.partition = partition;
+    this.fullArn = fullArn;
+    this.accessPointRegionKey = String.format("accesspoint-%s", region);
+  }
+
+  /**
+   * Resource name.
+   * @return resource name.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Return owner's account id.
+   * @return owner account id
+   */
+  public String getOwnerAccountId() {
+    return ownerAccountId;
+  }
+
+  /**
+   * Resource region.
+   * @return resource region.
+   */
+  public String getRegion() {
+    return region;
+  }
+
+  /**
+   * Full arn for resource.
+   * @return arn for resource.
+   */
+  public String getFullArn() {
+    return fullArn;
+  }
+
+  /**
+   * Formatted endpoint for the resource.
+   * @return resource endpoint.
+   */
+  public String getEndpoint() {
+    return RegionUtils.getRegion(accessPointRegionKey)
+        .getServiceEndpoint("s3");
+  }
+
+  /**
+   * Parses the passed `arn` string into a full ArnResource.
+   * @param arn - string representing an Arn resource.
+   * @return new ArnResource instance.
+   * @throws IllegalArgumentException - if the Arn is malformed or any of the region, accountId and
+   * resource name properties are empty.
+   */
+  @Nonnull
+  public static ArnResource accessPointFromArn(String arn) throws IllegalArgumentException {
+    Arn parsed = Arn.fromString(arn);
+
+    if (parsed.getRegion().isEmpty() || parsed.getAccountId().isEmpty() ||
+        parsed.getResourceAsString().isEmpty()) {
+      throw new IllegalArgumentException(
+          String.format("Access Point Arn %s has an invalid format or missing properties", arn));
+    }
+
+    String resourceName = parsed.getResource().getResource();
+    return new ArnResource(resourceName, parsed.getAccountId(), parsed.getRegion(),
+        parsed.getPartition(), arn);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 35387f0c21..e5fccf0388 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1056,4 +1056,8 @@ public final class Constants {
    */
   public static final String AWS_S3_CENTRAL_REGION = "us-east-1";
 
+  /**
+   * Require that all S3 access is made through Access Points.
+   */
+  public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 496c4dc5a6..20de1dc904 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -204,9 +204,12 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
@@ -258,6 +261,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       Invoker.LOG_EVENT);
 
   private final Retried onRetry = this::operationRetried;
+
+  /**
+   * Represents bucket name for all S3 operations. If per bucket override for
+   * {@link InternalConstants#ARN_BUCKET_OPTION} property is set, then the bucket is updated to
+   * point to the configured Arn.
+   */
   private String bucket;
   private int maxKeys;
   private Listing listing;
@@ -345,6 +354,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean isCSEEnabled;
 
+  /**
+   * Bucket AccessPoint.
+   */
+  private ArnResource accessPoint;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -386,10 +400,20 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     LOG.debug("Initializing S3AFileSystem for {}", bucket);
     // clone the configuration into one with propagated bucket options
     Configuration conf = propagateBucketOptions(originalConf, bucket);
-
     // HADOOP-17894. remove references to s3a stores in JCEKS credentials.
     conf = ProviderUtils.excludeIncompatibleCredentialProviders(
         conf, S3AFileSystem.class);
+    String arn = String.format(ARN_BUCKET_OPTION, bucket);
+    String configuredArn = conf.getTrimmed(arn, "");
+    if (!configuredArn.isEmpty()) {
+      accessPoint = ArnResource.accessPointFromArn(configuredArn);
+      LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
+      bucket = accessPoint.getFullArn();
+    } else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
+      LOG.warn("Access Point usage is required because \"{}\" is enabled,"
+          + " but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
+      throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
+    }
 
     // fix up the classloader of the configuration to be whatever
     // classloader loaded this filesystem.
@@ -449,6 +473,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           "version 2", listVersion);
     }
     useListV1 = (listVersion == 1);
+    if (accessPoint != null && useListV1) {
+      LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported by"
+          + " access points. Upgrading to V2");
+      useListV1 = false;
+    }
 
     signerManager = new SignerManager(bucket, this, conf, owner);
     signerManager.initCustomSigners();
@@ -690,11 +719,24 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   @Retries.RetryTranslated
   protected void verifyBucketExistsV2()
-        throws UnknownStoreException, IOException {
+      throws UnknownStoreException, IOException {
     if (!invoker.retry("doesBucketExistV2", bucket, true,
         trackDurationOfOperation(getDurationTrackerFactory(),
             STORE_EXISTS_PROBE.getSymbol(),
-            () -> s3.doesBucketExistV2(bucket)))) {
+            () -> {
+              // Bug in SDK: `doesBucketExistV2()` always returns `true` for AccessPoint ARNs,
+              // so expand the implementation to handle ARNs and buckets correctly
+              try {
+                s3.getBucketAcl(bucket);
+              } catch (AmazonServiceException ex) {
+                int statusCode = ex.getStatusCode();
+                if (statusCode == SC_404 || (statusCode == SC_403 && accessPoint != null)) {
+                  return false;
+                }
+              }
+
+              return true;
+            }))) {
       throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does "
           + "not exist");
     }
@@ -782,10 +824,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
         S3ClientFactory.class);
 
+    String endpoint = accessPoint == null
+        ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
+        : accessPoint.getEndpoint();
+
     S3ClientFactory.S3ClientCreationParameters parameters = null;
     parameters = new S3ClientFactory.S3ClientCreationParameters()
         .withCredentialSet(credentials)
-        .withEndpoint(conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT))
+        .withEndpoint(endpoint)
         .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
         .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
         .withUserAgentSuffix(uaSuffix)
@@ -1114,7 +1160,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     final String region = trackDurationAndSpan(
         STORE_EXISTS_PROBE, bucketName, null, () ->
             invoker.retry("getBucketLocation()", bucketName, true, () ->
-                s3.getBucketLocation(bucketName)));
+                // If accessPoint then region is known from Arn
+                accessPoint != null
+                    ? accessPoint.getRegion()
+                    : s3.getBucketLocation(bucketName)));
 
     return fixBucketRegion(region);
   }
@@ -4106,6 +4155,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           .append("}");
     }
     sb.append(", ClientSideEncryption=").append(isCSEEnabled);
+
+    if (accessPoint != null) {
+      sb.append(", arnForBucket=").append(accessPoint.getFullArn());
+    }
     sb.append('}');
     return sb.toString();
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index a1b8bab3ab..51ba67676c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -461,6 +461,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
   /**
    * Create an IOStatistics store which updates FS metrics
    * as well as IOStatistics.
+   * @return instance of the store.
    */
   public IOStatisticsStore createMetricsUpdatingStore() {
     return new MetricsUpdatingIOStatisticsStore();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 51b1bf60a2..5e79dfee39 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -95,6 +95,9 @@
           Arrays.asList(Constants.INPUT_FADVISE,
               Constants.READAHEAD_RANGE)));
 
+  /** 403 error code. */
+  public static final int SC_403 = 403;
+
   /** 404 error code. */
   public static final int SC_404 = 404;
 
@@ -134,4 +137,15 @@
    */
   public static final int CSE_PADDING_LENGTH = 16;
 
+  /**
+   * Error message to indicate Access Points are required to be used for S3 access.
+   */
+  public static final String AP_REQUIRED_EXCEPTION = "Access Points usage is required"
+      + " but not configured for the bucket.";
+
+  /**
+   * AccessPoint ARN for the bucket. When set as a bucket override the requests for that bucket
+   * will go through the AccessPoint.
+   */
+  public static final String ARN_BUCKET_OPTION = "fs.s3a.bucket.%s.accesspoint.arn";
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 9c5091db9e..02c6d500a0 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1563,6 +1563,62 @@ Why explicitly declare a bucket bound to the central endpoint? It ensures
 that if the default endpoint is changed to a new region, data store in
 US-east is still reachable.
 
+## Configuring S3 AccessPoints usage with S3A
+S3a now supports [S3 Access Point](https://aws.amazon.com/s3/features/access-points/) usage, which
+improves VPC integration with S3 and simplifies your data's permission model, because different
+policies can now be applied at the Access Point level. For more information about why to use them
+and how to create them, make sure to read the official documentation.
+
+Accessing data through an access point is done by using its ARN, as opposed to just the bucket name.
+You can set the Access Point ARN property using the following per bucket configuration property:
+```xml
+<property>
+  <name>fs.s3a.bucket.sample-bucket.accesspoint.arn</name>
+  <value> {ACCESSPOINT_ARN_HERE} </value>
+  <description>Configure S3a traffic to use this AccessPoint</description>
+</property>
+```
+
+This configures access to the `sample-bucket` bucket for S3A, to go through the
+new Access Point ARN. So, for example `s3a://sample-bucket/key` will now use your
+configured ARN when getting data from S3 instead of your bucket.
+
+You can also use an Access Point name as a path URI such as `s3a://finance-team-access/key`, by
+configuring the `.accesspoint.arn` property as a per-bucket override:
+```xml
+<property>
+  <name>fs.s3a.bucket.finance-team-access.accesspoint.arn</name>
+  <value> {ACCESSPOINT_ARN_HERE} </value>
+  <description>Configure S3a traffic to use this AccessPoint</description>
+</property>
+```
+
+The `fs.s3a.accesspoint.required` property can also be used to require that all access to S3 goes
+through Access Points. This has the advantage of increasing security inside a VPN / VPC as you only
+allow access to known sources of data defined through Access Points. If there is a need to access a
+bucket directly (without Access Points), you can use per-bucket overrides to disable this setting on
+a bucket by bucket basis i.e. `fs.s3a.bucket.{YOUR-BUCKET}.accesspoint.required`.
+
+```xml
+<property>
+  <name>fs.s3a.accesspoint.required</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>fs.s3a.bucket.example-bucket.accesspoint.required</name>
+  <value>false</value>
+</property>
+```
+
+Before using Access Points, make sure you're not impacted by the following:
+- `ListObjectsV1` is not supported; this is also deprecated on AWS S3 for performance reasons;
+- The endpoint for S3 requests will automatically change from `s3.amazonaws.com` to use
+`s3-accesspoint.REGION.amazonaws.{com | com.cn}` depending on the Access Point ARN. While
+considering endpoints, if you have any custom signers that use the host endpoint property, make
+sure to update them if needed;
+
 ## How S3A writes data to S3
 
 The original S3A client implemented file writes by
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index 3218af870c..559687a3fd 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -296,6 +296,15 @@ For the default test dataset, hosted in the `landsat-pds` bucket, this is:
 ```
 
+### Testing Access Point Integration
+S3a supports using Access Point ARNs to access data in S3. If you think your changes affect VPC
+integration, request signing, ARN manipulation, or any code path that deals with the actual
+sending and retrieving of data to/from S3, make sure you run the entire integration test suite with
+this feature enabled.
+
+Check out [our documentation](./index.html#accesspoints) for steps on how to enable this feature. To
+create access points for your S3 bucket, you can use the AWS Console or CLI.
+
 ## Viewing Integration Test Reports
 
@@ -1071,6 +1080,9 @@ as it may take a couple of SDK updates before it is ready.
    in `fs.s3a.assumed.role.arn` for testing assumed roles,
    and `fs.s3a.encryption.key` for encryption, for full coverage.
    If you can, scale up the scale tests.
+1. Create an Access Point for your bucket (using the AWS Console or CLI), update S3a configuration
+to use it ([docs for help](./index.html#accesspoints)) and re-run the `ITest*` integration tests from
+your IDE or via maven.
 1. Run the `ILoadTest*` load tests from your IDE or via maven through
    `mvn verify -Dtest=skip -Dit.test=ILoadTest\*` ; look for regressions in performance
    as much as failures.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
index 7ed8083eaf..fb295f3f09 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
@@ -27,11 +27,14 @@ import org.junit.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESSPOINT_REQUIRED;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
 import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -157,6 +160,35 @@
         () -> FileSystem.get(uri, configuration));
   }
 
+  @Test
+  public void testAccessPointProbingV2() throws Exception {
+    describe("Test V2 bucket probing using an AccessPoint ARN");
+    Configuration configuration = createConfigurationWithProbe(2);
+    String accessPointArn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/" + randomBucket;
+    configuration.set(String.format(InternalConstants.ARN_BUCKET_OPTION, randomBucket),
+        accessPointArn);
+
+    expectUnknownStore(
+        () -> FileSystem.get(uri, configuration));
+  }
+
+  @Test
+  public void testAccessPointRequired() throws Exception {
+    describe("Test V2 bucket probing with 'fs.s3a.accesspoint.required' property.");
+    Configuration configuration = createConfigurationWithProbe(2);
+    configuration.set(AWS_S3_ACCESSPOINT_REQUIRED, "true");
+    intercept(PathIOException.class,
+        InternalConstants.AP_REQUIRED_EXCEPTION,
+        "Should throw IOException if Access Points are required but not configured.",
+        () -> FileSystem.get(uri, configuration));
+
+    String accessPointArn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/" + randomBucket;
+    configuration.set(String.format(InternalConstants.ARN_BUCKET_OPTION, randomBucket),
+        accessPointArn);
+    expectUnknownStore(
+        () -> FileSystem.get(uri, configuration));
+  }
+
   @Override
   protected Configuration getConfiguration() {
     Configuration configuration = super.getConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index 696f2c4ced..c2b3aab078 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -65,6 +65,8 @@ public class ITestS3AConfiguration {
   private static final String EXAMPLE_ID = "AKASOMEACCESSKEY";
   private static final String EXAMPLE_KEY =
       "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE";
+  private static final String AP_ILLEGAL_ACCESS =
+      "ARN of type accesspoint cannot be passed as a bucket";
 
   private Configuration conf;
   private S3AFileSystem fs;
@@ -360,6 +362,14 @@ public class ITestS3AConfiguration {
       // isn't in the same region as the s3 client default. See
       // http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
       assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode());
+    } catch (final IllegalArgumentException e) {
+      // Path style addressing does not work with AP ARNs
+      if (!fs.getBucket().contains("arn:")) {
+        LOG.error("Caught unexpected exception: ", e);
+        throw e;
+      }
+
+      GenericTestUtils.assertExceptionContains(AP_ILLEGAL_ACCESS, e);
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index b39e0f02fe..55ddba9bbd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -1342,5 +1342,4 @@ public final class S3ATestUtils {
           + " in " + secrets);
     }
   }
-
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestArnResource.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestArnResource.java
new file mode 100644
index 0000000000..5ac47752ec
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestArnResource.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.regions.Regions;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Verifies the mapping of ARN declaration of resource to the associated
+ * access point.
+ * The region mapping assertions have been brittle to changes across AWS SDK
+ * versions, so the tests only verify partial matches rather than the FQDN of
+ * the endpoints.
+ */
+public class TestArnResource extends HadoopTestBase {
+  private final static Logger LOG = LoggerFactory.getLogger(TestArnResource.class);
+
+  @Test
+  public void parseAccessPointFromArn() throws IllegalArgumentException {
+    describe("Parse AccessPoint ArnResource from arn string");
+
+    String accessPoint = "testAp";
+    String accountId = "123456789101";
+    String[][] regionPartitionEndpoints = new String[][] {
+        {Regions.EU_WEST_1.getName(), "aws", "eu-west-1.amazonaws.com"},
+        {Regions.US_GOV_EAST_1.getName(), "aws-us-gov",
+            "us-gov-east-1.amazonaws.com"},
+        {Regions.CN_NORTH_1.getName(), "aws-cn", "cn-north-1.amazonaws.com"},
+    };
+
+    for (String[] testPair : regionPartitionEndpoints) {
+      String region = testPair[0];
+      String partition = testPair[1];
+      String endpoint = testPair[2];
+
+      // arn:partition:service:region:account-id:resource-type/resource-id
+      String arn = String.format("arn:%s:s3:%s:%s:accesspoint/%s", partition, region, accountId,
+          accessPoint);
+
+      ArnResource resource = ArnResource.accessPointFromArn(arn);
+      assertEquals("Arn does not match", arn, resource.getFullArn());
+      assertEquals("Access Point name does not match", accessPoint, resource.getName());
+      assertEquals("Account Id does not match", accountId, resource.getOwnerAccountId());
+      assertEquals("Region does not match", region, resource.getRegion());
+      Assertions.assertThat(resource.getEndpoint())
+          .describedAs("Endpoint does not match")
+          .contains(endpoint);
+    }
+  }
+
+  @Test
+  public void invalidARNsMustThrow() throws Exception {
+    describe("Using an invalid ARN format must throw when initializing an ArnResource.");
+
+    intercept(IllegalArgumentException.class, () ->
+        ArnResource.accessPointFromArn("invalid:arn:resource"));
+  }
+
+  private void describe(String message) {
+    LOG.info(message);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java
index 287fe51b5e..9e6d82ce6a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java
@@ -121,8 +121,8 @@ public class ITestAuditManager extends AbstractS3ACostTest {
     final S3AFileSystem fs = getFileSystem();
     final long exec0 = lookupCounterStatistic(iostats(),
         AUDIT_REQUEST_EXECUTION.getSymbol());
-    // API call
-    fs.getBucketLocation();
+    // API call to a known path; `getBucketLocation()` does not always result in an API call.
+    fs.listStatus(path("/"));
     // which MUST have ended up calling the extension request handler
     Assertions.assertThat(SimpleAWSRequestHandler.getInvocationCount())
         .describedAs("Invocation count of plugged in request handler")
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
index 22ce1e91c2..cbba326d5e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.auth;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.SignableRequest;
 import com.amazonaws.auth.AWS4Signer;
+import com.amazonaws.arn.Arn;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.Signer;
 import com.amazonaws.services.s3.internal.AWSS3V4Signer;
@@ -143,7 +145,7 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
     conf.set(TEST_ID_KEY, identifier);
     conf.set(TEST_REGION_KEY, regionName);
-    conf.set(Constants.ENDPOINT, endpoint);
+
     // make absolutely sure there is no caching.
     disableFilesystemCaching(conf);
 
     return conf;
@@ -190,7 +192,7 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
       LOG.info("Signing request #{}", c);
 
       String host = request.getEndpoint().getHost();
-      String bucketName = host.split("\\.")[0];
+      String bucketName = parseBucketFromHost(host);
       try {
         lastStoreValue = CustomSignerInitializer
             .getStoreValue(bucketName, UserGroupInformation.getCurrentUser());
@@ -214,6 +216,35 @@
       }
     }
 
+    private String parseBucketFromHost(String host) {
+      String[] hostBits = host.split("\\.");
+      String bucketName = hostBits[0];
+      String service = hostBits[1];
+
+      if (bucketName.equals("kms")) {
+        return bucketName;
+      }
+
+      if (service.contains("s3-accesspoint") || service.contains("s3-outposts")
+          || service.contains("s3-object-lambda")) {
+        // If AccessPoint then bucketName is of format `accessPoint-accountId`;
+        String[] accessPointBits = hostBits[0].split("-");
+        int lastElem = accessPointBits.length - 1;
+        String accountId = accessPointBits[lastElem];
+        String accessPointName = String.join("", Arrays.copyOf(accessPointBits, lastElem));
+        Arn arn = Arn.builder()
+            .withAccountId(accountId)
+            .withPartition("aws")
+            .withRegion(hostBits[2])
+            .withResource("accesspoint" + "/" + accessPointName)
+            .withService("s3").build();
+
+        bucketName = arn.toString();
+      }
+
+      return bucketName;
+    }
+
     public static int getInstantiationCount() {
       return INSTANTIATION_COUNT.get();
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
index a0cc1190a2..3511020aa6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
@@ -95,6 +96,9 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     String bucketName = getTestBucketName(conf);
+    String arnKey = String.format(InternalConstants.ARN_BUCKET_OPTION, bucketName);
+    String arn = conf.getTrimmed(arnKey, "");
+
     removeBaseAndBucketOverrides(bucketName, conf,
         DIRECTORY_MARKER_POLICY,
         AUTHORITATIVE_PATH);
@@ -104,6 +108,12 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
         ? DIRECTORY_MARKER_POLICY_KEEP
         : DIRECTORY_MARKER_POLICY_DELETE);
     disableFilesystemCaching(conf);
+
+    // AccessPoint ARN is the only per bucket configuration that must be kept.
+    if (!arn.isEmpty()) {
+      conf.set(arnKey, arn);
+    }
+
     return conf;
   }
 
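As a quick illustration of the new `ArnResource` helper introduced by this patch, here is a minimal usage sketch (not part of the change itself; the class name, the ARN value, and the endpoint shown in the comments are invented placeholders):

```java
import org.apache.hadoop.fs.s3a.ArnResource;

public class ArnResourceExample {
  public static void main(String[] args) {
    // A made-up Access Point ARN of the form arn:partition:s3:region:account-id:accesspoint/name
    String arn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/finance-team-access";

    // Parse the ARN; throws IllegalArgumentException if region, account id or resource are missing
    ArnResource resource = ArnResource.accessPointFromArn(arn);

    System.out.println(resource.getName());           // finance-team-access
    System.out.println(resource.getOwnerAccountId()); // 123456789012
    System.out.println(resource.getRegion());         // eu-west-1
    System.out.println(resource.getFullArn());        // the original ARN string

    // Endpoint comes from the AWS SDK region metadata; typically something like
    // s3-accesspoint.eu-west-1.amazonaws.com for the aws partition.
    System.out.println(resource.getEndpoint());
  }
}
```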