diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 75acf489e6..29c2bc288c 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1033,7 +1033,19 @@ fs.s3a.assumed.role.sts.endpoint - AWS Simple Token Service Endpoint. If unset, uses the default endpoint. + AWS Security Token Service Endpoint. + If unset, uses the default endpoint. + Only used if AssumedRoleCredentialProvider is the AWS credential provider. + + + + + fs.s3a.assumed.role.sts.endpoint.region + us-west-1 + + AWS Security Token Service Endpoint's region; + Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint + other than the default one and the v4 signature is used. Only used if AssumedRoleCredentialProvider is the AWS credential provider. @@ -1058,7 +1070,9 @@ fs.s3a.connection.ssl.enabled true - Enables or disables SSL connections to S3. + Enables or disables SSL connections to AWS services. + Also sets the default port to use for the s3a proxy settings, + when not explicitly set in fs.s3a.proxy.port. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java index 10201f00d3..f9052fa97b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java @@ -18,24 +18,28 @@ package org.apache.hadoop.fs.s3a; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AnonymousAWSCredentials; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.io.IOUtils; - +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; +import org.apache.hadoop.io.IOUtils; /** * A list of providers. @@ -62,10 +66,18 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, public static final String NO_AWS_CREDENTIAL_PROVIDERS = "No AWS Credential Providers"; + static final String + CREDENTIALS_REQUESTED_WHEN_CLOSED + = "Credentials requested after provider list was closed"; + private final List providers = new ArrayList<>(1); private boolean reuseLastProvider = true; private AWSCredentialsProvider lastProvider; + private final AtomicInteger refCount = new AtomicInteger(1); + + private final AtomicBoolean closed = new AtomicBoolean(false); + /** * Empty instance. 
This is not ready to be used. */ @@ -94,6 +106,9 @@ public void add(AWSCredentialsProvider p) { */ @Override public void refresh() { + if (isClosed()) { + return; + } for (AWSCredentialsProvider provider : providers) { provider.refresh(); } @@ -106,6 +121,11 @@ public void refresh() { */ @Override public AWSCredentials getCredentials() { + if (isClosed()) { + LOG.warn(CREDENTIALS_REQUESTED_WHEN_CLOSED); + throw new NoAuthWithAWSException( + CREDENTIALS_REQUESTED_WHEN_CLOSED); + } checkNotEmpty(); if (reuseLastProvider && lastProvider != null) { return lastProvider.getCredentials(); @@ -136,8 +156,7 @@ public AWSCredentials getCredentials() { if (lastException != null) { message += ": " + lastException; } - throw new AmazonClientException(message, lastException); - + throw new NoAuthWithAWSException(message, lastException); } /** @@ -156,7 +175,7 @@ List getProviders() { */ public void checkNotEmpty() { if (providers.isEmpty()) { - throw new AmazonClientException(NO_AWS_CREDENTIAL_PROVIDERS); + throw new NoAuthWithAWSException(NO_AWS_CREDENTIAL_PROVIDERS); } } @@ -178,8 +197,38 @@ public String listProviderNames() { */ @Override public String toString() { - return "AWSCredentialProviderList: " + - StringUtils.join(providers, " "); + return "AWSCredentialProviderList[" + + "refcount= " + refCount.get() + ": [" + + StringUtils.join(providers, ", ") + ']'; + } + + /** + * Get a reference to this object with an updated reference count. + * + * @return a reference to this + */ + public synchronized AWSCredentialProviderList share() { + Preconditions.checkState(!closed.get(), "Provider list is closed"); + refCount.incrementAndGet(); + return this; + } + + /** + * Get the current reference count. + * @return the current ref count + */ + @VisibleForTesting + public int getRefCount() { + return refCount.get(); + } + + /** + * Get the closed flag. + * @return true iff the list is closed. + */ + @VisibleForTesting + public boolean isClosed() { + return closed.get(); } /** @@ -190,9 +239,29 @@ public String toString() { */ @Override public void close() { - for(AWSCredentialsProvider p: providers) { + synchronized (this) { + if (closed.get()) { + // already closed: no-op + return; + } + int remainder = refCount.decrementAndGet(); + if (remainder != 0) { + // still actively used, or somehow things are + // now negative + LOG.debug("Not closing {}", this); + return; + } + // at this point, the closing is going to happen + LOG.debug("Closing {}", this); + closed.set(true); + } + + // do this outside the synchronized block. + for (AWSCredentialsProvider p : providers) { if (p instanceof Closeable) { - IOUtils.closeStream((Closeable)p); + IOUtils.closeStream((Closeable) p); + } else if (p instanceof AutoCloseable) { + S3AUtils.closeAutocloseables(LOG, (AutoCloseable)p); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index c52193698f..a8da6ec440 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -84,10 +84,27 @@ private Constants() { public static final String ASSUMED_ROLE_SESSION_DURATION = "fs.s3a.assumed.role.session.duration"; - /** Simple Token Service Endpoint. If unset, uses the default endpoint. */ + /** Security Token Service Endpoint. If unset, uses the default endpoint. 
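
A minimal sketch of how the reference counting added above is intended to be used; the wrapper class is an arbitrary stand-in, the anonymous provider is just an example provider, and the fail-fast NoAuthWithAWSException after the count reaches zero is the behaviour this patch implements:

    import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
    import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
    import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;

    public final class ShareCloseSketch {
      public static void main(String[] args) {
        AWSCredentialProviderList providers = new AWSCredentialProviderList();
        providers.add(new AnonymousAWSCredentialsProvider());  // any provider will do
        AWSCredentialProviderList shared = providers.share();  // refCount 1 -> 2
        shared.close();                                        // refCount 2 -> 1: still open
        providers.close();                                     // refCount 1 -> 0: closed
        try {
          providers.getCredentials();                          // now fails fast
        } catch (NoAuthWithAWSException expected) {
          System.out.println("rejected after close: " + expected.getMessage());
        }
      }
    }
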
*/ public static final String ASSUMED_ROLE_STS_ENDPOINT = "fs.s3a.assumed.role.sts.endpoint"; + /** + * Region for the STS endpoint; only relevant if the endpoint + * is set. + */ + public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION = + "fs.s3a.assumed.role.sts.endpoint.region"; + + /** + * Default value for the STS endpoint region; needed for + * v4 signing. + */ + public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT = + "us-west-1"; + + /** + * Default duration of an assumed role. + */ public static final String ASSUMED_ROLE_SESSION_DURATION_DEFAULT = "30m"; /** list of providers to authenticate for the assumed role. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index f33b25eca9..ade317fd60 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -18,59 +18,45 @@ package org.apache.hadoop.fs.s3a; +import java.io.IOException; +import java.net.URI; + import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.VersionInfo; import org.slf4j.Logger; -import java.io.IOException; -import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; -import static org.apache.hadoop.fs.s3a.Constants.*; -import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet; -import static org.apache.hadoop.fs.s3a.S3AUtils.intOption; +import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS; /** - * The default factory implementation, which calls the AWS SDK to configure - * and create an {@link AmazonS3Client} that communicates with the S3 service. + * The default {@link S3ClientFactory} implementation. + * This which calls the AWS SDK to configure and create an + * {@link AmazonS3Client} that communicates with the S3 service. */ -public class DefaultS3ClientFactory extends Configured implements - S3ClientFactory { +public class DefaultS3ClientFactory extends Configured + implements S3ClientFactory { protected static final Logger LOG = S3AFileSystem.LOG; @Override - public AmazonS3 createS3Client(URI name) throws IOException { + public AmazonS3 createS3Client(URI name, + final String bucket, + final AWSCredentialsProvider credentials) throws IOException { Configuration conf = getConf(); - AWSCredentialsProvider credentials = - createAWSCredentialProviderSet(name, conf); - final ClientConfiguration awsConf = createAwsConf(getConf()); - AmazonS3 s3 = newAmazonS3Client(credentials, awsConf); - return createAmazonS3Client(s3, conf, credentials, awsConf); + final ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket); + return configureAmazonS3Client( + newAmazonS3Client(credentials, awsConf), conf); } /** - * Create a new {@link ClientConfiguration}. 
- * @param conf The Hadoop configuration - * @return new AWS client configuration - */ - public static ClientConfiguration createAwsConf(Configuration conf) { - final ClientConfiguration awsConf = new ClientConfiguration(); - initConnectionSettings(conf, awsConf); - initProxySupport(conf, awsConf); - initUserAgent(conf, awsConf); - return awsConf; - } - - /** - * Wrapper around constructor for {@link AmazonS3} client. Override this to - * provide an extended version of the client + * Wrapper around constructor for {@link AmazonS3} client. + * Override this to provide an extended version of the client * @param credentials credentials to use * @param awsConf AWS configuration * @return new AmazonS3 client @@ -81,120 +67,17 @@ protected AmazonS3 newAmazonS3Client( } /** - * Initializes all AWS SDK settings related to connection management. + * Configure S3 client from the Hadoop configuration. + * + * This includes: endpoint, Path Access and possibly other + * options. * * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initConnectionSettings(Configuration conf, - ClientConfiguration awsConf) { - awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS, 1)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, - DEFAULT_SECURE_CONNECTIONS); - awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES, 0)); - awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT, 0)); - awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT, 0)); - int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, - DEFAULT_SOCKET_SEND_BUFFER, 2048); - int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, - DEFAULT_SOCKET_RECV_BUFFER, 2048); - awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); - String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if (!signerOverride.isEmpty()) { - LOG.debug("Signer override = {}", signerOverride); - awsConf.setSignerOverride(signerOverride); - } - } - - /** - * Initializes AWS SDK proxy support if configured. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - * @throws IllegalArgumentException if misconfigured - */ - private static void initProxySupport(Configuration conf, - ClientConfiguration awsConf) throws IllegalArgumentException { - String proxyHost = conf.getTrimmed(PROXY_HOST, ""); - int proxyPort = conf.getInt(PROXY_PORT, -1); - if (!proxyHost.isEmpty()) { - awsConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - awsConf.setProxyPort(proxyPort); - } else { - if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - awsConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. 
Using HTTP default 80"); - awsConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME + " or " + - PROXY_PASSWORD + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - awsConf.setProxyUsername(proxyUsername); - awsConf.setProxyPassword(proxyPassword); - awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); - awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); - if (LOG.isDebugEnabled()) { - LOG.debug("Using proxy server {}:{} as user {} with password {} on " + - "domain {} as workstation {}", awsConf.getProxyHost(), - awsConf.getProxyPort(), - String.valueOf(awsConf.getProxyUsername()), - awsConf.getProxyPassword(), awsConf.getProxyDomain(), - awsConf.getProxyWorkstation()); - } - } else if (proxyPort >= 0) { - String msg = - "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - } - - /** - * Initializes the User-Agent header to send in HTTP requests to the S3 - * back-end. We always include the Hadoop version number. The user also - * may set an optional custom prefix to put in front of the Hadoop version - * number. The AWS SDK interally appends its own information, which seems - * to include the AWS SDK version, OS and JVM version. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initUserAgent(Configuration conf, - ClientConfiguration awsConf) { - String userAgent = "Hadoop " + VersionInfo.getVersion(); - String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); - if (!userAgentPrefix.isEmpty()) { - userAgent = userAgentPrefix + ", " + userAgent; - } - LOG.debug("Using User-Agent: {}", userAgent); - awsConf.setUserAgentPrefix(userAgent); - } - - /** - * Creates an {@link AmazonS3Client} from the established configuration. - * - * @param conf Hadoop configuration - * @param credentials AWS credentials - * @param awsConf AWS SDK configuration * @return S3 client * @throws IllegalArgumentException if misconfigured */ - private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf, - AWSCredentialsProvider credentials, ClientConfiguration awsConf) + private static AmazonS3 configureAmazonS3Client(AmazonS3 s3, + Configuration conf) throws IllegalArgumentException { String endPoint = conf.getTrimmed(ENDPOINT, ""); if (!endPoint.isEmpty()) { @@ -206,21 +89,29 @@ private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf, throw new IllegalArgumentException(msg, e); } } - enablePathStyleAccessIfRequired(s3, conf); - return s3; + return applyS3ClientOptions(s3, conf); } /** - * Enables path-style access to S3 buckets if configured. By default, the - * behavior is to use virtual hosted-style access with URIs of the form - * http://bucketname.s3.amazonaws.com. Enabling path-style access and a - * region-specific endpoint switches the behavior to use URIs of the form - * http://s3-eu-west-1.amazonaws.com/bucketname. + * Perform any tuning of the {@code S3ClientOptions} settings based on + * the Hadoop configuration. + * This is different from the general AWS configuration creation as + * it is unique to S3 connections. * + * The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access + * to S3 buckets if configured. 
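
For context, a sketch of the configuration this path-style option is aimed at; the class name and endpoint URL are illustrative placeholders, while the two keys are the existing fs.s3a.endpoint and fs.s3a.path.style.access options:

    import org.apache.hadoop.conf.Configuration;

    public final class PathStyleAccessSketch {
      public static Configuration privateStoreConf() {
        Configuration conf = new Configuration();
        // placeholder endpoint for a private S3-compatible store
        conf.set("fs.s3a.endpoint", "https://s3.private.example.com");
        // use https://endpoint/bucket/key instead of https://bucket.endpoint/key
        conf.setBoolean("fs.s3a.path.style.access", true);
        return conf;
      }
    }
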
By default, the + * behavior is to use virtual hosted-style access with URIs of the form + * {@code http://bucketname.s3.amazonaws.com} + * Enabling path-style access and a + * region-specific endpoint switches the behavior to use URIs of the form + * {@code http://s3-eu-west-1.amazonaws.com/bucketname}. + * It is common to use this when connecting to private S3 servers, as it + * avoids the need to play with DNS entries. * @param s3 S3 client * @param conf Hadoop configuration + * @return the S3 client */ - private static void enablePathStyleAccessIfRequired(AmazonS3 s3, + private static AmazonS3 applyS3ClientOptions(AmazonS3 s3, Configuration conf) { final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); if (pathStyleAccess) { @@ -229,5 +120,6 @@ private static void enablePathStyleAccessIfRequired(AmazonS3 s3, .setPathStyleAccess(true) .build()); } + return s3; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java index 99ed87da8c..2cd1aae5ba 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java @@ -114,6 +114,16 @@ public S3ObjectSummary summary() { /** Map of key to delay -> time it was created. */ private Map delayedPutKeys = new HashMap<>(); + /** + * Instantiate. + * This subclasses a deprecated constructor of the parent + * {@code AmazonS3Client} class; we can't use the builder API because, + * that only creates the consistent client. + * @param credentials credentials to auth. + * @param clientConfiguration connection settings + * @param conf hadoop configuration. + */ + @SuppressWarnings("deprecation") public InconsistentAmazonS3Client(AWSCredentialsProvider credentials, ClientConfiguration clientConfiguration, Configuration conf) { super(credentials, clientConfiguration); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java index 17d268bdcf..932c472f5b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java @@ -21,16 +21,27 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** * S3 Client factory used for testing with eventual consistency fault injection. + * This client is for testing only; it is in the production + * {@code hadoop-aws} module to enable integration tests to use this + * just by editing the Hadoop configuration used to bring up the client. */ @InterfaceAudience.Private @InterfaceStability.Unstable public class InconsistentS3ClientFactory extends DefaultS3ClientFactory { + /** + * Create the inconsistent client. + * Logs a warning that this is being done. + * @param credentials credentials to use + * @param awsConf AWS configuration + * @return an inconsistent client. 
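
As a usage note, tests switch to this fault-injecting client purely through configuration. A sketch, assuming the existing Constants.S3_CLIENT_FACTORY_IMPL key; the helper class name is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.Constants;
    import org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory;
    import org.apache.hadoop.fs.s3a.S3ClientFactory;

    public final class FaultInjectionSetup {
      public static Configuration enable(Configuration conf) {
        // the filesystem will then instantiate the inconsistent client factory
        conf.setClass(Constants.S3_CLIENT_FACTORY_IMPL,
            InconsistentS3ClientFactory.class, S3ClientFactory.class);
        return conf;
      }
    }
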
+ */ @Override protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials, ClientConfiguration awsConf) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 737d7da95c..72a5fdea38 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -77,8 +77,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListeningExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -124,9 +125,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotEmpty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * The core S3A Filesystem implementation. * @@ -205,6 +203,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { private boolean useListV1; private MagicCommitIntegration committerIntegration; + private AWSCredentialProviderList credentials; + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -252,8 +252,10 @@ public void initialize(URI name, Configuration originalConf) Class s3ClientFactoryClass = conf.getClass( S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, S3ClientFactory.class); + + credentials = createAWSCredentialProviderSet(name, conf); s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) - .createS3Client(name); + .createS3Client(name, bucket, credentials); invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()), onRetry); @@ -2470,12 +2472,11 @@ public void close() throws IOException { transfers.shutdownNow(true); transfers = null; } - if (metadataStore != null) { - metadataStore.close(); - metadataStore = null; - } - IOUtils.closeQuietly(instrumentation); + S3AUtils.closeAll(LOG, metadataStore, instrumentation); + metadataStore = null; instrumentation = null; + closeAutocloseables(LOG, credentials); + credentials = null; } } @@ -2885,6 +2886,7 @@ public String toString() { } sb.append(", boundedExecutor=").append(boundedThreadPool); sb.append(", unboundedExecutor=").append(unboundedThreadPool); + sb.append(", credentials=").append(credentials); sb.append(", statistics {") .append(statistics) .append("}"); @@ -3319,4 +3321,17 @@ public boolean hasCapability(String capability) { return false; } } + + /** + * Get a shared copy of the AWS credentials, with its reference + * counter updated. + * Caller is required to call {@code close()} on this after + * they have finished using it. + * @param purpose what is this for? This is initially for logging + * @return a reference to shared credentials. 
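
From a caller's point of view, the shareCredentials() contract documented here looks roughly like the sketch below; the purpose string and helper class are illustrative, and close() simply decrements the reference count while the filesystem keeps its own share:

    import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
    import org.apache.hadoop.fs.s3a.S3AFileSystem;

    public final class SharedCredentialsSketch {
      public static void withSharedCredentials(S3AFileSystem fs) {
        AWSCredentialProviderList creds = fs.shareCredentials("example-client");
        try {
          // hand 'creds' to another AWS client (STS, DynamoDB, ...) here
        } finally {
          creds.close();  // release this share; the filesystem still holds its own
        }
      }
    }
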
+ */ + public AWSCredentialProviderList shareCredentials(final String purpose) { + LOG.debug("Sharing credentials for: {}", purpose); + return credentials.share(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java index 2b361fd455..e6e789573e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.ConnectTimeoutException; @@ -154,8 +155,9 @@ protected Map, RetryPolicy> createExceptionMap() { policyMap.put(InterruptedException.class, fail); // note this does not pick up subclasses (like socket timeout) policyMap.put(InterruptedIOException.class, fail); - // interesting question: should this be retried ever? + // Access denial and auth exceptions are not retried policyMap.put(AccessDeniedException.class, fail); + policyMap.put(NoAuthWithAWSException.class, fail); policyMap.put(FileNotFoundException.class, fail); policyMap.put(InvalidRequestException.class, fail); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index a5f7d75449..9908fd1d90 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -21,6 +21,8 @@ import com.amazonaws.AbortedException; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; import com.amazonaws.SdkBaseException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; @@ -44,15 +46,18 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.ProviderUtils; +import org.apache.hadoop.util.VersionInfo; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -174,11 +179,17 @@ public static IOException translateException(@Nullable String operation, // call considered an sign of connectivity failure return (EOFException)new EOFException(message).initCause(exception); } + if (exception instanceof NoAuthWithAWSException) { + // the exception raised by AWSCredentialProvider list if the + // credentials were not accepted. 
+ return (AccessDeniedException)new AccessDeniedException(path, null, + exception.toString()).initCause(exception); + } return new AWSClientIOException(message, exception); } else { if (exception instanceof AmazonDynamoDBException) { // special handling for dynamo DB exceptions - return translateDynamoDBException(message, + return translateDynamoDBException(path, message, (AmazonDynamoDBException)exception); } IOException ioe; @@ -373,20 +384,45 @@ public static boolean signifiesConnectionBroken(SdkBaseException ex) { /** * Translate a DynamoDB exception into an IOException. + * + * @param path path in the DDB * @param message preformatted message for the exception - * @param ex exception + * @param ddbException exception * @return an exception to throw. */ - public static IOException translateDynamoDBException(String message, - AmazonDynamoDBException ex) { - if (isThrottleException(ex)) { - return new AWSServiceThrottledException(message, ex); + public static IOException translateDynamoDBException(final String path, + final String message, + final AmazonDynamoDBException ddbException) { + if (isThrottleException(ddbException)) { + return new AWSServiceThrottledException(message, ddbException); } - if (ex instanceof ResourceNotFoundException) { + if (ddbException instanceof ResourceNotFoundException) { return (FileNotFoundException) new FileNotFoundException(message) - .initCause(ex); + .initCause(ddbException); } - return new AWSServiceIOException(message, ex); + final int statusCode = ddbException.getStatusCode(); + final String errorCode = ddbException.getErrorCode(); + IOException result = null; + // 400 gets used a lot by DDB + if (statusCode == 400) { + switch (errorCode) { + case "AccessDeniedException": + result = (IOException) new AccessDeniedException( + path, + null, + ddbException.toString()) + .initCause(ddbException); + break; + + default: + result = new AWSBadRequestException(message, ddbException); + } + + } + if (result == null) { + result = new AWSServiceIOException(message, ddbException); + } + return result; } /** @@ -738,6 +774,29 @@ public static String lookupPassword( String baseKey, String overrideVal) throws IOException { + return lookupPassword(bucket, conf, baseKey, overrideVal, ""); + } + + /** + * Get a password from a configuration, including JCEKS files, handling both + * the absolute key and bucket override. + * @param bucket bucket or "" if none known + * @param conf configuration + * @param baseKey base key to look up, e.g "fs.s3a.secret.key" + * @param overrideVal override value: if non empty this is used instead of + * querying the configuration. + * @param defVal value to return if there is no password + * @return a password or the value of defVal. + * @throws IOException on any IO problem + * @throws IllegalArgumentException bad arguments + */ + public static String lookupPassword( + String bucket, + Configuration conf, + String baseKey, + String overrideVal, + String defVal) + throws IOException { String initialVal; Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX), "%s does not start with $%s", baseKey, FS_S3A_PREFIX); @@ -757,7 +816,7 @@ public static String lookupPassword( // no bucket, make the initial value the override value initialVal = overrideVal; } - return getPassword(conf, baseKey, initialVal); + return getPassword(conf, baseKey, initialVal, defVal); } /** @@ -1059,6 +1118,134 @@ public static void deleteWithWarning(FileSystem fs, } } + /** + * Create a new AWS {@code ClientConfiguration}. 
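
A sketch of why the extra default-value parameter on lookupPassword() is useful: callers such as the proxy setup later in this patch pass a null default so that "not set anywhere" can be told apart from "set to an empty string". The wrapper class is illustrative; the key is the existing fs.s3a.proxy.username option:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.S3AUtils;

    public final class ProxySecretLookupSketch {
      public static String proxyUser(Configuration conf, String bucket)
          throws IOException {
        // null default: returns null when neither the bucket-specific nor the
        // base key (nor a JCEKS credential entry) provides a value
        return S3AUtils.lookupPassword(bucket, conf, "fs.s3a.proxy.username",
            null, null);
      }
    }
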
+ * All clients to AWS services MUST use this for consistent setup + * of connectivity, UA, proxy settings. + * @param conf The Hadoop configuration + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @return new AWS client configuration + */ + public static ClientConfiguration createAwsConf(Configuration conf, + String bucket) + throws IOException { + final ClientConfiguration awsConf = new ClientConfiguration(); + initConnectionSettings(conf, awsConf); + initProxySupport(conf, bucket, awsConf); + initUserAgent(conf, awsConf); + return awsConf; + } + + /** + * Initializes all AWS SDK settings related to connection management. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + public static void initConnectionSettings(Configuration conf, + ClientConfiguration awsConf) { + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS); + awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); + int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, + DEFAULT_SOCKET_SEND_BUFFER, 2048); + int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, + DEFAULT_SOCKET_RECV_BUFFER, 2048); + awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); + String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); + awsConf.setSignerOverride(signerOverride); + } + } + + /** + * Initializes AWS SDK proxy support in the AWS client configuration + * if the S3A settings enable it. + * + * @param conf Hadoop configuration + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @param awsConf AWS SDK configuration to update + * @throws IllegalArgumentException if misconfigured + * @throws IOException problem getting username/secret from password source. + */ + public static void initProxySupport(Configuration conf, + String bucket, + ClientConfiguration awsConf) throws IllegalArgumentException, + IOException { + String proxyHost = conf.getTrimmed(PROXY_HOST, ""); + int proxyPort = conf.getInt(PROXY_PORT, -1); + if (!proxyHost.isEmpty()) { + awsConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + awsConf.setProxyPort(proxyPort); + } else { + if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + awsConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. 
Using HTTP default 80"); + awsConf.setProxyPort(80); + } + } + final String proxyUsername = lookupPassword(bucket, conf, PROXY_USERNAME, + null, null); + final String proxyPassword = lookupPassword(bucket, conf, PROXY_PASSWORD, + null, null); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME + " or " + + PROXY_PASSWORD + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + awsConf.setProxyUsername(proxyUsername); + awsConf.setProxyPassword(proxyPassword); + awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); + awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); + if (LOG.isDebugEnabled()) { + LOG.debug("Using proxy server {}:{} as user {} with password {} on " + + "domain {} as workstation {}", awsConf.getProxyHost(), + awsConf.getProxyPort(), + String.valueOf(awsConf.getProxyUsername()), + awsConf.getProxyPassword(), awsConf.getProxyDomain(), + awsConf.getProxyWorkstation()); + } + } else if (proxyPort >= 0) { + String msg = + "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + } + + /** + * Initializes the User-Agent header to send in HTTP requests to AWS + * services. We always include the Hadoop version number. The user also + * may set an optional custom prefix to put in front of the Hadoop version + * number. The AWS SDK internally appends its own information, which seems + * to include the AWS SDK version, OS and JVM version. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration to update + */ + private static void initUserAgent(Configuration conf, + ClientConfiguration awsConf) { + String userAgent = "Hadoop " + VersionInfo.getVersion(); + String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); + if (!userAgentPrefix.isEmpty()) { + userAgent = userAgentPrefix + ", " + userAgent; + } + LOG.debug("Using User-Agent: {}", userAgent); + awsConf.setUserAgentPrefix(userAgent); + } /** * An interface for use in lambda-expressions working with @@ -1289,18 +1476,40 @@ private static String passwordDiagnostics(String pass, String description) { * @param closeables the objects to close */ public static void closeAll(Logger log, - java.io.Closeable... closeables) { - for (java.io.Closeable c : closeables) { + Closeable... closeables) { + if (log == null) { + log = LOG; + } + for (Closeable c : closeables) { if (c != null) { try { - if (log != null) { - log.debug("Closing {}", c); - } + log.debug("Closing {}", c); c.close(); } catch (Exception e) { - if (log != null && log.isDebugEnabled()) { - log.debug("Exception in closing {}", c, e); - } + log.debug("Exception in closing {}", c, e); + } + } + } + } + /** + * Close the Closeable objects and ignore any Exception or + * null pointers. + * (This is the SLF4J equivalent of that in {@code IOUtils}). + * @param log the log to log at debug level. Can be null. + * @param closeables the objects to close + */ + public static void closeAutocloseables(Logger log, + AutoCloseable... 
closeables) { + if (log == null) { + log = LOG; + } + for (AutoCloseable c : closeables) { + if (c != null) { + try { + log.debug("Closing {}", c); + c.close(); + } catch (Exception e) { + log.debug("Exception in closing {}", c, e); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index 9abb362ed4..b237e850d2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.URI; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import org.apache.hadoop.classification.InterfaceAudience; @@ -37,9 +38,13 @@ public interface S3ClientFactory { * Creates a new {@link AmazonS3} client. * * @param name raw input S3A file system URI + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @param credentialSet credentials to use * @return S3 client * @throws IOException IO problem */ - AmazonS3 createS3Client(URI name) throws IOException; + AmazonS3 createS3Client(URI name, + final String bucket, + final AWSCredentialsProvider credentialSet) throws IOException; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java index fdaf9bd544..e5a363952f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java @@ -24,9 +24,11 @@ import java.util.Locale; import java.util.concurrent.TimeUnit; +import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -37,6 +39,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; +import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.S3ARetryPolicy; import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider; import org.apache.hadoop.security.UserGroupInformation; @@ -77,17 +82,21 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider, private final String arn; + private final AWSCredentialProviderList credentialsToSTS; + + private final Invoker invoker; + /** * Instantiate. * This calls {@link #getCredentials()} to fail fast on the inner * role credential retrieval. - * @param uri URI of endpoint. + * @param fsUri URI of the filesystem. 
* @param conf configuration * @throws IOException on IO problems and some parameter checking * @throws IllegalArgumentException invalid parameters * @throws AWSSecurityTokenServiceException problems getting credentials */ - public AssumedRoleCredentialProvider(URI uri, Configuration conf) + public AssumedRoleCredentialProvider(URI fsUri, Configuration conf) throws IOException { arn = conf.getTrimmed(ASSUMED_ROLE_ARN, ""); @@ -99,13 +108,14 @@ public AssumedRoleCredentialProvider(URI uri, Configuration conf) Class[] awsClasses = loadAWSProviderClasses(conf, ASSUMED_ROLE_CREDENTIALS_PROVIDER, SimpleAWSCredentialsProvider.class); - AWSCredentialProviderList credentials = new AWSCredentialProviderList(); + credentialsToSTS = new AWSCredentialProviderList(); for (Class aClass : awsClasses) { if (this.getClass().equals(aClass)) { throw new IOException(E_FORBIDDEN_PROVIDER); } - credentials.add(createAWSCredentialProvider(conf, aClass, uri)); + credentialsToSTS.add(createAWSCredentialProvider(conf, aClass, fsUri)); } + LOG.debug("Credentials to obtain role credentials: {}", credentialsToSTS); // then the STS binding sessionName = conf.getTrimmed(ASSUMED_ROLE_SESSION_NAME, @@ -122,14 +132,27 @@ public AssumedRoleCredentialProvider(URI uri, Configuration conf) LOG.debug("Scope down policy {}", policy); builder.withScopeDownPolicy(policy); } - String epr = conf.get(ASSUMED_ROLE_STS_ENDPOINT, ""); - if (StringUtils.isNotEmpty(epr)) { - LOG.debug("STS Endpoint: {}", epr); - builder.withServiceEndpoint(epr); - } - LOG.debug("Credentials to obtain role credentials: {}", credentials); - builder.withLongLivedCredentialsProvider(credentials); + String endpoint = conf.get(ASSUMED_ROLE_STS_ENDPOINT, ""); + String region = conf.get(ASSUMED_ROLE_STS_ENDPOINT_REGION, + ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT); + AWSSecurityTokenServiceClientBuilder stsbuilder = + STSClientFactory.builder( + conf, + fsUri.getHost(), + credentialsToSTS, + endpoint, + region); + // the STS client is not tracked for a shutdown in close(), because it + // (currently) throws an UnsupportedOperationException in shutdown(). + builder.withStsClient(stsbuilder.build()); + + //now build the provider stsProvider = builder.build(); + + // to handle STS throttling by the AWS account, we + // need to retry + invoker = new Invoker(new S3ARetryPolicy(conf), this::operationRetried); + // and force in a fail-fast check just to keep the stack traces less // convoluted getCredentials(); @@ -143,7 +166,17 @@ public AssumedRoleCredentialProvider(URI uri, Configuration conf) @Override public AWSCredentials getCredentials() { try { - return stsProvider.getCredentials(); + return invoker.retryUntranslated("getCredentials", + true, + stsProvider::getCredentials); + } catch (IOException e) { + // this is in the signature of retryUntranslated; + // its hard to see how this could be raised, but for + // completeness, it is wrapped as an Amazon Client Exception + // and rethrown. + throw new AmazonClientException( + "getCredentials failed: " + e, + e); } catch (AWSSecurityTokenServiceException e) { LOG.error("Failed to get credentials for role {}", arn, e); @@ -161,7 +194,7 @@ public void refresh() { */ @Override public void close() { - stsProvider.close(); + S3AUtils.closeAutocloseables(LOG, stsProvider, credentialsToSTS); } @Override @@ -205,4 +238,23 @@ static String sanitize(String session) { return r.toString(); } + /** + * Callback from {@link Invoker} when an operation is retried. 
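
Tying the pieces together, a hedged configuration sketch for this provider against a non-default STS endpoint; the ARN, endpoint and region values are placeholders, and the key names follow the fs.s3a.assumed.role.* constants used above:

    import org.apache.hadoop.conf.Configuration;

    public final class AssumedRoleConfigSketch {
      public static Configuration assumedRoleConf() {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
        conf.set("fs.s3a.assumed.role.arn",
            "arn:aws:iam::123456789012:role/example-role");       // placeholder
        // a non-default STS endpoint needs an explicit region for v4 signing
        conf.set("fs.s3a.assumed.role.sts.endpoint",
            "sts.eu-west-1.amazonaws.com");                       // placeholder
        conf.set("fs.s3a.assumed.role.sts.endpoint.region", "eu-west-1");
        return conf;
      }
    }
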
+ * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + public void operationRetried( + String text, + Exception ex, + int retries, + boolean idempotent) { + if (retries == 0) { + // log on the first retry attempt of the credential access. + // At worst, this means one log entry every intermittent renewal + // time. + LOG.info("Retried {}", text); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java new file mode 100644 index 0000000000..f48e17a621 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.auth; + +import com.amazonaws.AmazonClientException; + +/** + * A specific subclass of {@code AmazonClientException} which can + * be used in the retry logic to fail fast when there is any + * authentication problem. + */ +public class NoAuthWithAWSException extends AmazonClientException { + + public NoAuthWithAWSException(final String message, final Throwable t) { + super(message, t); + } + + public NoAuthWithAWSException(final String message) { + super(message); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java index ca2c993a20..d4568b0dc0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java @@ -205,6 +205,14 @@ public static Policy policy(Statement... statements) { return new Policy(statements); } + /** + * From a set of statements, create a policy. + * @param statements statements + * @return the policy + */ + public static Policy policy(final List statements) { + return new Policy(statements); + } /** * Effect options. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java index 6711eee25a..34ed2958e4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java @@ -29,6 +29,55 @@ public final class RolePolicies { private RolePolicies() { } + /** All KMS operations: {@value}.*/ + public static final String KMS_ALL_OPERATIONS = "kms:*"; + + /** KMS encryption. This is Not used by SSE-KMS: {@value}. 
*/ + public static final String KMS_ENCRYPT = "kms:Encrypt"; + + /** + * Decrypt data encrypted with SSE-KMS: {@value}. + */ + public static final String KMS_DECRYPT = "kms:Decrypt"; + + /** + * Arn for all KMS keys: {@value}. + */ + public static final String KMS_ALL_KEYS = "arn:aws:kms:*"; + + /** + * This is used by S3 to generate a per-object encryption key and + * the encrypted value of this, the latter being what it tags + * the object with for later decryption: {@value}. + */ + public static final String KMS_GENERATE_DATA_KEY = "kms:GenerateDataKey"; + + /** + * Actions needed to read and write SSE-KMS data. + */ + private static final String[] KMS_KEY_RW = + new String[]{KMS_DECRYPT, KMS_GENERATE_DATA_KEY}; + + /** + * Actions needed to read SSE-KMS data. + */ + private static final String[] KMS_KEY_READ = + new String[] {KMS_DECRYPT}; + + /** + * Statement to allow KMS R/W access access, so full use of + * SSE-KMS. + */ + public static final Statement STATEMENT_ALLOW_SSE_KMS_RW = + statement(true, KMS_ALL_KEYS, KMS_KEY_RW); + + /** + * Statement to allow read access to KMS keys, so the ability + * to read SSE-KMS data,, but not decrypt it. + */ + public static final Statement STATEMENT_ALLOW_SSE_KMS_READ = + statement(true, KMS_ALL_KEYS, KMS_KEY_READ); + /** * All S3 operations: {@value}. */ @@ -52,7 +101,6 @@ private RolePolicies() { public static final String S3_LIST_BUCKET_MULTPART_UPLOADS = "s3:ListBucketMultipartUploads"; - /** * List multipart upload is needed for the S3A Commit protocols. */ @@ -97,6 +145,8 @@ private RolePolicies() { public static final String S3_GET_OBJECT_VERSION = "s3:GetObjectVersion"; + public static final String S3_GET_BUCKET_LOCATION = "s3:GetBucketLocation"; + public static final String S3_GET_OBJECT_VERSION_ACL = "s3:GetObjectVersionAcl"; @@ -128,7 +178,8 @@ private RolePolicies() { public static final String S3_RESTORE_OBJECT = "s3:RestoreObject"; /** - * Actions needed to read data from S3 through S3A. + * Actions needed to read a file in S3 through S3A, excluding + * S3Guard and SSE-KMS. */ public static final String[] S3_PATH_READ_OPERATIONS = new String[]{ @@ -136,18 +187,20 @@ private RolePolicies() { }; /** - * Actions needed to read data from S3 through S3A. + * Base actions needed to read data from S3 through S3A, + * excluding SSE-KMS data and S3Guard-ed buckets. */ public static final String[] S3_ROOT_READ_OPERATIONS = new String[]{ S3_LIST_BUCKET, S3_LIST_BUCKET_MULTPART_UPLOADS, - S3_GET_OBJECT, + S3_ALL_GET, }; /** * Actions needed to write data to an S3A Path. - * This includes the appropriate read operations. + * This includes the appropriate read operations, but + * not SSE-KMS or S3Guard support. */ public static final String[] S3_PATH_RW_OPERATIONS = new String[]{ @@ -163,6 +216,7 @@ private RolePolicies() { * This is purely the extra operations needed for writing atop * of the read operation set. * Deny these and a path is still readable, but not writeable. + * Excludes: SSE-KMS and S3Guard permissions. */ public static final String[] S3_PATH_WRITE_OPERATIONS = new String[]{ @@ -173,6 +227,7 @@ private RolePolicies() { /** * Actions needed for R/W IO from the root of a bucket. + * Excludes: SSE-KMS and S3Guard permissions. */ public static final String[] S3_ROOT_RW_OPERATIONS = new String[]{ @@ -190,26 +245,57 @@ private RolePolicies() { */ public static final String DDB_ALL_OPERATIONS = "dynamodb:*"; - public static final String DDB_ADMIN = "dynamodb:*"; + /** + * Operations needed for DDB/S3Guard Admin. 
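
As an illustration of how these constants compose, a sketch of a read-only policy built from the statements above; the helper class is illustrative and assumes the RoleModel.statement()/policy() helpers used elsewhere in this module:

    import static org.apache.hadoop.fs.s3a.auth.RoleModel.policy;
    import static org.apache.hadoop.fs.s3a.auth.RoleModel.statement;
    import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;

    import org.apache.hadoop.fs.s3a.auth.RoleModel;

    public final class ReadOnlyRolePolicySketch {
      public static RoleModel.Policy readOnlyPolicy() {
        // read access to S3 data plus the ability to decrypt SSE-KMS content,
        // but no write or KMS data-key generation permissions
        return policy(
            statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS),
            STATEMENT_ALLOW_SSE_KMS_READ);
      }
    }
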
+ * For now: make this {@link #DDB_ALL_OPERATIONS}. + */ + public static final String DDB_ADMIN = DDB_ALL_OPERATIONS; + /** + * Permission for DDB describeTable() operation: {@value}. + * This is used during initialization. + */ + public static final String DDB_DESCRIBE_TABLE = "dynamodb:DescribeTable"; - public static final String DDB_BATCH_WRITE = "dynamodb:BatchWriteItem"; + /** + * Permission to query the DDB table: {@value}. + */ + public static final String DDB_QUERY = "dynamodb:Query"; + + /** + * Permission for DDB operation to get a record: {@value}. + */ + public static final String DDB_GET_ITEM = "dynamodb:GetItem"; + + /** + * Permission for DDB write record operation: {@value}. + */ + public static final String DDB_PUT_ITEM = "dynamodb:PutItem"; + + /** + * Permission for DDB update single item operation: {@value}. + */ + public static final String DDB_UPDATE_ITEM = "dynamodb:UpdateItem"; + + /** + * Permission for DDB delete operation: {@value}. + */ + public static final String DDB_DELETE_ITEM = "dynamodb:DeleteItem"; + + /** + * Permission for DDB operation: {@value}. + */ + public static final String DDB_BATCH_GET_ITEM = "dynamodb:BatchGetItem"; + + /** + * Batch write permission for DDB: {@value}. + */ + public static final String DDB_BATCH_WRITE_ITEM = "dynamodb:BatchWriteItem"; /** * All DynamoDB tables: {@value}. */ - public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:::*"; - - - - public static final String WILDCARD = "*"; - - /** - * Allow all S3 Operations. - */ - public static final Statement STATEMENT_ALL_S3 = statement(true, - S3_ALL_BUCKETS, - S3_ALL_OPERATIONS); + public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:*"; /** * Statement to allow all DDB access. @@ -218,11 +304,36 @@ private RolePolicies() { ALL_DDB_TABLES, DDB_ALL_OPERATIONS); /** - * Allow all S3 and S3Guard operations. + * Statement to allow all client operations needed for S3Guard, + * but none of the admin operations. + */ + public static final Statement STATEMENT_S3GUARD_CLIENT = statement(true, + ALL_DDB_TABLES, + DDB_BATCH_GET_ITEM, + DDB_BATCH_WRITE_ITEM, + DDB_DELETE_ITEM, + DDB_DESCRIBE_TABLE, + DDB_GET_ITEM, + DDB_PUT_ITEM, + DDB_QUERY, + DDB_UPDATE_ITEM + ); + + /** + * Allow all S3 Operations. + * This does not cover DDB or S3-KMS + */ + public static final Statement STATEMENT_ALL_S3 = statement(true, + S3_ALL_BUCKETS, + S3_ALL_OPERATIONS); + + /** + * Policy for all S3 and S3Guard operations, and SSE-KMS. */ public static final Policy ALLOW_S3_AND_SGUARD = policy( STATEMENT_ALL_S3, - STATEMENT_ALL_DDB + STATEMENT_ALL_DDB, + STATEMENT_ALLOW_SSE_KMS_RW ); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java new file mode 100644 index 0000000000..10bf88c61f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.auth; + +import java.io.IOException; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3AUtils; + +/** + * Factory for creating STS Clients. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class STSClientFactory { + + private static final Logger LOG = + LoggerFactory.getLogger(STSClientFactory.class); + + /** + * Create the builder ready for any final configuration options. + * Picks up connection settings from the Hadoop configuration, including + * proxy secrets. + * @param conf Configuration to act as source of options. + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @param credentials AWS credential chain to use + * @param stsEndpoint optional endpoint "https://sns.us-west-1.amazonaws.com" + * @param stsRegion the region, e.g "us-west-1" + * @return the builder to call {@code build()} + * @throws IOException problem reading proxy secrets + */ + public static AWSSecurityTokenServiceClientBuilder builder( + final Configuration conf, + final String bucket, + final AWSCredentialsProvider credentials, final String stsEndpoint, + final String stsRegion) throws IOException { + Preconditions.checkArgument(credentials != null, "No credentials"); + final AWSSecurityTokenServiceClientBuilder builder + = AWSSecurityTokenServiceClientBuilder.standard(); + final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket); + builder.withClientConfiguration(awsConf); + builder.withCredentials(credentials); + if (StringUtils.isNotEmpty(stsEndpoint)) { + LOG.debug("STS Endpoint ={}", stsEndpoint); + builder.withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(stsEndpoint, stsRegion)); + } + return builder; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java index 91e64cddf6..9e1d2f41b5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java @@ -34,10 +34,9 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory; +import org.apache.hadoop.fs.s3a.S3AUtils; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY; -import static 
org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet; /** * Interface to create a DynamoDB client. @@ -58,10 +57,14 @@ public interface DynamoDBClientFactory extends Configurable { * it will indicate an error. * * @param defaultRegion the default region of the AmazonDynamoDB client + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @param credentials credentials to use for authentication. * @return a new DynamoDB client * @throws IOException if any IO error happens */ - AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException; + AmazonDynamoDB createDynamoDBClient(final String defaultRegion, + final String bucket, + final AWSCredentialsProvider credentials) throws IOException; /** * The default implementation for creating an AmazonDynamoDB. @@ -69,16 +72,15 @@ public interface DynamoDBClientFactory extends Configurable { class DefaultDynamoDBClientFactory extends Configured implements DynamoDBClientFactory { @Override - public AmazonDynamoDB createDynamoDBClient(String defaultRegion) + public AmazonDynamoDB createDynamoDBClient(String defaultRegion, + final String bucket, + final AWSCredentialsProvider credentials) throws IOException { Preconditions.checkNotNull(getConf(), "Should have been configured before usage"); final Configuration conf = getConf(); - final AWSCredentialsProvider credentials = - createAWSCredentialProviderSet(null, conf); - final ClientConfiguration awsConf = - DefaultS3ClientFactory.createAwsConf(conf); + final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket); final String region = getRegion(conf, defaultRegion); LOG.debug("Creating DynamoDB client in region {}", region); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 43849b1b14..ba80b88635 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; +import java.nio.file.AccessDeniedException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; import com.amazonaws.services.dynamodbv2.document.DynamoDB; @@ -67,6 +69,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.Retries; @@ -75,13 +78,14 @@ import org.apache.hadoop.fs.s3a.S3ARetryPolicy; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.Tristate; +import org.apache.hadoop.fs.s3a.auth.RolePolicies; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import static org.apache.hadoop.fs.s3a.Constants.*; -import static 
org.apache.hadoop.fs.s3a.S3AUtils.translateException; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*; import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*; @@ -207,6 +211,7 @@ public class DynamoDBMetadataStore implements MetadataStore { new ValueMap().withBoolean(":false", false); private DynamoDB dynamoDB; + private AWSCredentialProviderList credentials; private String region; private Table table; private String tableName; @@ -242,10 +247,16 @@ public class DynamoDBMetadataStore implements MetadataStore { * A utility function to create DynamoDB instance. * @param conf the file system configuration * @param s3Region region of the associated S3 bucket (if any). + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @param credentials credentials. * @return DynamoDB instance. * @throws IOException I/O error. */ - private static DynamoDB createDynamoDB(Configuration conf, String s3Region) + private static DynamoDB createDynamoDB( + final Configuration conf, + final String s3Region, + final String bucket, + final AWSCredentialsProvider credentials) throws IOException { Preconditions.checkNotNull(conf); final Class cls = conf.getClass( @@ -254,10 +265,18 @@ private static DynamoDB createDynamoDB(Configuration conf, String s3Region) DynamoDBClientFactory.class); LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region); final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf) - .createDynamoDBClient(s3Region); + .createDynamoDBClient(s3Region, bucket, credentials); return new DynamoDB(dynamoDBClient); } + /** + * {@inheritDoc}. + * The credentials for authenticating with S3 are requested from the + * FS via {@link S3AFileSystem#shareCredentials(String)}; this will + * increment the reference counter of these credentials. + * @param fs {@code S3AFileSystem} associated with the MetadataStore + * @throws IOException on a failure + */ @Override @Retries.OnceRaw public void initialize(FileSystem fs) throws IOException { @@ -274,11 +293,23 @@ public void initialize(FileSystem fs) throws IOException { LOG.debug("Overriding S3 region with configured DynamoDB region: {}", region); } else { - region = owner.getBucketLocation(); + try { + region = owner.getBucketLocation(); + } catch (AccessDeniedException e) { + // access denied here == can't call getBucket. Report meaningfully + URI uri = owner.getUri(); + LOG.error("Failed to get bucket location from S3 bucket {}", + uri); + throw (IOException)new AccessDeniedException( + "S3 client role lacks permission " + + RolePolicies.S3_GET_BUCKET_LOCATION + " for " + uri) + .initCause(e); + } LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region); } username = owner.getUsername(); - dynamoDB = createDynamoDB(conf, region); + credentials = owner.shareCredentials("s3guard"); + dynamoDB = createDynamoDB(conf, region, bucket, credentials); // use the bucket as the DynamoDB table name if not specified in config tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket); @@ -311,6 +342,9 @@ public void initialize(FileSystem fs) throws IOException { * must declare the table name and region in the * {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and * {@link Constants#S3GUARD_DDB_REGION_KEY} respectively. + * It also creates a new credential provider list from the configuration, + * using the base fs.s3a.* options, as there is no bucket to infer per-bucket + * settings from. 
* * @see #initialize(FileSystem) * @throws IOException if there is an error @@ -327,7 +361,8 @@ public void initialize(Configuration config) throws IOException { region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); Preconditions.checkArgument(!StringUtils.isEmpty(region), "No DynamoDB region configured"); - dynamoDB = createDynamoDB(conf, region); + credentials = createAWSCredentialProviderSet(null, conf); + dynamoDB = createDynamoDB(conf, region, null, credentials); username = UserGroupInformation.getCurrentUser().getShortUserName(); initDataAccessRetries(conf); @@ -778,12 +813,17 @@ public synchronized void close() { if (instrumentation != null) { instrumentation.storeClosed(); } - if (dynamoDB != null) { - LOG.debug("Shutting down {}", this); - dynamoDB.shutdown(); - dynamoDB = null; + try { + if (dynamoDB != null) { + LOG.debug("Shutting down {}", this); + dynamoDB.shutdown(); + dynamoDB = null; + } + } finally { + closeAutocloseables(LOG, credentials); + credentials = null; } - } +} @Override @Retries.OnceTranslated diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md index 3afd63fbd8..8af045776c 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/assumed_roles.md @@ -29,7 +29,7 @@ assumed roles for different buckets. *IAM Assumed Roles are unlikely to be supported by third-party systems supporting the S3 APIs.* -## Using IAM Assumed Roles +## Using IAM Assumed Roles ### Before You Begin @@ -40,6 +40,8 @@ are, how to configure their policies, etc. * You need a pair of long-lived IAM User credentials, not the root account set. * Have the AWS CLI installed, and test that it works there. * Give the role access to S3, and, if using S3Guard, to DynamoDB. +* For working with data encrypted with SSE-KMS, the role must +have access to the appropriate KMS keys. Trying to learn how IAM Assumed Roles work by debugging stack traces from the S3A client is "suboptimal". @@ -51,7 +53,7 @@ To use assumed roles, the client must be configured to use the in the configuration option `fs.s3a.aws.credentials.provider`. This AWS Credential provider will read in the `fs.s3a.assumed.role` options needed to connect to the -Session Token Service [Assumed Role API](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html), +Security Token Service [Assumed Role API](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html), first authenticating with the full credentials, then assuming the specific role specified. It will then refresh this login at the configured rate of `fs.s3a.assumed.role.session.duration` @@ -69,7 +71,7 @@ which uses `fs.s3a.access.key` and `fs.s3a.secret.key`. Note: although you can list other AWS credential providers in to the Assumed Role Credential Provider, it can only cause confusion. -### Using Assumed Roles +### Configuring Assumed Roles To use assumed roles, the S3A client credentials provider must be set to the `AssumedRoleCredentialProvider`, and `fs.s3a.assumed.role.arn` to @@ -78,7 +80,6 @@ the previously created ARN. ```xml fs.s3a.aws.credentials.provider - org.apache.hadoop.fs.s3a.AssumedRoleCredentialProvider org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider @@ -159,7 +160,18 @@ Here are the full set of configuration options. fs.s3a.assumed.role.sts.endpoint - AWS Simple Token Service Endpoint. 
If unset, uses the default endpoint.
+ AWS Security Token Service Endpoint. If unset, uses the default endpoint.
+ Only used if AssumedRoleCredentialProvider is the AWS credential provider.
+
+
+
+ fs.s3a.assumed.role.sts.endpoint.region
+ us-west-1
+
+ AWS Security Token Service Endpoint's region;
+ Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint
+ other than the default one and the v4 signature is used.
Only used if AssumedRoleCredentialProvider is the AWS credential provider.
@@ -194,39 +206,101 @@
These lists represent the minimum actions to which the client's principal
must have in order to work with a bucket.
-### Read Access Permissions
+### Read Access Permissions
Permissions which must be granted when reading from a bucket:
-| Action | S3A operations |
-|--------|----------|
-| `s3:ListBucket` | `listStatus()`, `getFileStatus()` and elsewhere |
-| `s3:GetObject` | `getFileStatus()`, `open()` and elsewhere |
-| `s3:ListBucketMultipartUploads` | Aborting/cleaning up S3A commit operations|
+```
+s3:Get*
+s3:ListBucket
+```
+When using S3Guard, the client needs the appropriate
+DynamoDB access permissions.
-The `s3:ListBucketMultipartUploads` is only needed when committing work
-via the [S3A committers](committers.html).
-However, it must be granted to the root path in order to safely clean up jobs.
-It is simplest to permit this in all buckets, even if it is only actually
-needed when writing data.
+To use SSE-KMS encryption, the client needs the
+SSE-KMS Permissions to access the
+KMS key(s).
+### Write Access Permissions
-### Write Access Permissions
+These permissions must all be granted for write access:
-These permissions must *also* be granted for write access:
+```
+s3:Get*
+s3:Delete*
+s3:Put*
+s3:ListBucket
+s3:ListBucketMultipartUploads
+s3:AbortMultipartUpload
+```
+### SSE-KMS Permissions
-| Action | S3A operations |
-|--------|----------|
-| `s3:PutObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
-| `s3:DeleteObject` | `mkdir()`, `create()`, `rename()`, `delete()` |
-| `s3:AbortMultipartUpload` | S3A committer `abortJob()` and `cleanup()` operations |
-| `s3:ListMultipartUploadParts` | S3A committer `abortJob()` and `cleanup()` operations |
+To read data encrypted using SSE-KMS, the client must have
+ the `kms:Decrypt` permission for the specific key a file was encrypted with.
+```
+kms:Decrypt
+```
-### Mixed Permissions in a single S3 Bucket
+To write data using SSE-KMS, the client must have all the following permissions.
+
+```
+kms:Decrypt
+kms:GenerateDataKey
+```
+
+This includes renaming: renamed files are encrypted with the encryption key
+of the current S3A client; it must decrypt the source file first.
+
+If the caller doesn't have these permissions, the operation will fail with an
+`AccessDeniedException`: the S3 Store does not provide the specifics of
+the cause of the failure.
+
+### S3Guard Permissions
+
+To use S3Guard, all clients must have a subset of the
+[AWS DynamoDB Permissions](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/api-permissions-reference.html).
+
+To work with buckets protected with S3Guard, the client must have
+all the following rights on the DynamoDB Table used to protect that bucket.
+
+```
+dynamodb:BatchGetItem
+dynamodb:BatchWriteItem
+dynamodb:DeleteItem
+dynamodb:DescribeTable
+dynamodb:GetItem
+dynamodb:PutItem
+dynamodb:Query
+dynamodb:UpdateItem
+```
+
+This is true, *even if the client only has read access to the data*.
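+
+A policy statement granting these rights might look like the following sketch
+(the same actions as the `STATEMENT_S3GUARD_CLIENT` statement added to `RolePolicies`,
+scoped here to a single table); the region, account ID and table name in the ARN
+are illustrative placeholders and must be replaced with those of the actual S3Guard table:
+
+```json
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Sid": "S3GuardClientAccess",
+      "Effect": "Allow",
+      "Action": [
+        "dynamodb:BatchGetItem",
+        "dynamodb:BatchWriteItem",
+        "dynamodb:DeleteItem",
+        "dynamodb:DescribeTable",
+        "dynamodb:GetItem",
+        "dynamodb:PutItem",
+        "dynamodb:Query",
+        "dynamodb:UpdateItem"
+      ],
+      "Resource": "arn:aws:dynamodb:us-west-1:123456789012:table/example-bucket"
+    }
+  ]
+}
+```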
+
+For the `hadoop s3guard` table management commands, _extra_ permissions are required:
+
+```
+dynamodb:CreateTable
+dynamodb:DescribeLimits
+dynamodb:DeleteTable
+dynamodb:Scan
+dynamodb:TagResource
+dynamodb:UntagResource
+dynamodb:UpdateTable
+```
+
+Without these permissions, tables cannot be created, destroyed or have their IO capacity
+changed through the `s3guard set-capacity` call.
+The `dynamodb:Scan` permission is needed for `s3guard prune`.
+
+The `dynamodb:CreateTable` permission is needed by a client if it tries to
+create the DynamoDB table on startup, that is, if
+`fs.s3a.s3guard.ddb.table.create` is `true` and the table does not already exist.
+
+### Mixed Permissions in a single S3 Bucket
Mixing permissions down the "directory tree" is limited
only to the extent of supporting writeable directories under
@@ -274,7 +348,7 @@ This example has the base bucket read only, and a directory underneath,
"Action" : [
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
- "s3:GetObject"
+ "s3:Get*"
],
"Resource" : "arn:aws:s3:::example-bucket/*"
}, {
@@ -320,7 +394,7 @@ the command line before trying to use the S3A client.
`hadoop fs -mkdirs -p s3a://bucket/path/p1/`
-### IOException: "Unset property fs.s3a.assumed.role.arn"
+### IOException: "Unset property fs.s3a.assumed.role.arn"
The Assumed Role Credential Provider is enabled, but `fs.s3a.assumed.role.arn` is unset.
@@ -339,7 +413,7 @@ java.io.IOException: Unset property fs.s3a.assumed.role.arn
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:474)
```
-### "Not authorized to perform sts:AssumeRole"
+### "Not authorized to perform sts:AssumeRole"
This can arise if the role ARN set in `fs.s3a.assumed.role.arn`
is invalid or one to which the caller has no access.
@@ -399,7 +473,8 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
The value of `fs.s3a.assumed.role.session.duration` is out of range.
```
-java.lang.IllegalArgumentException: Assume Role session duration should be in the range of 15min - 1Hr
+java.lang.IllegalArgumentException: Assume Role session duration should be in the range of 15min
+- 1Hr
at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$Builder.withRoleSessionDurationSeconds(STSAssumeRoleSessionCredentialsProvider.java:437)
at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.(AssumedRoleCredentialProvider.java:86)
```
@@ -603,7 +678,7 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc
### `AccessDeniedException/InvalidClientTokenId`: "The security token included in the request is invalid"
-The credentials used to authenticate with the AWS Simple Token Service are invalid.
+The credentials used to authenticate with the AWS Security Token Service are invalid.
``` [ERROR] Failures: @@ -682,26 +757,7 @@ org.apache.hadoop.fs.s3a.AWSBadRequestException: Instantiate org.apache.hadoop.f at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:474) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361) - at org.apache.hadoop.fs.s3a.ITestAssumeRole.lambda$expectFileSystemFailure$0(ITestAssumeRole.java:70) - at org.apache.hadoop.fs.s3a.ITestAssumeRole.lambda$interceptC$1(ITestAssumeRole.java:84) - at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:491) - at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:377) - at org.apache.hadoop.test.LambdaTestUtils.intercept(LambdaTestUtils.java:446) - at org.apache.hadoop.fs.s3a.ITestAssumeRole.interceptC(ITestAssumeRole.java:82) - at org.apache.hadoop.fs.s3a.ITestAssumeRole.expectFileSystemFailure(ITestAssumeRole.java:68) - at org.apache.hadoop.fs.s3a.ITestAssumeRole.testAssumeRoleBadSession(ITestAssumeRole.java:216) - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) - at java.lang.reflect.Method.invoke(Method.java:498) - at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) - at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) - at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) - at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) - at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) - at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) - at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) - at org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:74) + Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException: 1 validation error detected: Value 'Session Names cannot Hava Spaces!' at 'roleSessionName' failed to satisfy constraint: @@ -742,10 +798,11 @@ Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceExc ### `java.nio.file.AccessDeniedException` within a FileSystem API call If an operation fails with an `AccessDeniedException`, then the role does not have -the permission for the S3 Operation invoked during the call +the permission for the S3 Operation invoked during the call. ``` -java.nio.file.AccessDeniedException: s3a://bucket/readonlyDir: rename(s3a://bucket/readonlyDir, s3a://bucket/renameDest) +java.nio.file.AccessDeniedException: s3a://bucket/readonlyDir: + rename(s3a://bucket/readonlyDir, s3a://bucket/renameDest) on s3a://bucket/readonlyDir: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 2805F2ABF5246BB1; @@ -795,3 +852,33 @@ check the path for the operation. Make sure that all the read and write permissions are allowed for any bucket/path to which data is being written to, and read permissions for all buckets read from. + +If the bucket is using SSE-KMS to encrypt data: + +1. The caller must have the `kms:Decrypt` permission to read the data. +1. The caller needs `kms:Decrypt` and `kms:GenerateDataKey`. 
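+
+That is: reading only requires `kms:Decrypt`, while writing (and renaming) additionally
+requires `kms:GenerateDataKey`. A sketch of a policy statement granting both follows;
+the key ARN is an illustrative placeholder for the key(s) actually used to encrypt the data:
+
+```json
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Sid": "SSEKMSReadWrite",
+      "Effect": "Allow",
+      "Action": [
+        "kms:Decrypt",
+        "kms:GenerateDataKey"
+      ],
+      "Resource": "arn:aws:kms:us-west-1:123456789012:key/example-key-id"
+    }
+  ]
+}
+```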
+
+Without permissions, the request fails *and there is no explicit message indicating
+that this is an encryption-key issue*.
+
+### `AccessDeniedException` + `AmazonDynamoDBException`
+
+```
+java.nio.file.AccessDeniedException: bucket1:
+ com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException:
+ User: arn:aws:sts::980678866538:assumed-role/s3guard-test-role/test is not authorized to perform:
+ dynamodb:DescribeTable on resource: arn:aws:dynamodb:us-west-1:980678866538:table/bucket1
+ (Service: AmazonDynamoDBv2; Status Code: 400;
+```
+
+The caller is trying to access an S3 bucket which uses S3Guard, but the caller
+lacks the relevant DynamoDB access permissions.
+
+The `dynamodb:DescribeTable` operation is the first one used in S3Guard to access
+the DynamoDB table, so it is often the first to fail. It can be a sign
+that the role has no permissions at all to access the table named in the exception,
+or just that this specific permission has been omitted.
+
+If the role policy requested for the assumed role didn't ask for any DynamoDB
+permissions, this is where all attempts to work with an S3Guard-protected bucket will
+fail. Check the value of `fs.s3a.assumed.role.policy`.
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 7d0f67bb38..2dee10ab98 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -33,7 +33,7 @@ See also:
* [Working with IAM Assumed Roles](./assumed_roles.html)
* [Testing](./testing.html)
-## Overview
+## Overview
Apache Hadoop's `hadoop-aws` module provides support for AWS integration.
applications to easily use this support.
@@ -88,7 +88,7 @@ maintain it.
This connector is no longer available: users must migrate to the newer `s3a:` client.
-## Getting Started
+## Getting Started
S3A depends upon two JARs, alongside `hadoop-common` and its dependencies.
@@ -1698,6 +1698,6 @@
as configured by the value `fs.s3a.multipart.size`.
To disable checksum verification in `distcp`, use the `-skipcrccheck` option: ```bash -hadoop distcp -update -skipcrccheck /user/alice/datasets s3a://alice-backup/datasets +hadoop distcp -update -skipcrccheck -numListstatusThreads 40 /user/alice/datasets s3a://alice-backup/datasets ``` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java index aa6b5d8659..3214c76a9f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -36,14 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import java.io.File; import java.net.URI; import java.security.PrivilegedExceptionAction; @@ -60,6 +52,9 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME; +import static org.junit.Assert.*; /** * S3A tests for configuration. @@ -134,12 +129,26 @@ public void testProxyConnection() throws Exception { conf.setInt(Constants.PROXY_PORT, 1); String proxy = conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT); - try { - fs = S3ATestUtils.createTestFileSystem(conf); - fail("Expected a connection error for proxy server at " + proxy); - } catch (AWSClientIOException e) { - // expected - } + expectFSCreateFailure(AWSClientIOException.class, + conf, "when using proxy " + proxy); + } + + /** + * Expect a filesystem to not be created from a configuration + * @return the exception intercepted + * @throws Exception any other exception + */ + private E expectFSCreateFailure( + Class clazz, + Configuration conf, + String text) + throws Exception { + + return intercept(clazz, + () -> { + fs = S3ATestUtils.createTestFileSystem(conf); + return "expected failure creating FS " + text + " got " + fs; + }); } @Test @@ -148,15 +157,13 @@ public void testProxyPortWithoutHost() throws Exception { conf.unset(Constants.PROXY_HOST); conf.setInt(Constants.MAX_ERROR_RETRIES, 2); conf.setInt(Constants.PROXY_PORT, 1); - try { - fs = S3ATestUtils.createTestFileSystem(conf); - fail("Expected a proxy configuration error"); - } catch (IllegalArgumentException e) { - String msg = e.toString(); - if (!msg.contains(Constants.PROXY_HOST) && - !msg.contains(Constants.PROXY_PORT)) { - throw e; - } + IllegalArgumentException e = expectFSCreateFailure( + IllegalArgumentException.class, + conf, "Expected a connection error for proxy server"); + String msg = e.toString(); + if (!msg.contains(Constants.PROXY_HOST) && + !msg.contains(Constants.PROXY_PORT)) { + throw e; } } @@ -167,19 +174,11 @@ public void testAutomaticProxyPortSelection() throws Exception { conf.setInt(Constants.MAX_ERROR_RETRIES, 2); conf.set(Constants.PROXY_HOST, "127.0.0.1"); conf.set(Constants.SECURE_CONNECTIONS, "true"); - try { - fs = S3ATestUtils.createTestFileSystem(conf); - fail("Expected a 
connection error for proxy server"); - } catch (AWSClientIOException e) { - // expected - } + expectFSCreateFailure(AWSClientIOException.class, + conf, "Expected a connection error for proxy server"); conf.set(Constants.SECURE_CONNECTIONS, "false"); - try { - fs = S3ATestUtils.createTestFileSystem(conf); - fail("Expected a connection error for proxy server"); - } catch (AWSClientIOException e) { - // expected - } + expectFSCreateFailure(AWSClientIOException.class, + conf, "Expected a connection error for proxy server"); } @Test @@ -189,31 +188,31 @@ public void testUsernameInconsistentWithPassword() throws Exception { conf.set(Constants.PROXY_HOST, "127.0.0.1"); conf.setInt(Constants.PROXY_PORT, 1); conf.set(Constants.PROXY_USERNAME, "user"); - try { - fs = S3ATestUtils.createTestFileSystem(conf); - fail("Expected a connection error for proxy server"); - } catch (IllegalArgumentException e) { - String msg = e.toString(); - if (!msg.contains(Constants.PROXY_USERNAME) && - !msg.contains(Constants.PROXY_PASSWORD)) { - throw e; - } + IllegalArgumentException e = expectFSCreateFailure( + IllegalArgumentException.class, + conf, "Expected a connection error for proxy server"); + assertIsProxyUsernameError(e); + } + + private void assertIsProxyUsernameError(final IllegalArgumentException e) { + String msg = e.toString(); + if (!msg.contains(Constants.PROXY_USERNAME) && + !msg.contains(Constants.PROXY_PASSWORD)) { + throw e; } + } + + @Test + public void testUsernameInconsistentWithPassword2() throws Exception { conf = new Configuration(); conf.setInt(Constants.MAX_ERROR_RETRIES, 2); conf.set(Constants.PROXY_HOST, "127.0.0.1"); conf.setInt(Constants.PROXY_PORT, 1); conf.set(Constants.PROXY_PASSWORD, "password"); - try { - fs = S3ATestUtils.createTestFileSystem(conf); - fail("Expected a connection error for proxy server"); - } catch (IllegalArgumentException e) { - String msg = e.toString(); - if (!msg.contains(Constants.PROXY_USERNAME) && - !msg.contains(Constants.PROXY_PASSWORD)) { - throw e; - } - } + IllegalArgumentException e = expectFSCreateFailure( + IllegalArgumentException.class, + conf, "Expected a connection error for proxy server"); + assertIsProxyUsernameError(e); } @Test @@ -393,7 +392,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty() // Catch/pass standard path style access behaviour when live bucket // isn't in the same region as the s3 client default. 
See // http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html - assertEquals(e.getStatusCode(), HttpStatus.SC_MOVED_PERMANENTLY); + assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode()); } } @@ -428,8 +427,16 @@ public void testCustomUserAgent() throws Exception { public void testCloseIdempotent() throws Throwable { conf = new Configuration(); fs = S3ATestUtils.createTestFileSystem(conf); + AWSCredentialProviderList credentials = + fs.shareCredentials("testCloseIdempotent"); + credentials.close(); fs.close(); + assertTrue("Closing FS didn't close credentials " + credentials, + credentials.isClosed()); + assertEquals("refcount not zero in " + credentials, 0, credentials.getRefCount()); fs.close(); + // and the numbers should not change + assertEquals("refcount not zero in " + credentials, 0, credentials.getRefCount()); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java index 44a2beb0ea..afc4086344 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java @@ -19,15 +19,14 @@ package org.apache.hadoop.fs.s3a; import java.io.IOException; -import java.net.URI; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient; +import com.amazonaws.services.securitytoken.AWSSecurityTokenService; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest; import com.amazonaws.services.securitytoken.model.GetSessionTokenResult; import com.amazonaws.services.securitytoken.model.Credentials; -import org.apache.hadoop.fs.s3native.S3xLoginHelper; +import org.apache.hadoop.fs.s3a.auth.STSClientFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.test.LambdaTestUtils; @@ -55,6 +54,14 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase { private static final long TEST_FILE_SIZE = 1024; + private AWSCredentialProviderList credentials; + + @Override + public void teardown() throws Exception { + S3AUtils.closeAutocloseables(LOG, credentials); + super.teardown(); + } + /** * Test use of STS for requesting temporary credentials. * @@ -63,7 +70,7 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase { * S3A tests to request temporary credentials, then attempt to use those * credentials instead. 
* - * @throws IOException + * @throws IOException failure */ @Test public void testSTS() throws IOException { @@ -71,21 +78,20 @@ public void testSTS() throws IOException { if (!conf.getBoolean(TEST_STS_ENABLED, true)) { skip("STS functional tests disabled"); } + S3AFileSystem testFS = getFileSystem(); + credentials = testFS.shareCredentials("testSTS"); - S3xLoginHelper.Login login = S3AUtils.getAWSAccessKeys( - URI.create("s3a://foobar"), conf); - if (!login.hasLogin()) { - skip("testSTS disabled because AWS credentials not configured"); - } - AWSCredentialsProvider parentCredentials = new BasicAWSCredentialsProvider( - login.getUser(), login.getPassword()); + String bucket = testFS.getBucket(); + AWSSecurityTokenServiceClientBuilder builder = STSClientFactory.builder( + conf, + bucket, + credentials, + conf.getTrimmed(TEST_STS_ENDPOINT, ""), ""); + AWSSecurityTokenService stsClient = builder.build(); - String stsEndpoint = conf.getTrimmed(TEST_STS_ENDPOINT, ""); - AWSSecurityTokenServiceClient stsClient; - stsClient = new AWSSecurityTokenServiceClient(parentCredentials); - if (!stsEndpoint.isEmpty()) { - LOG.debug("STS Endpoint ={}", stsEndpoint); - stsClient.setEndpoint(stsEndpoint); + if (!conf.getTrimmed(TEST_STS_ENDPOINT, "").isEmpty()) { + LOG.debug("STS Endpoint ={}", conf.getTrimmed(TEST_STS_ENDPOINT, "")); + stsClient.setEndpoint(conf.getTrimmed(TEST_STS_ENDPOINT, "")); } GetSessionTokenRequest sessionTokenRequest = new GetSessionTokenRequest(); sessionTokenRequest.setDurationSeconds(900); @@ -93,23 +99,28 @@ public void testSTS() throws IOException { sessionTokenResult = stsClient.getSessionToken(sessionTokenRequest); Credentials sessionCreds = sessionTokenResult.getCredentials(); - String childAccessKey = sessionCreds.getAccessKeyId(); - conf.set(ACCESS_KEY, childAccessKey); - String childSecretKey = sessionCreds.getSecretAccessKey(); - conf.set(SECRET_KEY, childSecretKey); - String sessionToken = sessionCreds.getSessionToken(); - conf.set(SESSION_TOKEN, sessionToken); + // clone configuration so changes here do not affect the base FS. + Configuration conf2 = new Configuration(conf); + S3AUtils.clearBucketOption(conf2, bucket, AWS_CREDENTIALS_PROVIDER); + S3AUtils.clearBucketOption(conf2, bucket, ACCESS_KEY); + S3AUtils.clearBucketOption(conf2, bucket, SECRET_KEY); + S3AUtils.clearBucketOption(conf2, bucket, SESSION_TOKEN); - conf.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS); + conf2.set(ACCESS_KEY, sessionCreds.getAccessKeyId()); + conf2.set(SECRET_KEY, sessionCreds.getSecretAccessKey()); + conf2.set(SESSION_TOKEN, sessionCreds.getSessionToken()); - try(S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + conf2.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS); + + // with valid credentials, we can set properties. 
+ try(S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf2)) { createAndVerifyFile(fs, path("testSTS"), TEST_FILE_SIZE); } // now create an invalid set of credentials by changing the session // token - conf.set(SESSION_TOKEN, "invalid-" + sessionToken); - try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + conf2.set(SESSION_TOKEN, "invalid-" + sessionCreds.getSessionToken()); + try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf2)) { createAndVerifyFile(fs, path("testSTSInvalidToken"), TEST_FILE_SIZE); fail("Expected an access exception, but file access to " + fs.getUri() + " was allowed: " + fs); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java index 763819b2a4..a1df1a5fb5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java @@ -20,6 +20,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -28,6 +29,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.s3a.S3AContract; + import org.junit.Assume; import org.junit.Test; @@ -37,6 +39,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.stream.Collectors; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; @@ -71,7 +74,9 @@ protected AbstractFSContract createContract(Configuration conf) { // Other configs would break test assumptions conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING); conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f); - conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC); + // this is a long value to guarantee that the inconsistency holds + // even over long-haul connections, and in the debugger too/ + conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, (long) (60 * 1000)); return new S3AContract(conf); } @@ -524,37 +529,60 @@ public void testInconsistentS3ClientDeletes() throws Throwable { ListObjectsV2Result postDeleteDelimited = listObjectsV2(fs, key, "/"); ListObjectsV2Result postDeleteUndelimited = listObjectsV2(fs, key, null); + assertListSizeEqual( + "InconsistentAmazonS3Client added back objects incorrectly " + + "in a non-recursive listing", + preDeleteDelimited.getObjectSummaries(), + postDeleteDelimited.getObjectSummaries()); - assertEquals("InconsistentAmazonS3Client added back objects incorrectly " + + assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " + "in a non-recursive listing", - preDeleteDelimited.getObjectSummaries().size(), - postDeleteDelimited.getObjectSummaries().size() + preDeleteDelimited.getCommonPrefixes(), + postDeleteDelimited.getCommonPrefixes() ); - assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " + - "in a non-recursive listing", - preDeleteDelimited.getCommonPrefixes().size(), - postDeleteDelimited.getCommonPrefixes().size() - ); - assertEquals("InconsistentAmazonS3Client added back objects incorrectly " + + 
assertListSizeEqual("InconsistentAmazonS3Client added back objects incorrectly " + "in a recursive listing", - preDeleteUndelimited.getObjectSummaries().size(), - postDeleteUndelimited.getObjectSummaries().size() + preDeleteUndelimited.getObjectSummaries(), + postDeleteUndelimited.getObjectSummaries() ); - assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " + + + assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " + "in a recursive listing", - preDeleteUndelimited.getCommonPrefixes().size(), - postDeleteUndelimited.getCommonPrefixes().size() + preDeleteUndelimited.getCommonPrefixes(), + postDeleteUndelimited.getCommonPrefixes() ); } /** - * retrying v2 list. - * @param fs - * @param key - * @param delimiter - * @return + * Assert that the two list sizes match; failure message includes the lists. + * @param message text for the assertion + * @param expected expected list + * @param actual actual list + * @param type of list + */ + private void assertListSizeEqual(String message, + List expected, + List actual) { + String leftContents = expected.stream() + .map(n -> n.toString()) + .collect(Collectors.joining("\n")); + String rightContents = actual.stream() + .map(n -> n.toString()) + .collect(Collectors.joining("\n")); + String summary = "\nExpected:" + leftContents + + "\n-----------\nActual:" + rightContents; + assertEquals(message + summary, expected.size(), actual.size()); + } + + /** + * Retrying v2 list directly through the s3 client. + * @param fs filesystem + * @param key key to list under + * @param delimiter any delimiter + * @return the listing * @throws IOException on error */ + @Retries.RetryRaw private ListObjectsV2Result listObjectsV2(S3AFileSystem fs, String key, String delimiter) throws IOException { ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java index c8a54b82ed..d5cd4d4d9a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java @@ -65,11 +65,12 @@ public void testListStatusWriteBack() throws Exception { // delete the existing directory (in case of last test failure) noS3Guard.delete(directory, true); // Create a directory on S3 only - noS3Guard.mkdirs(new Path(directory, "OnS3")); + Path onS3 = new Path(directory, "OnS3"); + noS3Guard.mkdirs(onS3); // Create a directory on both S3 and metadata store - Path p = new Path(directory, "OnS3AndMS"); - ContractTestUtils.assertPathDoesNotExist(noWriteBack, "path", p); - noWriteBack.mkdirs(p); + Path onS3AndMS = new Path(directory, "OnS3AndMS"); + ContractTestUtils.assertPathDoesNotExist(noWriteBack, "path", onS3AndMS); + noWriteBack.mkdirs(onS3AndMS); FileStatus[] fsResults; DirListingMetadata mdResults; @@ -83,6 +84,8 @@ public void testListStatusWriteBack() throws Exception { // Metadata store without write-back should still only contain /OnS3AndMS, // because newly discovered /OnS3 is not written back to metadata store mdResults = noWriteBack.getMetadataStore().listChildren(directory); + assertNotNull("No results from noWriteBack listChildren " + directory, + mdResults); assertEquals("Metadata store without write back should still only know " + "about /OnS3AndMS, but it has: " + mdResults, 1, mdResults.numEntries()); @@ 
-102,8 +105,7 @@ public void testListStatusWriteBack() throws Exception { // If we don't clean this up, the next test run will fail because it will // have recorded /OnS3 being deleted even after it's written to noS3Guard. - getFileSystem().getMetadataStore().forgetMetadata( - new Path(directory, "OnS3")); + getFileSystem().getMetadataStore().forgetMetadata(onS3); } /** @@ -118,26 +120,33 @@ private S3AFileSystem createTestFS(URI fsURI, boolean disableS3Guard, // Create a FileSystem that is S3-backed only conf = createConfiguration(); - S3ATestUtils.disableFilesystemCaching(conf); String host = fsURI.getHost(); - if (disableS3Guard) { - conf.set(Constants.S3_METADATA_STORE_IMPL, - Constants.S3GUARD_METASTORE_NULL); - S3AUtils.setBucketOption(conf, host, - S3_METADATA_STORE_IMPL, - S3GUARD_METASTORE_NULL); - } else { - S3ATestUtils.maybeEnableS3Guard(conf); - conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta); - S3AUtils.setBucketOption(conf, host, - METADATASTORE_AUTHORITATIVE, - Boolean.toString(authoritativeMeta)); - S3AUtils.setBucketOption(conf, host, - S3_METADATA_STORE_IMPL, - conf.get(S3_METADATA_STORE_IMPL)); + String metastore; + + metastore = S3GUARD_METASTORE_NULL; + if (!disableS3Guard) { + // pick up the metadata store used by the main test + metastore = getFileSystem().getConf().get(S3_METADATA_STORE_IMPL); + assertNotEquals(S3GUARD_METASTORE_NULL, metastore); } - FileSystem fs = FileSystem.get(fsURI, conf); - return asS3AFS(fs); + + conf.set(Constants.S3_METADATA_STORE_IMPL, metastore); + conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritativeMeta); + S3AUtils.setBucketOption(conf, host, + METADATASTORE_AUTHORITATIVE, + Boolean.toString(authoritativeMeta)); + S3AUtils.setBucketOption(conf, host, + S3_METADATA_STORE_IMPL, metastore); + + S3AFileSystem fs = asS3AFS(FileSystem.newInstance(fsURI, conf)); + // do a check to verify that everything got through + assertEquals("Metadata store should have been disabled: " + fs, + disableS3Guard, !fs.hasMetadataStore()); + assertEquals("metastore option did not propagate", + metastore, fs.getConf().get(S3_METADATA_STORE_IMPL)); + + return fs; + } private static S3AFileSystem asS3AFS(FileSystem fs) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java index b746bfe5cc..dbf228d4c7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.ArrayList; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.Region; @@ -34,8 +35,9 @@ public class MockS3ClientFactory implements S3ClientFactory { @Override - public AmazonS3 createS3Client(URI name) { - String bucket = name.getHost(); + public AmazonS3 createS3Client(URI name, + final String bucket, + final AWSCredentialsProvider credentialSet) { AmazonS3 s3 = mock(AmazonS3.class); when(s3.doesBucketExist(bucket)).thenReturn(true); // this listing is used in startup if purging is enabled, so diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java index 
d731ae7ae7..b28925cd3b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URI; +import java.nio.file.AccessDeniedException; import java.util.Arrays; import java.util.List; @@ -34,11 +35,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider; +import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.*; /** @@ -221,14 +226,13 @@ private void expectException(Class exceptionClass, } private void expectProviderInstantiationFailure(String option, - String expectedErrorText) throws IOException { + String expectedErrorText) throws Exception { Configuration conf = new Configuration(); conf.set(AWS_CREDENTIALS_PROVIDER, option); Path testFile = new Path( conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE)); - expectException(IOException.class, expectedErrorText); - URI uri = testFile.toUri(); - S3AUtils.createAWSCredentialProviderSet(uri, conf); + intercept(IOException.class, expectedErrorText, + () -> S3AUtils.createAWSCredentialProviderSet(testFile.toUri(), conf)); } /** @@ -288,4 +292,68 @@ public void testAuthenticationContainsProbes() { authenticationContains(conf, AssumedRoleCredentialProvider.NAME)); } + @Test + public void testExceptionLogic() throws Throwable { + AWSCredentialProviderList providers + = new AWSCredentialProviderList(); + // verify you can't get credentials from it + NoAuthWithAWSException noAuth = intercept(NoAuthWithAWSException.class, + AWSCredentialProviderList.NO_AWS_CREDENTIAL_PROVIDERS, + () -> providers.getCredentials()); + // but that it closes safely + providers.close(); + + S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration()); + assertEquals("Expected no retry on auth failure", + RetryPolicy.RetryAction.FAIL.action, + retryPolicy.shouldRetry(noAuth, 0, 0, true).action); + + try { + throw S3AUtils.translateException("login", "", noAuth); + } catch (AccessDeniedException expected) { + // this is what we want; other exceptions will be passed up + assertEquals("Expected no retry on AccessDeniedException", + RetryPolicy.RetryAction.FAIL.action, + retryPolicy.shouldRetry(expected, 0, 0, true).action); + } + + } + + @Test + public void testRefCounting() throws Throwable { + AWSCredentialProviderList providers + = new AWSCredentialProviderList(); + assertEquals("Ref count for " + providers, + 1, providers.getRefCount()); + AWSCredentialProviderList replicate = providers.share(); + assertEquals(providers, replicate); + assertEquals("Ref count after replication for " + providers, + 2, providers.getRefCount()); + assertFalse("Was closed " + providers, providers.isClosed()); + providers.close(); + assertFalse("Was closed " + providers, providers.isClosed()); + assertEquals("Ref count after close() for " + providers, + 1, providers.getRefCount()); + + // this should now close it + providers.close(); + assertTrue("Was not closed " + 
providers, providers.isClosed()); + assertEquals("Ref count after close() for " + providers, + 0, providers.getRefCount()); + assertEquals("Ref count after second close() for " + providers, + 0, providers.getRefCount()); + intercept(IllegalStateException.class, "closed", + () -> providers.share()); + // final call harmless + providers.close(); + assertEquals("Ref count after close() for " + providers, + 0, providers.getRefCount()); + providers.refresh(); + + intercept(NoAuthWithAWSException.class, + AWSCredentialProviderList.CREDENTIALS_REQUESTED_WHEN_CLOSED, + () -> providers.getCredentials()); + } + + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java index c6985b07d6..7451ef1641 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java @@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.s3a.auth.RoleModel.*; import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*; import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.apache.hadoop.test.LambdaTestUtils.*; /** @@ -85,6 +86,24 @@ public class ITestAssumeRole extends AbstractS3ATestBase { */ private S3AFileSystem roleFS; + /** + * Duration range exception text on SDKs which check client-side. + */ + protected static final String E_DURATION_RANGE_1 + = "Assume Role session duration should be in the range of 15min - 1Hr"; + + /** + * Duration range too high text on SDKs which check on the server. + */ + protected static final String E_DURATION_RANGE_2 + = "Member must have value less than or equal to 43200"; + + /** + * Duration range too low text on SDKs which check on the server. + */ + protected static final String E_DURATION_RANGE_3 + = "Member must have value greater than or equal to 900"; + @Override public void setup() throws Exception { super.setup(); @@ -112,13 +131,14 @@ private String getAssumedRoleARN() { * @param clazz class of exception to expect * @param text text in exception * @param type of exception as inferred from clazz + * @return the caught exception if it was of the expected type and contents * @throws Exception if the exception was the wrong class */ - private void expectFileSystemCreateFailure( + private E expectFileSystemCreateFailure( Configuration conf, Class clazz, String text) throws Exception { - interceptClosing(clazz, + return interceptClosing(clazz, text, () -> new Path(getFileSystem().getUri()).getFileSystem(conf)); } @@ -246,6 +266,60 @@ public void testAssumeRoleBadSession() throws Exception { "Member must satisfy regular expression pattern"); } + /** + * A duration >1h is forbidden client-side in AWS SDK 1.11.271; + * with the ability to extend durations deployed in March 2018, + * duration checks will need to go server-side, and, presumably, + * later SDKs will remove the client side checks. + * This code exists to see when this happens. 
+ */ + @Test + public void testAssumeRoleThreeHourSessionDuration() throws Exception { + describe("Try to authenticate with a long session duration"); + + Configuration conf = createAssumedRoleConfig(); + // add a duration of three hours + conf.setInt(ASSUMED_ROLE_SESSION_DURATION, 3 * 60 * 60); + try { + new Path(getFileSystem().getUri()).getFileSystem(conf).close(); + LOG.info("Successfully created token of a duration >3h"); + } catch (IOException ioe) { + assertExceptionContains(E_DURATION_RANGE_1, ioe); + } + } + + /** + * A duration >1h is forbidden client-side in AWS SDK 1.11.271; + * with the ability to extend durations deployed in March 2018. + * with the later SDKs, the checks go server-side and + * later SDKs will remove the client side checks. + * This test asks for a duration which will still be rejected, and + * looks for either of the error messages raised. + */ + @Test + public void testAssumeRoleThirtySixHourSessionDuration() throws Exception { + describe("Try to authenticate with a long session duration"); + + Configuration conf = createAssumedRoleConfig(); + conf.setInt(ASSUMED_ROLE_SESSION_DURATION, 36 * 60 * 60); + IOException ioe = expectFileSystemCreateFailure(conf, + IOException.class, null); + assertIsRangeException(ioe); + } + + /** + * Look for either the client-side or STS-side range exception + * @param e exception + * @throws Exception the exception, if its text doesn't match + */ + private void assertIsRangeException(final Exception e) throws Exception { + String message = e.toString(); + if (!message.contains(E_DURATION_RANGE_1) + && !message.contains(E_DURATION_RANGE_2) + && !message.contains(E_DURATION_RANGE_3)) { + throw e; + } + } /** * Create the assumed role configuration. @@ -280,11 +354,11 @@ public void testAssumedIllegalDuration() throws Throwable { describe("Expect the constructor to fail if the session is to short"); Configuration conf = new Configuration(); conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s"); - interceptClosing(IllegalArgumentException.class, "", + Exception ex = interceptClosing(Exception.class, "", () -> new AssumedRoleCredentialProvider(uri, conf)); + assertIsRangeException(ex); } - @Test public void testAssumeRoleCreateFS() throws IOException { describe("Create an FS client with the role and do some basic IO"); @@ -296,24 +370,32 @@ public void testAssumeRoleCreateFS() throws IOException { conf.get(ACCESS_KEY), roleARN); try (FileSystem fs = path.getFileSystem(conf)) { - fs.getFileStatus(new Path("/")); + fs.getFileStatus(ROOT); fs.mkdirs(path("testAssumeRoleFS")); } } @Test public void testAssumeRoleRestrictedPolicyFS() throws Exception { - describe("Restrict the policy for this session; verify that reads fail"); + describe("Restrict the policy for this session; verify that reads fail."); + // there's some special handling of S3Guard here as operations + // which only go to DDB don't fail the way S3 would reject them. Configuration conf = createAssumedRoleConfig(); bindRolePolicy(conf, RESTRICTED_POLICY); Path path = new Path(getFileSystem().getUri()); + boolean guarded = getFileSystem().hasMetadataStore(); try (FileSystem fs = path.getFileSystem(conf)) { - forbidden("getFileStatus", - () -> fs.getFileStatus(new Path("/"))); - forbidden("getFileStatus", - () -> fs.listStatus(new Path("/"))); - forbidden("getFileStatus", + if (!guarded) { + // when S3Guard is enabled, the restricted policy still + // permits S3Guard record lookup, so getFileStatus calls + // will work iff the record is in the database. 
+ forbidden("getFileStatus", + () -> fs.getFileStatus(ROOT)); + } + forbidden("", + () -> fs.listStatus(ROOT)); + forbidden("", () -> fs.mkdirs(path("testAssumeRoleFS"))); } } @@ -333,7 +415,11 @@ public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable { Configuration conf = createAssumedRoleConfig(); bindRolePolicy(conf, - policy(statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT))); + policy( + statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT_TORRENT), + ALLOW_S3_GET_BUCKET_LOCATION, + STATEMENT_S3GUARD_CLIENT, + STATEMENT_ALLOW_SSE_KMS_RW)); Path path = path("testAssumeRoleStillIncludesRolePerms"); roleFS = (S3AFileSystem) path.getFileSystem(conf); assertTouchForbidden(roleFS, path); @@ -342,6 +428,8 @@ public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable { /** * After blocking all write verbs used by S3A, try to write data (fail) * and read data (succeed). + * For S3Guard: full DDB RW access is retained. + * SSE-KMS key access is set to decrypt only. */ @Test public void testReadOnlyOperations() throws Throwable { @@ -352,7 +440,9 @@ public void testReadOnlyOperations() throws Throwable { bindRolePolicy(conf, policy( statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS), - STATEMENT_ALL_S3, STATEMENT_ALL_DDB)); + STATEMENT_ALL_S3, + STATEMENT_S3GUARD_CLIENT, + STATEMENT_ALLOW_SSE_KMS_READ)); Path path = methodPath(); roleFS = (S3AFileSystem) path.getFileSystem(conf); // list the root path, expect happy @@ -399,8 +489,9 @@ public void testRestrictedWriteSubdir() throws Throwable { Configuration conf = createAssumedRoleConfig(); bindRolePolicyStatements(conf, - STATEMENT_ALL_DDB, + STATEMENT_S3GUARD_CLIENT, statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS), + STATEMENT_ALLOW_SSE_KMS_RW, new Statement(Effects.Allow) .addActions(S3_ALL_OPERATIONS) .addResources(directory(restrictedDir))); @@ -447,7 +538,7 @@ public void testRestrictedSingleDeleteRename() throws Throwable { } /** - * Execute a sequence of rename operations. + * Execute a sequence of rename operations with access locked down. * @param conf FS configuration */ public void executeRestrictedRename(final Configuration conf) @@ -461,7 +552,8 @@ public void executeRestrictedRename(final Configuration conf) fs.delete(basePath, true); bindRolePolicyStatements(conf, - STATEMENT_ALL_DDB, + STATEMENT_S3GUARD_CLIENT, + STATEMENT_ALLOW_SSE_KMS_RW, statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS), new Statement(Effects.Allow) .addActions(S3_PATH_RW_OPERATIONS) @@ -502,6 +594,25 @@ public void testRestrictedRenameReadOnlySingleDelete() throws Throwable { executeRenameReadOnlyData(conf); } + /** + * Without simulation of STS failures, and with STS overload likely to + * be very rare, there'll be no implicit test coverage of + * {@link AssumedRoleCredentialProvider#operationRetried(String, Exception, int, boolean)}. + * This test simply invokes the callback for both the first and second retry event. + * + * If the handler ever adds more than logging, this test ensures that things + * don't break. + */ + @Test + public void testAssumedRoleRetryHandler() throws Throwable { + try(AssumedRoleCredentialProvider provider + = new AssumedRoleCredentialProvider(getFileSystem().getUri(), + createAssumedRoleConfig())) { + provider.operationRetried("retry", new IOException("failure"), 0, true); + provider.operationRetried("retry", new IOException("failure"), 1, true); + } + } + /** * Execute a sequence of rename operations where the source * data is read only to the client calling rename(). 
@@ -534,7 +645,7 @@ public void executeRenameReadOnlyData(final Configuration conf) touch(fs, readOnlyFile); bindRolePolicyStatements(conf, - STATEMENT_ALL_DDB, + STATEMENT_S3GUARD_CLIENT, statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS), new Statement(Effects.Allow) .addActions(S3_PATH_RW_OPERATIONS) @@ -614,7 +725,8 @@ public void testRestrictedCommitActions() throws Throwable { fs.mkdirs(readOnlyDir); bindRolePolicyStatements(conf, - STATEMENT_ALL_DDB, + STATEMENT_S3GUARD_CLIENT, + STATEMENT_ALLOW_SSE_KMS_RW, statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS), new Statement(Effects.Allow) .addActions(S3_PATH_RW_OPERATIONS) @@ -752,7 +864,8 @@ public void executePartialDelete(final Configuration conf) fs.delete(destDir, true); bindRolePolicyStatements(conf, - STATEMENT_ALL_DDB, + STATEMENT_S3GUARD_CLIENT, + STATEMENT_ALLOW_SSE_KMS_RW, statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS), new Statement(Effects.Deny) .addActions(S3_PATH_WRITE_OPERATIONS) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java index bb662680ff..834826e447 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java @@ -72,7 +72,8 @@ public void setup() throws Exception { Configuration conf = newAssumedRoleConfig(getConfiguration(), getAssumedRoleARN()); bindRolePolicyStatements(conf, - STATEMENT_ALL_DDB, + STATEMENT_S3GUARD_CLIENT, + STATEMENT_ALLOW_SSE_KMS_RW, statement(true, S3_ALL_BUCKETS, S3_ROOT_READ_OPERATIONS), new RoleModel.Statement(RoleModel.Effects.Allow) .addActions(S3_PATH_RW_OPERATIONS) @@ -81,7 +82,6 @@ public void setup() throws Exception { roleFS = (S3AFileSystem) restrictedDir.getFileSystem(conf); } - @Override public void teardown() throws Exception { S3AUtils.closeAll(LOG, roleFS); @@ -122,7 +122,6 @@ protected Path path(String filepath) throws IOException { return new Path(restrictedDir, filepath); } - private String getAssumedRoleARN() { return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, ""); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java index 9fa26002e4..854e7ec981 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java @@ -58,14 +58,23 @@ public final class RoleTestUtils { /** Deny GET requests to all buckets. */ - public static final Statement DENY_GET_ALL = + public static final Statement DENY_S3_GET_OBJECT = statement(false, S3_ALL_BUCKETS, S3_GET_OBJECT); - /** - * This is AWS policy removes read access. - */ - public static final Policy RESTRICTED_POLICY = policy(DENY_GET_ALL); + public static final Statement ALLOW_S3_GET_BUCKET_LOCATION + = statement(true, S3_ALL_BUCKETS, S3_GET_BUCKET_LOCATION); + /** + * This is AWS policy removes read access from S3, leaves S3Guard access up. + * This will allow clients to use S3Guard list/HEAD operations, even + * the ability to write records, but not actually access the underlying + * data. + * The client does need {@link RolePolicies#S3_GET_BUCKET_LOCATION} to + * get the bucket location. 
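+   * Against an S3Guard-enabled bucket, getFileStatus() may therefore still
+   * succeed from the DynamoDB record even though the object data itself
+   * cannot be read.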
+ */ + public static final Policy RESTRICTED_POLICY = policy( + DENY_S3_GET_OBJECT, STATEMENT_ALL_DDB, ALLOW_S3_GET_BUCKET_LOCATION + ); /** * Error message to get from the AWS SDK if you can't assume the role. @@ -145,7 +154,7 @@ public static Configuration newAssumedRoleConfig( Configuration conf = new Configuration(srcConf); conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME); conf.set(ASSUMED_ROLE_ARN, roleARN); - conf.set(ASSUMED_ROLE_SESSION_NAME, "valid"); + conf.set(ASSUMED_ROLE_SESSION_NAME, "test"); conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m"); disableFilesystemCaching(conf); return conf; @@ -163,9 +172,8 @@ public static AccessDeniedException forbidden( String contained, Callable eval) throws Exception { - AccessDeniedException ex = intercept(AccessDeniedException.class, eval); - GenericTestUtils.assertExceptionContains(contained, ex); - return ex; + return intercept(AccessDeniedException.class, + contained, eval); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java index f591e3258c..9185fc5b7c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java @@ -32,6 +32,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.util.StopWatch; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileSystem; @@ -51,6 +52,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_NAME_KEY; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL; import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; @@ -144,8 +146,11 @@ public void setup() throws Exception { // Also create a "raw" fs without any MetadataStore configured Configuration conf = new Configuration(getConfiguration()); - conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL); URI fsUri = getFileSystem().getUri(); + conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL); + S3AUtils.setBucketOption(conf,fsUri.getHost(), + METADATASTORE_AUTHORITATIVE, + S3GUARD_METASTORE_NULL); rawFs = (S3AFileSystem) FileSystem.newInstance(fsUri, conf); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java index c6838a08c7..22a1efd2a4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java @@ -40,8 +40,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY; @@ -80,81 +82,102 @@ 
private void deleteTable(DynamoDB db, String tableName) throws @Test public void testConcurrentTableCreations() throws Exception { - final Configuration conf = getConfiguration(); + S3AFileSystem fs = getFileSystem(); + final Configuration conf = fs.getConf(); Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard", conf.get(Constants.S3_METADATA_STORE_IMPL).equals( Constants.S3GUARD_METASTORE_DYNAMO)); + AWSCredentialProviderList sharedCreds = + fs.shareCredentials("testConcurrentTableCreations"); + // close that shared copy. + sharedCreds.close(); + // this is the original reference count. + int originalRefCount = sharedCreds.getRefCount(); + + //now init the store; this should increment the ref count. DynamoDBMetadataStore ms = new DynamoDBMetadataStore(); - ms.initialize(getFileSystem()); - DynamoDB db = ms.getDynamoDB(); + ms.initialize(fs); - String tableName = "testConcurrentTableCreations" + new Random().nextInt(); - conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true); - conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName); + // the ref count should have gone up + assertEquals("Credential Ref count unchanged after initializing metastore " + + sharedCreds, + originalRefCount + 1, sharedCreds.getRefCount()); + try { + DynamoDB db = ms.getDynamoDB(); - String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); - if (StringUtils.isEmpty(region)) { - // no region set, so pick it up from the test bucket - conf.set(S3GUARD_DDB_REGION_KEY, getFileSystem().getBucketLocation()); - } - int concurrentOps = 16; - int iterations = 4; + String tableName = "testConcurrentTableCreations" + new Random().nextInt(); + conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true); + conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName); - failIfTableExists(db, tableName); + String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); + if (StringUtils.isEmpty(region)) { + // no region set, so pick it up from the test bucket + conf.set(S3GUARD_DDB_REGION_KEY, fs.getBucketLocation()); + } + int concurrentOps = 16; + int iterations = 4; - for (int i = 0; i < iterations; i++) { - ExecutorService executor = Executors.newFixedThreadPool( - concurrentOps, new ThreadFactory() { - private AtomicInteger count = new AtomicInteger(0); + failIfTableExists(db, tableName); - public Thread newThread(Runnable r) { - return new Thread(r, - "testConcurrentTableCreations" + count.getAndIncrement()); + for (int i = 0; i < iterations; i++) { + ExecutorService executor = Executors.newFixedThreadPool( + concurrentOps, new ThreadFactory() { + private AtomicInteger count = new AtomicInteger(0); + + public Thread newThread(Runnable r) { + return new Thread(r, + "testConcurrentTableCreations" + count.getAndIncrement()); + } + }); + ((ThreadPoolExecutor) executor).prestartAllCoreThreads(); + Future[] futures = new Future[concurrentOps]; + for (int f = 0; f < concurrentOps; f++) { + final int index = f; + futures[f] = executor.submit(new Callable() { + @Override + public Exception call() throws Exception { + + ContractTestUtils.NanoTimer timer = + new ContractTestUtils.NanoTimer(); + + Exception result = null; + try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) { + store.initialize(conf); + } catch (Exception e) { + LOG.error(e.getClass() + ": " + e.getMessage()); + result = e; + } + + timer.end("Parallel DynamoDB client creation %d", index); + LOG.info("Parallel DynamoDB client creation {} ran from {} to {}", + index, timer.getStartTime(), timer.getEndTime()); + return result; } }); - 
((ThreadPoolExecutor) executor).prestartAllCoreThreads(); - Future[] futures = new Future[concurrentOps]; - for (int f = 0; f < concurrentOps; f++) { - final int index = f; - futures[f] = executor.submit(new Callable() { - @Override - public Exception call() throws Exception { - - ContractTestUtils.NanoTimer timer = - new ContractTestUtils.NanoTimer(); - - Exception result = null; - try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) { - store.initialize(conf); - } catch (Exception e) { - LOG.error(e.getClass() + ": " + e.getMessage()); - result = e; - } - - timer.end("Parallel DynamoDB client creation %d", index); - LOG.info("Parallel DynamoDB client creation {} ran from {} to {}", - index, timer.getStartTime(), timer.getEndTime()); - return result; + } + List exceptions = new ArrayList<>(concurrentOps); + for (int f = 0; f < concurrentOps; f++) { + Exception outcome = futures[f].get(); + if (outcome != null) { + exceptions.add(outcome); } - }); - } - List exceptions = new ArrayList<>(concurrentOps); - for (int f = 0; f < concurrentOps; f++) { - Exception outcome = futures[f].get(); - if (outcome != null) { - exceptions.add(outcome); + } + deleteTable(db, tableName); + int exceptionsThrown = exceptions.size(); + if (exceptionsThrown > 0) { + // at least one exception was thrown. Fail the test & nest the first + // exception caught + throw new AssertionError(exceptionsThrown + "/" + concurrentOps + + " threads threw exceptions while initializing on iteration " + i, + exceptions.get(0)); } } - deleteTable(db, tableName); - int exceptionsThrown = exceptions.size(); - if (exceptionsThrown > 0) { - // at least one exception was thrown. Fail the test & nest the first - // exception caught - throw new AssertionError(exceptionsThrown + "/" + concurrentOps + - " threads threw exceptions while initializing on iteration " + i, - exceptions.get(0)); - } + } finally { + ms.close(); } + assertEquals("Credential Ref count unchanged after closing metastore: " + + sharedCreds, + originalRefCount, sharedCreds.getRefCount()); } }
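
The ref-count assertions above pin down the sharing contract: shareCredentials() hands back the filesystem's own AWSCredentialProviderList with its reference count incremented, and close() releases only that one reference, so the underlying providers stay usable until the last holder closes them. A short usage sketch follows, with a placeholder bucket URI and purpose string; the method names are those exercised in the test.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
    import org.apache.hadoop.fs.s3a.S3AFileSystem;

    public class SharedCredentialsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (S3AFileSystem fs = (S3AFileSystem) FileSystem.newInstance(
            new URI("s3a://example-bucket/"), conf)) {     // placeholder bucket
          // borrow the filesystem's credential chain; this bumps the ref count
          AWSCredentialProviderList creds = fs.shareCredentials("example");
          try {
            // hand "creds" to another AWS client here; the providers stay live
            // while at least one reference remains open
            System.out.println("ref count = " + creds.getRefCount());
          } finally {
            // releases this reference only; the filesystem still holds its own
            creds.close();
          }
        }
      }
    }

Because the DynamoDB metastore borrows the filesystem's credentials in exactly this way, closing the metastore (as the finally block in the test does) is what returns the count to its original value.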