diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index d14a82e5c3..c4b8f6e3c4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -160,14 +160,33 @@ private Constants() {
       DEFAULT_SSL_CHANNEL_MODE =
           DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE;
 
-  //use a custom endpoint?
+  /**
+   * Endpoint. For v4 signing and/or better performance,
+   * this should be the specific endpoint of the region
+   * in which the bucket is hosted.
+   */
   public static final String ENDPOINT = "fs.s3a.endpoint";
 
   /**
-   * Default value of s3 endpoint. If not set explicitly using
-   * {@code AmazonS3#setEndpoint()}, this is used.
+   * Default value of s3 endpoint: {@value}.
+   * It tells the AWS client to work it out by asking the central
+   * endpoint where the bucket lives; caching that
+   * value in the client for the life of the process.
+   * <p>
+   * Note: previously this constant was defined as
+   * {@link #CENTRAL_ENDPOINT}, however the actual
+   * S3A client code used "" as the default when
+   * {@link #ENDPOINT} was unset.
+   * As core-default.xml also set the endpoint to "",
+   * the empty string has long been the real
+   * default value.
    */
-  public static final String DEFAULT_ENDPOINT = "s3.amazonaws.com";
+  public static final String DEFAULT_ENDPOINT = "";
+
+  /**
+   * The central endpoint :{@value}.
+   */
+  public static final String CENTRAL_ENDPOINT = "s3.amazonaws.com";
 
   //Enable path style access? Overrides default virtual hosting
   public static final String PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 96d16e8b1b..ae50bd1459 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -22,9 +22,8 @@
 import java.net.URI;
 
 import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.metrics.RequestMetricCollector;
+import com.amazonaws.handlers.RequestHandler2;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
@@ -41,18 +40,15 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
 import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
 
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
-import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
-import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
 
 /**
  * The default {@link S3ClientFactory} implementation.
  * This calls the AWS SDK to configure and create an
- * {@link AmazonS3Client} that communicates with the S3 service.
+ * {@code AmazonS3Client} that communicates with the S3 service.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -60,8 +56,6 @@ public class DefaultS3ClientFactory extends Configured
     implements S3ClientFactory {
 
   private static final String S3_SERVICE_NAME = "s3";
-  private static final String S3_SIGNER = "S3SignerType";
-  private static final String S3_V4_SIGNER = "AWSS3V4SignerType";
 
   /**
    * Subclasses refer to this.
@@ -70,22 +64,21 @@ public class DefaultS3ClientFactory extends Configured
       LoggerFactory.getLogger(DefaultS3ClientFactory.class);
 
   /**
-   * Create the client.
-   * <p>
- * If the AWS stats are not null then a {@link AwsStatisticsCollector}. - * is created to bind to the two. - * Important: until this binding works properly across regions, - * this should be null. + * Create the client by preparing the AwsConf configuration + * and then invoking {@code buildAmazonS3Client()}. */ @Override - public AmazonS3 createS3Client(URI name, - final String bucket, - final AWSCredentialsProvider credentials, - final String userAgentSuffix, - final StatisticsFromAwsSdk statisticsFromAwsSdk) throws IOException { + public AmazonS3 createS3Client( + final URI uri, + final S3ClientCreationParameters parameters) throws IOException { Configuration conf = getConf(); final ClientConfiguration awsConf = S3AUtils - .createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_S3); + .createAwsConf(conf, + uri.getHost(), + Constants.AWS_SERVICE_IDENTIFIER_S3); + // add any headers + parameters.getHeaders().forEach((h, v) -> + awsConf.addHeader(h, v)); // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false // throttling is explicitly disabled on the S3 client so that @@ -96,111 +89,62 @@ public AmazonS3 createS3Client(URI name, conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT)); - if (!StringUtils.isEmpty(userAgentSuffix)) { - awsConf.setUserAgentSuffix(userAgentSuffix); + if (!StringUtils.isEmpty(parameters.getUserAgentSuffix())) { + awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix()); } - // optional metrics - RequestMetricCollector metrics = statisticsFromAwsSdk != null - ? new AwsStatisticsCollector(statisticsFromAwsSdk) - : null; - return newAmazonS3Client( - credentials, + return buildAmazonS3Client( awsConf, - metrics, - conf.getTrimmed(ENDPOINT, ""), - conf.getBoolean(PATH_STYLE_ACCESS, false)); + parameters); } /** - * Create an {@link AmazonS3} client. - * Override this to provide an extended version of the client - * @param credentials credentials to use - * @param awsConf AWS configuration - * @param metrics metrics collector or null - * @param endpoint endpoint string; may be "" - * @param pathStyleAccess enable path style access? - * @return new AmazonS3 client - */ - protected AmazonS3 newAmazonS3Client( - final AWSCredentialsProvider credentials, - final ClientConfiguration awsConf, - final RequestMetricCollector metrics, - final String endpoint, - final boolean pathStyleAccess) { - if (metrics != null) { - LOG.debug("Building S3 client using the SDK builder API"); - return buildAmazonS3Client(credentials, awsConf, metrics, endpoint, - pathStyleAccess); - } else { - LOG.debug("Building S3 client using the SDK builder API"); - return classicAmazonS3Client(credentials, awsConf, endpoint, - pathStyleAccess); - } - } - - /** - * Use the (newer) Builder SDK to create a an AWS S3 client. + * Use the Builder API to create an AWS S3 client. *

- * This has a more complex endpoint configuration in a - * way which does not yet work in this code in a way - * which doesn't trigger regressions. So it is only used - * when SDK metrics are supplied. - * @param credentials credentials to use + * This has a more complex endpoint configuration mechanism + * which initially caused problems; the + * {@code withForceGlobalBucketAccessEnabled(true)} + * command is critical here. * @param awsConf AWS configuration - * @param metrics metrics collector or null - * @param endpoint endpoint string; may be "" - * @param pathStyleAccess enable path style access? + * @param parameters parameters * @return new AmazonS3 client */ - private AmazonS3 buildAmazonS3Client( - final AWSCredentialsProvider credentials, + protected AmazonS3 buildAmazonS3Client( final ClientConfiguration awsConf, - final RequestMetricCollector metrics, - final String endpoint, - final boolean pathStyleAccess) { + final S3ClientCreationParameters parameters) { AmazonS3ClientBuilder b = AmazonS3Client.builder(); - b.withCredentials(credentials); + b.withCredentials(parameters.getCredentialSet()); b.withClientConfiguration(awsConf); - b.withPathStyleAccessEnabled(pathStyleAccess); - if (metrics != null) { - b.withMetricsCollector(metrics); + b.withPathStyleAccessEnabled(parameters.isPathStyleAccess()); + + if (parameters.getMetrics() != null) { + b.withMetricsCollector( + new AwsStatisticsCollector(parameters.getMetrics())); + } + if (parameters.getRequestHandlers() != null) { + b.withRequestHandlers( + parameters.getRequestHandlers().toArray(new RequestHandler2[0])); + } + if (parameters.getMonitoringListener() != null) { + b.withMonitoringListener(parameters.getMonitoringListener()); } // endpoint set up is a PITA - // client.setEndpoint("") is no longer available AwsClientBuilder.EndpointConfiguration epr - = createEndpointConfiguration(endpoint, awsConf); + = createEndpointConfiguration(parameters.getEndpoint(), + awsConf); if (epr != null) { // an endpoint binding was constructed: use it. b.withEndpointConfiguration(epr); + } else { + // no idea what the endpoint is, so tell the SDK + // to work it out at the cost of an extra HEAD request + b.withForceGlobalBucketAccessEnabled(true); } final AmazonS3 client = b.build(); return client; } - /** - * Wrapper around constructor for {@link AmazonS3} client. - * Override this to provide an extended version of the client. - *

- * This uses a deprecated constructor -it is currently - * the only one which works for us. - * @param credentials credentials to use - * @param awsConf AWS configuration - * @param endpoint endpoint string; may be "" - * @param pathStyleAccess enable path style access? - * @return new AmazonS3 client - */ - @SuppressWarnings("deprecation") - private AmazonS3 classicAmazonS3Client( - AWSCredentialsProvider credentials, - ClientConfiguration awsConf, - final String endpoint, - final boolean pathStyleAccess) { - final AmazonS3 client = new AmazonS3Client(credentials, awsConf); - return configureAmazonS3Client(client, endpoint, pathStyleAccess); - } - /** * Configure classic S3 client. *

@@ -226,31 +170,6 @@ protected static AmazonS3 configureAmazonS3Client(AmazonS3 s3, throw new IllegalArgumentException(msg, e); } } - return applyS3ClientOptions(s3, pathStyleAccess); - } - - /** - * Perform any tuning of the {@code S3ClientOptions} settings based on - * the Hadoop configuration. - * This is different from the general AWS configuration creation as - * it is unique to S3 connections. - *

- * The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access - * to S3 buckets if configured. By default, the - * behavior is to use virtual hosted-style access with URIs of the form - * {@code http://bucketname.s3.amazonaws.com} - *

- * Enabling path-style access and a - * region-specific endpoint switches the behavior to use URIs of the form - * {@code http://s3-eu-west-1.amazonaws.com/bucketname}. - * It is common to use this when connecting to private S3 servers, as it - * avoids the need to play with DNS entries. - * @param s3 S3 client - * @param pathStyleAccess enable path style access? - * @return the S3 client - */ - protected static AmazonS3 applyS3ClientOptions(AmazonS3 s3, - final boolean pathStyleAccess) { if (pathStyleAccess) { LOG.debug("Enabling path style access!"); s3.setS3ClientOptions(S3ClientOptions.builder() diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java index ddc492235d..c11581f1d5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java @@ -19,8 +19,6 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.metrics.RequestMetricCollector; import com.amazonaws.services.s3.AmazonS3; import org.apache.hadoop.classification.InterfaceAudience; @@ -31,31 +29,25 @@ * This client is for testing only; it is in the production * {@code hadoop-aws} module to enable integration tests to use this * just by editing the Hadoop configuration used to bring up the client. + * + * The factory uses the older constructor-based instantiation/configuration + * of the client, so does not wire up metrics, handlers etc. */ @InterfaceAudience.Private @InterfaceStability.Unstable public class InconsistentS3ClientFactory extends DefaultS3ClientFactory { - /** - * Create the inconsistent client. - * Logs a warning that this is being done. - * @param credentials credentials to use - * @param awsConf AWS configuration - * @param metrics metric collector - * @param endpoint AWS endpoint - * @param pathStyleAccess should path style access be supported? - * @return an inconsistent client. - */ @Override - protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials, - ClientConfiguration awsConf, - final RequestMetricCollector metrics, - final String endpoint, - final boolean pathStyleAccess) { + protected AmazonS3 buildAmazonS3Client( + final ClientConfiguration awsConf, + final S3ClientCreationParameters parameters) { LOG.warn("** FAILURE INJECTION ENABLED. Do not run in production! 
**"); InconsistentAmazonS3Client s3 - = new InconsistentAmazonS3Client(credentials, awsConf, getConf()); - configureAmazonS3Client(s3, endpoint, pathStyleAccess); + = new InconsistentAmazonS3Client( + parameters.getCredentialSet(), awsConf, getConf()); + configureAmazonS3Client(s3, + parameters.getEndpoint(), + parameters.isPathStyleAccess()); return s3; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f625346957..8db5d51def 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -69,7 +69,6 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; - import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; import com.amazonaws.services.s3.model.SSECustomerKey; import com.amazonaws.services.s3.model.UploadPartRequest; @@ -83,7 +82,6 @@ import com.amazonaws.event.ProgressListener; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,7 +164,6 @@ import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; -import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk; import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.retry.RetryPolicies; @@ -198,7 +195,6 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AWS_SDK_METRICS_ENABLED; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup; @@ -376,6 +372,11 @@ public void initialize(URI name, Configuration originalConf) LOG.debug("Initializing S3AFileSystem for {}", bucket); // clone the configuration into one with propagated bucket options Configuration conf = propagateBucketOptions(originalConf, bucket); + // fix up the classloader of the configuration to be whatever + // classloader loaded this filesystem. + // See: HADOOP-17372 + conf.setClassLoader(this.getClass().getClassLoader()); + // patch the Hadoop security providers patchSecurityCredentialProviders(conf); // look for delegation token support early. @@ -740,16 +741,17 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException { S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, S3ClientFactory.class); - StatisticsFromAwsSdk awsStats = null; - // TODO: HADOOP-16830 when the S3 client building code works - // with different regions, - // then non-null stats can be passed in here. 
- if (AWS_SDK_METRICS_ENABLED) { - awsStats = statisticsContext.newStatisticsFromAwsSdk(); - } + S3ClientFactory.S3ClientCreationParameters parameters = null; + parameters = new S3ClientFactory.S3ClientCreationParameters() + .withCredentialSet(credentials) + .withEndpoint(conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)) + .withMetrics(statisticsContext.newStatisticsFromAwsSdk()) + .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false)) + .withUserAgentSuffix(uaSuffix); s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) - .createS3Client(getUri(), bucket, credentials, uaSuffix, awsStats); + .createS3Client(getUri(), + parameters); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index e04d3b5cbd..dbb39fb662 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -18,38 +18,246 @@ package org.apache.hadoop.fs.s3a; +import javax.annotation.Nullable; import java.io.IOException; import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.monitoring.MonitoringListener; import com.amazonaws.services.s3.AmazonS3; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT; + /** * Factory for creation of {@link AmazonS3} client instances. + * Important: HBase's HBoss module implements this interface in its + * tests. + * Take care when updating this interface to ensure that a client + * implementing only the deprecated method will work. + * See https://github.com/apache/hbase-filesystem + * */ -@InterfaceAudience.Private -@InterfaceStability.Unstable +@InterfaceAudience.LimitedPrivate("HBoss") +@InterfaceStability.Evolving public interface S3ClientFactory { /** * Creates a new {@link AmazonS3} client. * - * @param name raw input S3A file system URI - * @param bucket Optional bucket to use to look up per-bucket proxy secrets - * @param credentialSet credentials to use - * @param userAgentSuffix optional suffix for the UA field. - * @param statisticsFromAwsSdk binding for AWS stats - may be null + * @param uri S3A file system URI + * @param parameters parameter object * @return S3 client * @throws IOException IO problem */ - AmazonS3 createS3Client(URI name, - String bucket, - AWSCredentialsProvider credentialSet, - String userAgentSuffix, - StatisticsFromAwsSdk statisticsFromAwsSdk) throws IOException; + AmazonS3 createS3Client(URI uri, + S3ClientCreationParameters parameters) throws IOException; + /** + * Settings for the S3 Client. + * Implemented as a class to pass in so that adding + * new parameters does not break the binding of + * external implementations of the factory. + */ + final class S3ClientCreationParameters { + + /** + * Credentials. + */ + private AWSCredentialsProvider credentialSet; + + /** + * Endpoint. + */ + private String endpoint = DEFAULT_ENDPOINT; + + /** + * Custom Headers. + */ + private final Map headers = new HashMap<>(); + + /** + * Monitoring listener. 
+ */ + private MonitoringListener monitoringListener; + + /** + * RequestMetricCollector metrics...if not-null will be wrapped + * with an {@code AwsStatisticsCollector} and passed to + * the client. + */ + private StatisticsFromAwsSdk metrics; + + /** + * Use (deprecated) path style access. + */ + private boolean pathStyleAccess; + + /** + * This is in the settings awaiting wiring up and testing. + */ + private boolean requesterPays; + + /** + * Request handlers; used for auditing, X-Ray etc. + */ + private List requestHandlers; + + /** + * Suffix to UA. + */ + private String userAgentSuffix = ""; + + public List getRequestHandlers() { + return requestHandlers; + } + + /** + * List of request handlers. + * @param handlers handler list. + * @return this object + */ + public S3ClientCreationParameters withRequestHandlers( + @Nullable final List handlers) { + requestHandlers = handlers; + return this; + } + + public MonitoringListener getMonitoringListener() { + return monitoringListener; + } + + /** + * listener for AWS monitoring events. + * @param listener listener + * @return this object + */ + public S3ClientCreationParameters withMonitoringListener( + @Nullable final MonitoringListener listener) { + monitoringListener = listener; + return this; + } + + public StatisticsFromAwsSdk getMetrics() { + return metrics; + } + + /** + * Metrics binding. This is the S3A-level + * statistics interface, which will be wired + * up to the AWS callbacks. + * @param statistics statistics implementation + * @return this object + */ + public S3ClientCreationParameters withMetrics( + @Nullable final StatisticsFromAwsSdk statistics) { + metrics = statistics; + return this; + } + + /** + * Requester pays option. Not yet wired up. + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withRequesterPays( + final boolean value) { + requesterPays = value; + return this; + } + + public boolean isRequesterPays() { + return requesterPays; + } + + public AWSCredentialsProvider getCredentialSet() { + return credentialSet; + } + + /** + * Set credentials. + * @param value new value + * @return the builder + */ + + public S3ClientCreationParameters withCredentialSet( + final AWSCredentialsProvider value) { + credentialSet = value; + return this; + } + + public String getUserAgentSuffix() { + return userAgentSuffix; + } + + /** + * Set UA suffix. + * @param value new value + * @return the builder + */ + + public S3ClientCreationParameters withUserAgentSuffix( + final String value) { + userAgentSuffix = value; + return this; + } + + public String getEndpoint() { + return endpoint; + } + + /** + * Set endpoint. + * @param value new value + * @return the builder + */ + + public S3ClientCreationParameters withEndpoint( + final String value) { + endpoint = value; + return this; + } + + public boolean isPathStyleAccess() { + return pathStyleAccess; + } + + /** + * Set path access option. + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withPathStyleAccess( + final boolean value) { + pathStyleAccess = value; + return this; + } + + /** + * Add a custom header. + * @param header header name + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withHeader( + String header, String value) { + headers.put(header, value); + return this; + } + + /** + * Get the map of headers. 
+ * @return (mutable) header map + */ + public Map getHeaders() { + return headers; + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 358ec261cc..a5ce1f68ad 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -111,10 +111,4 @@ private InternalConstants() { */ public static final int DEFAULT_UPLOAD_PART_COUNT_LIMIT = 10000; - /** - * Flag to enable AWS Statistics binding. As this is triggering - * problems related to region/endpoint setup, it is currently - * disabled. - */ - public static final boolean AWS_SDK_METRICS_ENABLED = true; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java index 868ec2c36c..bd121ba272 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java @@ -23,13 +23,10 @@ import java.net.URI; import java.util.ArrayList; -import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.Region; -import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk; - /** * An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3} * interface suitable for unit testing. @@ -37,12 +34,10 @@ public class MockS3ClientFactory implements S3ClientFactory { @Override - public AmazonS3 createS3Client(URI name, - final String bucket, - final AWSCredentialsProvider credentialSet, - final String userAgentSuffix, - final StatisticsFromAwsSdk statisticsFromAwsSdks) { + public AmazonS3 createS3Client(URI uri, + final S3ClientCreationParameters parameters) { AmazonS3 s3 = mock(AmazonS3.class); + String bucket = uri.getHost(); when(s3.doesBucketExist(bucket)).thenReturn(true); when(s3.doesBucketExistV2(bucket)).thenReturn(true); // this listing is used in startup if purging is enabled, so diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java index d9cb1d97bf..72af1752b1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java @@ -23,12 +23,11 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.SignableRequest; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.Signer; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.internal.AWSS3V4Signer; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -40,14 +39,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; -import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider; +import org.apache.hadoop.fs.s3a.Constants; +import 
org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.auth.ITestCustomSigner.CustomSignerInitializer.StoreValue; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; import org.apache.hadoop.security.UserGroupInformation; import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS; import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3; -import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; /** * Tests for custom Signers and SignerInitializers. @@ -62,23 +62,32 @@ public class ITestCustomSigner extends AbstractS3ATestBase { private String regionName; + private String endpoint; + @Override public void setup() throws Exception { super.setup(); - regionName = determineRegion(getFileSystem().getBucket()); + final S3AFileSystem fs = getFileSystem(); + regionName = determineRegion(fs.getBucket()); LOG.info("Determined region name to be [{}] for bucket [{}]", regionName, - getFileSystem().getBucket()); + fs.getBucket()); + endpoint = fs.getConf() + .get(Constants.ENDPOINT, Constants.CENTRAL_ENDPOINT); + LOG.info("Test endpoint is {}", endpoint); } @Test public void testCustomSignerAndInitializer() throws IOException, InterruptedException { + final Path basePath = path(getMethodName()); UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1"); - FileSystem fs1 = runMkDirAndVerify(ugi1, "/customsignerpath1", "id1"); + FileSystem fs1 = runMkDirAndVerify(ugi1, + new Path(basePath, "customsignerpath1"), "id1"); UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2"); - FileSystem fs2 = runMkDirAndVerify(ugi2, "/customsignerpath2", "id2"); + FileSystem fs2 = runMkDirAndVerify(ugi2, + new Path(basePath, "customsignerpath2"), "id2"); Assertions.assertThat(CustomSignerInitializer.knownStores.size()) .as("Num registered stores mismatch").isEqualTo(2); @@ -91,20 +100,19 @@ public void testCustomSignerAndInitializer() } private FileSystem runMkDirAndVerify(UserGroupInformation ugi, - String pathString, String identifier) + Path finalPath, String identifier) throws IOException, InterruptedException { Configuration conf = createTestConfig(identifier); - Path path = new Path(pathString); - path = path.makeQualified(getFileSystem().getUri(), - getFileSystem().getWorkingDirectory()); - - Path finalPath = path; return ugi.doAs((PrivilegedExceptionAction) () -> { - int invocationCount = CustomSigner.invocationCount; + int instantiationCount = CustomSigner.getInstantiationCount(); + int invocationCount = CustomSigner.getInvocationCount(); FileSystem fs = finalPath.getFileSystem(conf); fs.mkdirs(finalPath); - Assertions.assertThat(CustomSigner.invocationCount) - .as("Invocation count lower than expected") + Assertions.assertThat(CustomSigner.getInstantiationCount()) + .as("CustomSigner Instantiation count lower than expected") + .isGreaterThan(instantiationCount); + Assertions.assertThat(CustomSigner.getInvocationCount()) + .as("CustomSigner Invocation count lower than expected") .isGreaterThan(invocationCount); Assertions.assertThat(CustomSigner.lastStoreValue) @@ -118,6 +126,12 @@ private FileSystem runMkDirAndVerify(UserGroupInformation ugi, }); } + /** + * Create a test conf with the custom signer; fixes up + * endpoint to be that of the test FS. + * @param identifier test key. + * @return a configuration for a filesystem. 
+ */ private Configuration createTestConfig(String identifier) { Configuration conf = createConfiguration(); @@ -128,24 +142,38 @@ private Configuration createTestConfig(String identifier) { conf.set(TEST_ID_KEY, identifier); conf.set(TEST_REGION_KEY, regionName); + conf.set(Constants.ENDPOINT, endpoint); + // make absolutely sure there is no caching. + disableFilesystemCaching(conf); return conf; } private String determineRegion(String bucketName) throws IOException { - String region = getFileSystem().getBucketLocation(bucketName); - return fixBucketRegion(region); + return getFileSystem().getBucketLocation(bucketName); } @Private public static final class CustomSigner implements Signer { - private static int invocationCount = 0; + + private static final AtomicInteger INSTANTIATION_COUNT = + new AtomicInteger(0); + private static final AtomicInteger INVOCATION_COUNT = + new AtomicInteger(0); + private static StoreValue lastStoreValue; + public CustomSigner() { + int c = INSTANTIATION_COUNT.incrementAndGet(); + LOG.info("Creating Signer #{}", c); + } + @Override public void sign(SignableRequest request, AWSCredentials credentials) { - invocationCount++; + int c = INVOCATION_COUNT.incrementAndGet(); + LOG.info("Signing request #{}", c); + String host = request.getEndpoint().getHost(); String bucketName = host.split("\\.")[0]; try { @@ -159,6 +187,14 @@ public void sign(SignableRequest request, AWSCredentials credentials) { realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY)); realSigner.sign(request, credentials); } + + public static int getInstantiationCount() { + return INSTANTIATION_COUNT.get(); + } + + public static int getInvocationCount() { + return INVOCATION_COUNT.get(); + } } @Private diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java index f5562bdf32..26655de9d4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/delegation/ITestSessionDelegationInFileystem.java @@ -43,8 +43,8 @@ import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.S3ClientFactory; import org.apache.hadoop.fs.s3a.Statistic; -import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher; import org.apache.hadoop.io.Text; @@ -72,7 +72,6 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.ALICE; import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.assertSecurityEnabled; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.lookupS3ADelegationToken; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AWS_SDK_METRICS_ENABLED; import static org.apache.hadoop.test.LambdaTestUtils.doAs; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.hamcrest.Matchers.containsString; @@ -557,23 +556,22 @@ public void testDelegationBindingMismatch2() throws Throwable { */ protected ObjectMetadata readLandsatMetadata(final S3AFileSystem delegatedFS) throws Exception { - 
AWSCredentialProviderList testing + AWSCredentialProviderList testingCreds = delegatedFS.shareCredentials("testing"); URI landsat = new URI(DEFAULT_CSVTEST_FILE); DefaultS3ClientFactory factory = new DefaultS3ClientFactory(); - Configuration conf = new Configuration(delegatedFS.getConf()); - conf.set(ENDPOINT, ""); - factory.setConf(conf); + factory.setConf(new Configuration(delegatedFS.getConf())); String host = landsat.getHost(); - StatisticsFromAwsSdk awsStats = null; - if (AWS_SDK_METRICS_ENABLED) { - awsStats = new EmptyS3AStatisticsContext() - .newStatisticsFromAwsSdk(); - } - AmazonS3 s3 = factory.createS3Client(landsat, host, testing, - "ITestSessionDelegationInFileystem", awsStats); + S3ClientFactory.S3ClientCreationParameters parameters = null; + parameters = new S3ClientFactory.S3ClientCreationParameters() + .withCredentialSet(testingCreds) + .withEndpoint(DEFAULT_ENDPOINT) + .withMetrics(new EmptyS3AStatisticsContext() + .newStatisticsFromAwsSdk()) + .withUserAgentSuffix("ITestSessionDelegationInFileystem"); + AmazonS3 s3 = factory.createS3Client(landsat, parameters); return Invoker.once("HEAD", host, () -> s3.getObjectMetadata(host, landsat.getPath().substring(1))); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java index b025f6f096..4d7f81d019 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -175,6 +175,7 @@ public void testCreateAbortEmptyFile() throws Throwable { Path destFile = methodPath(filename); Path pendingFilePath = makeMagic(destFile); touch(fs, pendingFilePath); + waitForConsistency(); validateIntermediateAndFinalPaths(pendingFilePath, destFile); Path pendingDataPath = validatePendingCommitData(filename, pendingFilePath); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java new file mode 100644 index 0000000000..e7696996db --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.statistics; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.statistics.IOStatistics; + +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath; +import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_REQUEST; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; + +/** + * Verify that AWS SDK statistics are wired up. + * This test tries to read data from US-east-1 and us-west-2 buckets + * so as to be confident that the nuances of region mapping + * are handed correctly (HADOOP-13551). + * The statistics are probed to verify that the wiring up is complete. + */ +public class ITestAWSStatisticCollection extends AbstractS3ATestBase { + + private static final Path COMMON_CRAWL_PATH + = new Path("s3a://osm-pds/planet/planet-latest.orc"); + + @Test + public void testLandsatStatistics() throws Throwable { + final Configuration conf = getConfiguration(); + // skips the tests if the landsat path isn't the default. + Path path = getLandsatCSVPath(conf); + conf.set(ENDPOINT, DEFAULT_ENDPOINT); + conf.unset("fs.s3a.bucket.landsat-pds.endpoint"); + + try (S3AFileSystem fs = (S3AFileSystem) path.getFileSystem(conf)) { + fs.getObjectMetadata(path); + IOStatistics iostats = fs.getIOStatistics(); + assertThatStatisticCounter(iostats, + STORE_IO_REQUEST.getSymbol()) + .isGreaterThanOrEqualTo(1); + } + } + + @Test + public void testCommonCrawlStatistics() throws Throwable { + final Configuration conf = getConfiguration(); + // skips the tests if the landsat path isn't the default. + getLandsatCSVPath(conf); + + Path path = COMMON_CRAWL_PATH; + conf.set(ENDPOINT, DEFAULT_ENDPOINT); + + try (S3AFileSystem fs = (S3AFileSystem) path.getFileSystem(conf)) { + fs.getObjectMetadata(path); + IOStatistics iostats = fs.getIOStatistics(); + assertThatStatisticCounter(iostats, + STORE_IO_REQUEST.getSymbol()) + .isGreaterThanOrEqualTo(1); + } + } + +}
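
Usage sketch (illustrative, not part of the patch): with the parameter object in place, a caller of the factory, for example an external implementation such as the HBoss tests named in the S3ClientFactory javadoc, would build a client roughly as below. Only the S3ClientFactory, S3ClientCreationParameters and Constants calls come from this change; the class and method names of the sketch itself are hypothetical.

    import java.io.IOException;
    import java.net.URI;

    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.services.s3.AmazonS3;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
    import org.apache.hadoop.fs.s3a.S3ClientFactory;

    import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;

    /** Hypothetical caller of the new factory API; not shipped with this patch. */
    public class S3ClientFactoryUsageSketch {

      public static AmazonS3 createClient(
          final Configuration conf,
          final URI fsUri,
          final AWSCredentialsProvider credentials) throws IOException {
        // the factory extends Configured, so it reads its client settings
        // (proxy, timeouts, per-bucket options) from the Hadoop configuration
        final DefaultS3ClientFactory factory = new DefaultS3ClientFactory();
        factory.setConf(conf);
        // all per-client settings travel in the parameter object; adding new
        // withXyz() setters later does not break compiled external callers
        final S3ClientFactory.S3ClientCreationParameters parameters =
            new S3ClientFactory.S3ClientCreationParameters()
                .withCredentialSet(credentials)
                .withEndpoint(CENTRAL_ENDPOINT)
                .withPathStyleAccess(false)
                .withUserAgentSuffix("usage-sketch");
        return factory.createS3Client(fsUri, parameters);
      }
    }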