HADOOP-13551. AWS metrics wire-up (#2778)

Moves to the builder API for AWS S3 client creation, and
offers a similar style of API to the S3A FileSystem and tests, hiding
the details of which options are client, which are in AWS Conf,
and doing the wiring up of S3A statistics interfaces to the AWS
SDK internals. S3A Statistics, including IOStatistics, should now
count throttling events handled in the AWS SDK itself.

This patch restores endpoint determination by probes to US-East-1
if the client isn't configured with fs.s3a.endpoint.

Explicitly setting the endpoint will save the cost of these probe
HTTP requests.

Contributed by Steve Loughran.

Change-Id: Ifa6caa8ff56369ad30e4fd01a42bc74f7b8b3d6b
This commit is contained in:
Steve Loughran 2021-03-24 13:32:54 +00:00
parent 4c3324ca1a
commit a07e3c41ca
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
11 changed files with 466 additions and 220 deletions

View File

@ -160,14 +160,33 @@ private Constants() {
DEFAULT_SSL_CHANNEL_MODE = DEFAULT_SSL_CHANNEL_MODE =
DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE; DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE;
//use a custom endpoint? /**
* Endpoint. For v4 signing and/or better performance,
* this should be the specific endpoint of the region
* in which the bucket is hosted.
*/
public static final String ENDPOINT = "fs.s3a.endpoint"; public static final String ENDPOINT = "fs.s3a.endpoint";
/** /**
* Default value of s3 endpoint. If not set explicitly using * Default value of s3 endpoint: {@value}.
* {@code AmazonS3#setEndpoint()}, this is used. * It tells the AWS client to work it out by asking the central
* endpoint where the bucket lives; caching that
* value in the client for the life of the process.
* <p>
* Note: previously this constant was defined as
* {@link #CENTRAL_ENDPOINT}, however the actual
* S3A client code used "" as the default when
* {@link #ENDPOINT} was unset.
* As core-default.xml also set the endpoint to "",
* the empty string has long been the <i>real</i>
* default value.
*/ */
public static final String DEFAULT_ENDPOINT = "s3.amazonaws.com"; public static final String DEFAULT_ENDPOINT = "";
/**
* The central endpoint :{@value}.
*/
public static final String CENTRAL_ENDPOINT = "s3.amazonaws.com";
//Enable path style access? Overrides default virtual hosting //Enable path style access? Overrides default virtual hosting
public static final String PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; public static final String PATH_STYLE_ACCESS = "fs.s3a.path.style.access";

View File

@ -22,9 +22,8 @@
import java.net.URI; import java.net.URI;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.metrics.RequestMetricCollector; import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.AmazonS3ClientBuilder;
@ -41,18 +40,15 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector; import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT; import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
/** /**
* The default {@link S3ClientFactory} implementation. * The default {@link S3ClientFactory} implementation.
* This calls the AWS SDK to configure and create an * This calls the AWS SDK to configure and create an
* {@link AmazonS3Client} that communicates with the S3 service. * {@code AmazonS3Client} that communicates with the S3 service.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ -60,8 +56,6 @@ public class DefaultS3ClientFactory extends Configured
implements S3ClientFactory { implements S3ClientFactory {
private static final String S3_SERVICE_NAME = "s3"; private static final String S3_SERVICE_NAME = "s3";
private static final String S3_SIGNER = "S3SignerType";
private static final String S3_V4_SIGNER = "AWSS3V4SignerType";
/** /**
* Subclasses refer to this. * Subclasses refer to this.
@ -70,22 +64,21 @@ public class DefaultS3ClientFactory extends Configured
LoggerFactory.getLogger(DefaultS3ClientFactory.class); LoggerFactory.getLogger(DefaultS3ClientFactory.class);
/** /**
* Create the client. * Create the client by preparing the AwsConf configuration
* <p> * and then invoking {@code buildAmazonS3Client()}.
* If the AWS stats are not null then a {@link AwsStatisticsCollector}.
* is created to bind to the two.
* <i>Important: until this binding works properly across regions,
* this should be null.</i>
*/ */
@Override @Override
public AmazonS3 createS3Client(URI name, public AmazonS3 createS3Client(
final String bucket, final URI uri,
final AWSCredentialsProvider credentials, final S3ClientCreationParameters parameters) throws IOException {
final String userAgentSuffix,
final StatisticsFromAwsSdk statisticsFromAwsSdk) throws IOException {
Configuration conf = getConf(); Configuration conf = getConf();
final ClientConfiguration awsConf = S3AUtils final ClientConfiguration awsConf = S3AUtils
.createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_S3); .createAwsConf(conf,
uri.getHost(),
Constants.AWS_SERVICE_IDENTIFIER_S3);
// add any headers
parameters.getHeaders().forEach((h, v) ->
awsConf.addHeader(h, v));
// When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
// throttling is explicitly disabled on the S3 client so that // throttling is explicitly disabled on the S3 client so that
@ -96,111 +89,62 @@ public AmazonS3 createS3Client(URI name,
conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING,
EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT)); EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT));
if (!StringUtils.isEmpty(userAgentSuffix)) { if (!StringUtils.isEmpty(parameters.getUserAgentSuffix())) {
awsConf.setUserAgentSuffix(userAgentSuffix); awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
} }
// optional metrics
RequestMetricCollector metrics = statisticsFromAwsSdk != null
? new AwsStatisticsCollector(statisticsFromAwsSdk)
: null;
return newAmazonS3Client( return buildAmazonS3Client(
credentials,
awsConf, awsConf,
metrics, parameters);
conf.getTrimmed(ENDPOINT, ""),
conf.getBoolean(PATH_STYLE_ACCESS, false));
} }
/** /**
* Create an {@link AmazonS3} client. * Use the Builder API to create an AWS S3 client.
* Override this to provide an extended version of the client
* @param credentials credentials to use
* @param awsConf AWS configuration
* @param metrics metrics collector or null
* @param endpoint endpoint string; may be ""
* @param pathStyleAccess enable path style access?
* @return new AmazonS3 client
*/
protected AmazonS3 newAmazonS3Client(
final AWSCredentialsProvider credentials,
final ClientConfiguration awsConf,
final RequestMetricCollector metrics,
final String endpoint,
final boolean pathStyleAccess) {
if (metrics != null) {
LOG.debug("Building S3 client using the SDK builder API");
return buildAmazonS3Client(credentials, awsConf, metrics, endpoint,
pathStyleAccess);
} else {
LOG.debug("Building S3 client using the SDK builder API");
return classicAmazonS3Client(credentials, awsConf, endpoint,
pathStyleAccess);
}
}
/**
* Use the (newer) Builder SDK to create a an AWS S3 client.
* <p> * <p>
* This has a more complex endpoint configuration in a * This has a more complex endpoint configuration mechanism
* way which does not yet work in this code in a way * which initially caused problems; the
* which doesn't trigger regressions. So it is only used * {@code withForceGlobalBucketAccessEnabled(true)}
* when SDK metrics are supplied. * command is critical here.
* @param credentials credentials to use
* @param awsConf AWS configuration * @param awsConf AWS configuration
* @param metrics metrics collector or null * @param parameters parameters
* @param endpoint endpoint string; may be ""
* @param pathStyleAccess enable path style access?
* @return new AmazonS3 client * @return new AmazonS3 client
*/ */
private AmazonS3 buildAmazonS3Client( protected AmazonS3 buildAmazonS3Client(
final AWSCredentialsProvider credentials,
final ClientConfiguration awsConf, final ClientConfiguration awsConf,
final RequestMetricCollector metrics, final S3ClientCreationParameters parameters) {
final String endpoint,
final boolean pathStyleAccess) {
AmazonS3ClientBuilder b = AmazonS3Client.builder(); AmazonS3ClientBuilder b = AmazonS3Client.builder();
b.withCredentials(credentials); b.withCredentials(parameters.getCredentialSet());
b.withClientConfiguration(awsConf); b.withClientConfiguration(awsConf);
b.withPathStyleAccessEnabled(pathStyleAccess); b.withPathStyleAccessEnabled(parameters.isPathStyleAccess());
if (metrics != null) {
b.withMetricsCollector(metrics); if (parameters.getMetrics() != null) {
b.withMetricsCollector(
new AwsStatisticsCollector(parameters.getMetrics()));
}
if (parameters.getRequestHandlers() != null) {
b.withRequestHandlers(
parameters.getRequestHandlers().toArray(new RequestHandler2[0]));
}
if (parameters.getMonitoringListener() != null) {
b.withMonitoringListener(parameters.getMonitoringListener());
} }
// endpoint set up is a PITA // endpoint set up is a PITA
// client.setEndpoint("") is no longer available
AwsClientBuilder.EndpointConfiguration epr AwsClientBuilder.EndpointConfiguration epr
= createEndpointConfiguration(endpoint, awsConf); = createEndpointConfiguration(parameters.getEndpoint(),
awsConf);
if (epr != null) { if (epr != null) {
// an endpoint binding was constructed: use it. // an endpoint binding was constructed: use it.
b.withEndpointConfiguration(epr); b.withEndpointConfiguration(epr);
} else {
// no idea what the endpoint is, so tell the SDK
// to work it out at the cost of an extra HEAD request
b.withForceGlobalBucketAccessEnabled(true);
} }
final AmazonS3 client = b.build(); final AmazonS3 client = b.build();
return client; return client;
} }
/**
* Wrapper around constructor for {@link AmazonS3} client.
* Override this to provide an extended version of the client.
* <p>
* This uses a deprecated constructor -it is currently
* the only one which works for us.
* @param credentials credentials to use
* @param awsConf AWS configuration
* @param endpoint endpoint string; may be ""
* @param pathStyleAccess enable path style access?
* @return new AmazonS3 client
*/
@SuppressWarnings("deprecation")
private AmazonS3 classicAmazonS3Client(
AWSCredentialsProvider credentials,
ClientConfiguration awsConf,
final String endpoint,
final boolean pathStyleAccess) {
final AmazonS3 client = new AmazonS3Client(credentials, awsConf);
return configureAmazonS3Client(client, endpoint, pathStyleAccess);
}
/** /**
* Configure classic S3 client. * Configure classic S3 client.
* <p> * <p>
@ -226,31 +170,6 @@ protected static AmazonS3 configureAmazonS3Client(AmazonS3 s3,
throw new IllegalArgumentException(msg, e); throw new IllegalArgumentException(msg, e);
} }
} }
return applyS3ClientOptions(s3, pathStyleAccess);
}
/**
* Perform any tuning of the {@code S3ClientOptions} settings based on
* the Hadoop configuration.
* This is different from the general AWS configuration creation as
* it is unique to S3 connections.
* <p>
* The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access
* to S3 buckets if configured. By default, the
* behavior is to use virtual hosted-style access with URIs of the form
* {@code http://bucketname.s3.amazonaws.com}
* <p>
* Enabling path-style access and a
* region-specific endpoint switches the behavior to use URIs of the form
* {@code http://s3-eu-west-1.amazonaws.com/bucketname}.
* It is common to use this when connecting to private S3 servers, as it
* avoids the need to play with DNS entries.
* @param s3 S3 client
* @param pathStyleAccess enable path style access?
* @return the S3 client
*/
protected static AmazonS3 applyS3ClientOptions(AmazonS3 s3,
final boolean pathStyleAccess) {
if (pathStyleAccess) { if (pathStyleAccess) {
LOG.debug("Enabling path style access!"); LOG.debug("Enabling path style access!");
s3.setS3ClientOptions(S3ClientOptions.builder() s3.setS3ClientOptions(S3ClientOptions.builder()

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -31,31 +29,25 @@
* This client is for testing <i>only</i>; it is in the production * This client is for testing <i>only</i>; it is in the production
* {@code hadoop-aws} module to enable integration tests to use this * {@code hadoop-aws} module to enable integration tests to use this
* just by editing the Hadoop configuration used to bring up the client. * just by editing the Hadoop configuration used to bring up the client.
*
* The factory uses the older constructor-based instantiation/configuration
* of the client, so does not wire up metrics, handlers etc.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class InconsistentS3ClientFactory extends DefaultS3ClientFactory { public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
/**
* Create the inconsistent client.
* Logs a warning that this is being done.
* @param credentials credentials to use
* @param awsConf AWS configuration
* @param metrics metric collector
* @param endpoint AWS endpoint
* @param pathStyleAccess should path style access be supported?
* @return an inconsistent client.
*/
@Override @Override
protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials, protected AmazonS3 buildAmazonS3Client(
ClientConfiguration awsConf, final ClientConfiguration awsConf,
final RequestMetricCollector metrics, final S3ClientCreationParameters parameters) {
final String endpoint,
final boolean pathStyleAccess) {
LOG.warn("** FAILURE INJECTION ENABLED. Do not run in production! **"); LOG.warn("** FAILURE INJECTION ENABLED. Do not run in production! **");
InconsistentAmazonS3Client s3 InconsistentAmazonS3Client s3
= new InconsistentAmazonS3Client(credentials, awsConf, getConf()); = new InconsistentAmazonS3Client(
configureAmazonS3Client(s3, endpoint, pathStyleAccess); parameters.getCredentialSet(), awsConf, getConf());
configureAmazonS3Client(s3,
parameters.getEndpoint(),
parameters.isPathStyleAccess());
return s3; return s3;
} }
} }

View File

@ -69,7 +69,6 @@
import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SSECustomerKey; import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartRequest;
@ -83,7 +82,6 @@
import com.amazonaws.event.ProgressListener; import com.amazonaws.event.ProgressListener;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -166,7 +164,6 @@
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext;
import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
@ -198,7 +195,6 @@
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AWS_SDK_METRICS_ENABLED;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
@ -376,6 +372,11 @@ public void initialize(URI name, Configuration originalConf)
LOG.debug("Initializing S3AFileSystem for {}", bucket); LOG.debug("Initializing S3AFileSystem for {}", bucket);
// clone the configuration into one with propagated bucket options // clone the configuration into one with propagated bucket options
Configuration conf = propagateBucketOptions(originalConf, bucket); Configuration conf = propagateBucketOptions(originalConf, bucket);
// fix up the classloader of the configuration to be whatever
// classloader loaded this filesystem.
// See: HADOOP-17372
conf.setClassLoader(this.getClass().getClassLoader());
// patch the Hadoop security providers // patch the Hadoop security providers
patchSecurityCredentialProviders(conf); patchSecurityCredentialProviders(conf);
// look for delegation token support early. // look for delegation token support early.
@ -740,16 +741,17 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class); S3ClientFactory.class);
StatisticsFromAwsSdk awsStats = null; S3ClientFactory.S3ClientCreationParameters parameters = null;
// TODO: HADOOP-16830 when the S3 client building code works parameters = new S3ClientFactory.S3ClientCreationParameters()
// with different regions, .withCredentialSet(credentials)
// then non-null stats can be passed in here. .withEndpoint(conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT))
if (AWS_SDK_METRICS_ENABLED) { .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
awsStats = statisticsContext.newStatisticsFromAwsSdk(); .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
} .withUserAgentSuffix(uaSuffix);
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3Client(getUri(), bucket, credentials, uaSuffix, awsStats); .createS3Client(getUri(),
parameters);
} }
/** /**

View File

@ -18,38 +18,246 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.monitoring.MonitoringListener;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk; import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT;
/** /**
* Factory for creation of {@link AmazonS3} client instances. * Factory for creation of {@link AmazonS3} client instances.
* Important: HBase's HBoss module implements this interface in its
* tests.
* Take care when updating this interface to ensure that a client
* implementing only the deprecated method will work.
* See https://github.com/apache/hbase-filesystem
*
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate("HBoss")
@InterfaceStability.Unstable @InterfaceStability.Evolving
public interface S3ClientFactory { public interface S3ClientFactory {
/** /**
* Creates a new {@link AmazonS3} client. * Creates a new {@link AmazonS3} client.
* *
* @param name raw input S3A file system URI * @param uri S3A file system URI
* @param bucket Optional bucket to use to look up per-bucket proxy secrets * @param parameters parameter object
* @param credentialSet credentials to use
* @param userAgentSuffix optional suffix for the UA field.
* @param statisticsFromAwsSdk binding for AWS stats - may be null
* @return S3 client * @return S3 client
* @throws IOException IO problem * @throws IOException IO problem
*/ */
AmazonS3 createS3Client(URI name, AmazonS3 createS3Client(URI uri,
String bucket, S3ClientCreationParameters parameters) throws IOException;
AWSCredentialsProvider credentialSet,
String userAgentSuffix,
StatisticsFromAwsSdk statisticsFromAwsSdk) throws IOException;
/**
* Settings for the S3 Client.
* Implemented as a class to pass in so that adding
* new parameters does not break the binding of
* external implementations of the factory.
*/
final class S3ClientCreationParameters {
/**
* Credentials.
*/
private AWSCredentialsProvider credentialSet;
/**
* Endpoint.
*/
private String endpoint = DEFAULT_ENDPOINT;
/**
* Custom Headers.
*/
private final Map<String, String> headers = new HashMap<>();
/**
* Monitoring listener.
*/
private MonitoringListener monitoringListener;
/**
* RequestMetricCollector metrics...if not-null will be wrapped
* with an {@code AwsStatisticsCollector} and passed to
* the client.
*/
private StatisticsFromAwsSdk metrics;
/**
* Use (deprecated) path style access.
*/
private boolean pathStyleAccess;
/**
* This is in the settings awaiting wiring up and testing.
*/
private boolean requesterPays;
/**
* Request handlers; used for auditing, X-Ray etc.
*/
private List<RequestHandler2> requestHandlers;
/**
* Suffix to UA.
*/
private String userAgentSuffix = "";
public List<RequestHandler2> getRequestHandlers() {
return requestHandlers;
}
/**
* List of request handlers.
* @param handlers handler list.
* @return this object
*/
public S3ClientCreationParameters withRequestHandlers(
@Nullable final List<RequestHandler2> handlers) {
requestHandlers = handlers;
return this;
}
public MonitoringListener getMonitoringListener() {
return monitoringListener;
}
/**
* listener for AWS monitoring events.
* @param listener listener
* @return this object
*/
public S3ClientCreationParameters withMonitoringListener(
@Nullable final MonitoringListener listener) {
monitoringListener = listener;
return this;
}
public StatisticsFromAwsSdk getMetrics() {
return metrics;
}
/**
* Metrics binding. This is the S3A-level
* statistics interface, which will be wired
* up to the AWS callbacks.
* @param statistics statistics implementation
* @return this object
*/
public S3ClientCreationParameters withMetrics(
@Nullable final StatisticsFromAwsSdk statistics) {
metrics = statistics;
return this;
}
/**
* Requester pays option. Not yet wired up.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withRequesterPays(
final boolean value) {
requesterPays = value;
return this;
}
public boolean isRequesterPays() {
return requesterPays;
}
public AWSCredentialsProvider getCredentialSet() {
return credentialSet;
}
/**
* Set credentials.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withCredentialSet(
final AWSCredentialsProvider value) {
credentialSet = value;
return this;
}
public String getUserAgentSuffix() {
return userAgentSuffix;
}
/**
* Set UA suffix.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withUserAgentSuffix(
final String value) {
userAgentSuffix = value;
return this;
}
public String getEndpoint() {
return endpoint;
}
/**
* Set endpoint.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withEndpoint(
final String value) {
endpoint = value;
return this;
}
public boolean isPathStyleAccess() {
return pathStyleAccess;
}
/**
* Set path access option.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withPathStyleAccess(
final boolean value) {
pathStyleAccess = value;
return this;
}
/**
* Add a custom header.
* @param header header name
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withHeader(
String header, String value) {
headers.put(header, value);
return this;
}
/**
* Get the map of headers.
* @return (mutable) header map
*/
public Map<String, String> getHeaders() {
return headers;
}
}
} }

View File

@ -111,10 +111,4 @@ private InternalConstants() {
*/ */
public static final int DEFAULT_UPLOAD_PART_COUNT_LIMIT = 10000; public static final int DEFAULT_UPLOAD_PART_COUNT_LIMIT = 10000;
/**
* Flag to enable AWS Statistics binding. As this is triggering
* problems related to region/endpoint setup, it is currently
* disabled.
*/
public static final boolean AWS_SDK_METRICS_ENABLED = true;
} }

View File

@ -23,13 +23,10 @@
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.Region; import com.amazonaws.services.s3.model.Region;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
/** /**
* An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3} * An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
* interface suitable for unit testing. * interface suitable for unit testing.
@ -37,12 +34,10 @@
public class MockS3ClientFactory implements S3ClientFactory { public class MockS3ClientFactory implements S3ClientFactory {
@Override @Override
public AmazonS3 createS3Client(URI name, public AmazonS3 createS3Client(URI uri,
final String bucket, final S3ClientCreationParameters parameters) {
final AWSCredentialsProvider credentialSet,
final String userAgentSuffix,
final StatisticsFromAwsSdk statisticsFromAwsSdks) {
AmazonS3 s3 = mock(AmazonS3.class); AmazonS3 s3 = mock(AmazonS3.class);
String bucket = uri.getHost();
when(s3.doesBucketExist(bucket)).thenReturn(true); when(s3.doesBucketExist(bucket)).thenReturn(true);
when(s3.doesBucketExistV2(bucket)).thenReturn(true); when(s3.doesBucketExistV2(bucket)).thenReturn(true);
// this listing is used in startup if purging is enabled, so // this listing is used in startup if purging is enabled, so

View File

@ -23,12 +23,11 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.SignableRequest; import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.Signer; import com.amazonaws.auth.Signer;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.internal.AWSS3V4Signer; import com.amazonaws.services.s3.internal.AWSS3V4Signer;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.Test; import org.junit.Test;
@ -40,14 +39,15 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider; import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.auth.ITestCustomSigner.CustomSignerInitializer.StoreValue; import org.apache.hadoop.fs.s3a.auth.ITestCustomSigner.CustomSignerInitializer.StoreValue;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS; import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3; import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
/** /**
* Tests for custom Signers and SignerInitializers. * Tests for custom Signers and SignerInitializers.
@ -62,23 +62,32 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
private String regionName; private String regionName;
private String endpoint;
@Override @Override
public void setup() throws Exception { public void setup() throws Exception {
super.setup(); super.setup();
regionName = determineRegion(getFileSystem().getBucket()); final S3AFileSystem fs = getFileSystem();
regionName = determineRegion(fs.getBucket());
LOG.info("Determined region name to be [{}] for bucket [{}]", regionName, LOG.info("Determined region name to be [{}] for bucket [{}]", regionName,
getFileSystem().getBucket()); fs.getBucket());
endpoint = fs.getConf()
.get(Constants.ENDPOINT, Constants.CENTRAL_ENDPOINT);
LOG.info("Test endpoint is {}", endpoint);
} }
@Test @Test
public void testCustomSignerAndInitializer() public void testCustomSignerAndInitializer()
throws IOException, InterruptedException { throws IOException, InterruptedException {
final Path basePath = path(getMethodName());
UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1"); UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1");
FileSystem fs1 = runMkDirAndVerify(ugi1, "/customsignerpath1", "id1"); FileSystem fs1 = runMkDirAndVerify(ugi1,
new Path(basePath, "customsignerpath1"), "id1");
UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2"); UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2");
FileSystem fs2 = runMkDirAndVerify(ugi2, "/customsignerpath2", "id2"); FileSystem fs2 = runMkDirAndVerify(ugi2,
new Path(basePath, "customsignerpath2"), "id2");
Assertions.assertThat(CustomSignerInitializer.knownStores.size()) Assertions.assertThat(CustomSignerInitializer.knownStores.size())
.as("Num registered stores mismatch").isEqualTo(2); .as("Num registered stores mismatch").isEqualTo(2);
@ -91,20 +100,19 @@ public void testCustomSignerAndInitializer()
} }
private FileSystem runMkDirAndVerify(UserGroupInformation ugi, private FileSystem runMkDirAndVerify(UserGroupInformation ugi,
String pathString, String identifier) Path finalPath, String identifier)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Configuration conf = createTestConfig(identifier); Configuration conf = createTestConfig(identifier);
Path path = new Path(pathString);
path = path.makeQualified(getFileSystem().getUri(),
getFileSystem().getWorkingDirectory());
Path finalPath = path;
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> { return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> {
int invocationCount = CustomSigner.invocationCount; int instantiationCount = CustomSigner.getInstantiationCount();
int invocationCount = CustomSigner.getInvocationCount();
FileSystem fs = finalPath.getFileSystem(conf); FileSystem fs = finalPath.getFileSystem(conf);
fs.mkdirs(finalPath); fs.mkdirs(finalPath);
Assertions.assertThat(CustomSigner.invocationCount) Assertions.assertThat(CustomSigner.getInstantiationCount())
.as("Invocation count lower than expected") .as("CustomSigner Instantiation count lower than expected")
.isGreaterThan(instantiationCount);
Assertions.assertThat(CustomSigner.getInvocationCount())
.as("CustomSigner Invocation count lower than expected")
.isGreaterThan(invocationCount); .isGreaterThan(invocationCount);
Assertions.assertThat(CustomSigner.lastStoreValue) Assertions.assertThat(CustomSigner.lastStoreValue)
@ -118,6 +126,12 @@ private FileSystem runMkDirAndVerify(UserGroupInformation ugi,
}); });
} }
/**
* Create a test conf with the custom signer; fixes up
* endpoint to be that of the test FS.
* @param identifier test key.
* @return a configuration for a filesystem.
*/
private Configuration createTestConfig(String identifier) { private Configuration createTestConfig(String identifier) {
Configuration conf = createConfiguration(); Configuration conf = createConfiguration();
@ -128,24 +142,38 @@ private Configuration createTestConfig(String identifier) {
conf.set(TEST_ID_KEY, identifier); conf.set(TEST_ID_KEY, identifier);
conf.set(TEST_REGION_KEY, regionName); conf.set(TEST_REGION_KEY, regionName);
conf.set(Constants.ENDPOINT, endpoint);
// make absolutely sure there is no caching.
disableFilesystemCaching(conf);
return conf; return conf;
} }
private String determineRegion(String bucketName) throws IOException { private String determineRegion(String bucketName) throws IOException {
String region = getFileSystem().getBucketLocation(bucketName); return getFileSystem().getBucketLocation(bucketName);
return fixBucketRegion(region);
} }
@Private @Private
public static final class CustomSigner implements Signer { public static final class CustomSigner implements Signer {
private static int invocationCount = 0;
private static final AtomicInteger INSTANTIATION_COUNT =
new AtomicInteger(0);
private static final AtomicInteger INVOCATION_COUNT =
new AtomicInteger(0);
private static StoreValue lastStoreValue; private static StoreValue lastStoreValue;
public CustomSigner() {
int c = INSTANTIATION_COUNT.incrementAndGet();
LOG.info("Creating Signer #{}", c);
}
@Override @Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) { public void sign(SignableRequest<?> request, AWSCredentials credentials) {
invocationCount++; int c = INVOCATION_COUNT.incrementAndGet();
LOG.info("Signing request #{}", c);
String host = request.getEndpoint().getHost(); String host = request.getEndpoint().getHost();
String bucketName = host.split("\\.")[0]; String bucketName = host.split("\\.")[0];
try { try {
@ -159,6 +187,14 @@ public void sign(SignableRequest<?> request, AWSCredentials credentials) {
realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY)); realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
realSigner.sign(request, credentials); realSigner.sign(request, credentials);
} }
public static int getInstantiationCount() {
return INSTANTIATION_COUNT.get();
}
public static int getInvocationCount() {
return INVOCATION_COUNT.get();
}
} }
@Private @Private

View File

@ -43,8 +43,8 @@
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher; import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -72,7 +72,6 @@
import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.ALICE; import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.ALICE;
import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.assertSecurityEnabled; import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.assertSecurityEnabled;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.lookupS3ADelegationToken; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.lookupS3ADelegationToken;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AWS_SDK_METRICS_ENABLED;
import static org.apache.hadoop.test.LambdaTestUtils.doAs; import static org.apache.hadoop.test.LambdaTestUtils.doAs;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -557,23 +556,22 @@ public void testDelegationBindingMismatch2() throws Throwable {
*/ */
protected ObjectMetadata readLandsatMetadata(final S3AFileSystem delegatedFS) protected ObjectMetadata readLandsatMetadata(final S3AFileSystem delegatedFS)
throws Exception { throws Exception {
AWSCredentialProviderList testing AWSCredentialProviderList testingCreds
= delegatedFS.shareCredentials("testing"); = delegatedFS.shareCredentials("testing");
URI landsat = new URI(DEFAULT_CSVTEST_FILE); URI landsat = new URI(DEFAULT_CSVTEST_FILE);
DefaultS3ClientFactory factory DefaultS3ClientFactory factory
= new DefaultS3ClientFactory(); = new DefaultS3ClientFactory();
Configuration conf = new Configuration(delegatedFS.getConf()); factory.setConf(new Configuration(delegatedFS.getConf()));
conf.set(ENDPOINT, "");
factory.setConf(conf);
String host = landsat.getHost(); String host = landsat.getHost();
StatisticsFromAwsSdk awsStats = null; S3ClientFactory.S3ClientCreationParameters parameters = null;
if (AWS_SDK_METRICS_ENABLED) { parameters = new S3ClientFactory.S3ClientCreationParameters()
awsStats = new EmptyS3AStatisticsContext() .withCredentialSet(testingCreds)
.newStatisticsFromAwsSdk(); .withEndpoint(DEFAULT_ENDPOINT)
} .withMetrics(new EmptyS3AStatisticsContext()
AmazonS3 s3 = factory.createS3Client(landsat, host, testing, .newStatisticsFromAwsSdk())
"ITestSessionDelegationInFileystem", awsStats); .withUserAgentSuffix("ITestSessionDelegationInFileystem");
AmazonS3 s3 = factory.createS3Client(landsat, parameters);
return Invoker.once("HEAD", host, return Invoker.once("HEAD", host,
() -> s3.getObjectMetadata(host, landsat.getPath().substring(1))); () -> s3.getObjectMetadata(host, landsat.getPath().substring(1)));

View File

@ -175,6 +175,7 @@ public void testCreateAbortEmptyFile() throws Throwable {
Path destFile = methodPath(filename); Path destFile = methodPath(filename);
Path pendingFilePath = makeMagic(destFile); Path pendingFilePath = makeMagic(destFile);
touch(fs, pendingFilePath); touch(fs, pendingFilePath);
waitForConsistency();
validateIntermediateAndFinalPaths(pendingFilePath, destFile); validateIntermediateAndFinalPaths(pendingFilePath, destFile);
Path pendingDataPath = validatePendingCommitData(filename, Path pendingDataPath = validatePendingCommitData(filename,
pendingFilePath); pendingFilePath);

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.statistics;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getLandsatCSVPath;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_REQUEST;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
/**
* Verify that AWS SDK statistics are wired up.
* This test tries to read data from US-east-1 and us-west-2 buckets
* so as to be confident that the nuances of region mapping
* are handed correctly (HADOOP-13551).
* The statistics are probed to verify that the wiring up is complete.
*/
public class ITestAWSStatisticCollection extends AbstractS3ATestBase {
private static final Path COMMON_CRAWL_PATH
= new Path("s3a://osm-pds/planet/planet-latest.orc");
@Test
public void testLandsatStatistics() throws Throwable {
final Configuration conf = getConfiguration();
// skips the tests if the landsat path isn't the default.
Path path = getLandsatCSVPath(conf);
conf.set(ENDPOINT, DEFAULT_ENDPOINT);
conf.unset("fs.s3a.bucket.landsat-pds.endpoint");
try (S3AFileSystem fs = (S3AFileSystem) path.getFileSystem(conf)) {
fs.getObjectMetadata(path);
IOStatistics iostats = fs.getIOStatistics();
assertThatStatisticCounter(iostats,
STORE_IO_REQUEST.getSymbol())
.isGreaterThanOrEqualTo(1);
}
}
@Test
public void testCommonCrawlStatistics() throws Throwable {
final Configuration conf = getConfiguration();
// skips the tests if the landsat path isn't the default.
getLandsatCSVPath(conf);
Path path = COMMON_CRAWL_PATH;
conf.set(ENDPOINT, DEFAULT_ENDPOINT);
try (S3AFileSystem fs = (S3AFileSystem) path.getFileSystem(conf)) {
fs.getObjectMetadata(path);
IOStatistics iostats = fs.getIOStatistics();
assertThatStatisticCounter(iostats,
STORE_IO_REQUEST.getSymbol())
.isGreaterThanOrEqualTo(1);
}
}
}