diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java index c316b91116..5b1829e096 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AbstractSessionCredentialsProvider.java @@ -45,7 +45,7 @@ public abstract class AbstractSessionCredentialsProvider extends AbstractAWSCredentialProvider { /** Credentials, created in {@link #init()}. */ - private AWSCredentials awsCredentials; + private volatile AWSCredentials awsCredentials; /** Atomic flag for on-demand initialization. */ private final AtomicBoolean initialized = new AtomicBoolean(false); @@ -54,7 +54,7 @@ public abstract class AbstractSessionCredentialsProvider * The (possibly translated) initialization exception. * Used for testing. */ - private IOException initializationException; + private volatile IOException initializationException; /** * Constructor. @@ -73,9 +73,9 @@ public AbstractSessionCredentialsProvider( * @throws IOException on any failure. */ @Retries.OnceTranslated - protected void init() throws IOException { + protected synchronized void init() throws IOException { // stop re-entrant attempts - if (initialized.getAndSet(true)) { + if (isInitialized()) { return; } try { @@ -84,6 +84,8 @@ protected void init() throws IOException { } catch (IOException e) { initializationException = e; throw e; + } finally { + initialized.set(true); } } @@ -132,13 +134,15 @@ public AWSCredentials getCredentials() throws SdkBaseException { } if (awsCredentials == null) { throw new CredentialInitializationException( - "Provider " + this + " has no credentials"); + "Provider " + this + " has no credentials: " + + (initializationException != null ? initializationException.toString() : ""), + initializationException); } return awsCredentials; } public final boolean hasCredentials() { - return awsCredentials == null; + return awsCredentials != null; } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java index 6030005d10..6456cb5e12 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java @@ -22,9 +22,15 @@ import java.io.InterruptedIOException; import java.net.URI; import java.nio.file.AccessDeniedException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; @@ -37,6 +43,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.auth.AbstractSessionCredentialsProvider; import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider; import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.io.retry.RetryPolicy; @@ -46,6 +53,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.junit.Assert.*; /** @@ -198,7 +206,7 @@ static abstract class AbstractProvider implements AWSCredentialsProvider { /** * A credential provider whose constructor signature doesn't match. */ - static class ConstructorSignatureErrorProvider + protected static class ConstructorSignatureErrorProvider implements AWSCredentialsProvider { @SuppressWarnings("unused") @@ -218,7 +226,7 @@ public void refresh() { /** * A credential provider whose constructor raises an NPE. */ - static class ConstructorFailureProvider + protected static class ConstructorFailureProvider implements AWSCredentialsProvider { @SuppressWarnings("unused") @@ -246,7 +254,7 @@ public void testAWSExceptionTranslation() throws Throwable { } } - static class AWSExceptionRaisingFactory implements AWSCredentialsProvider { + protected static class AWSExceptionRaisingFactory implements AWSCredentialsProvider { public static final String NO_AUTH = "No auth"; @@ -462,7 +470,7 @@ public void testIOEInConstructorPropagation() throws Throwable { /** * Credential provider which raises an IOE when constructed. */ - private static class IOERaisingProvider implements AWSCredentialsProvider { + protected static class IOERaisingProvider implements AWSCredentialsProvider { public IOERaisingProvider(URI uri, Configuration conf) throws IOException { @@ -480,4 +488,153 @@ public void refresh() { } } + private static final AWSCredentials EXPECTED_CREDENTIALS = new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return "expectedAccessKey"; + } + + @Override + public String getAWSSecretKey() { + return "expectedSecret"; + } + }; + + /** + * Credential provider that takes a long time. + */ + protected static class SlowProvider extends AbstractSessionCredentialsProvider { + + public SlowProvider(@Nullable URI uri, Configuration conf) { + super(uri, conf); + } + + @Override + protected AWSCredentials createCredentials(Configuration config) throws IOException { + // yield to other callers to induce race condition + Thread.yield(); + return EXPECTED_CREDENTIALS; + } + } + + private static final int CONCURRENT_THREADS = 10; + + @Test + public void testConcurrentAuthentication() throws Throwable { + Configuration conf = createProviderConfiguration(SlowProvider.class.getName()); + Path testFile = getCSVTestPath(conf); + + AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf); + + SlowProvider provider = (SlowProvider) list.getProviders().get(0); + + ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS); + + List> results = new ArrayList<>(); + + try { + assertFalse( + "Provider not initialized. isInitialized should be false", + provider.isInitialized()); + assertFalse( + "Provider not initialized. hasCredentials should be false", + provider.hasCredentials()); + if (provider.getInitializationException() != null) { + throw new AssertionError( + "Provider not initialized. getInitializationException should return null", + provider.getInitializationException()); + } + + for (int i = 0; i < CONCURRENT_THREADS; i++) { + results.add(pool.submit(() -> list.getCredentials())); + } + + for (Future result : results) { + AWSCredentials credentials = result.get(); + assertEquals("Access key from credential provider", + "expectedAccessKey", credentials.getAWSAccessKeyId()); + assertEquals("Secret key from credential provider", + "expectedSecret", credentials.getAWSSecretKey()); + } + } finally { + pool.awaitTermination(10, TimeUnit.SECONDS); + pool.shutdown(); + } + + assertTrue( + "Provider initialized without errors. isInitialized should be true", + provider.isInitialized()); + assertTrue( + "Provider initialized without errors. hasCredentials should be true", + provider.hasCredentials()); + if (provider.getInitializationException() != null) { + throw new AssertionError( + "Provider initialized without errors. getInitializationException should return null", + provider.getInitializationException()); + } + } + + /** + * Credential provider with error. + */ + protected static class ErrorProvider extends AbstractSessionCredentialsProvider { + + public ErrorProvider(@Nullable URI uri, Configuration conf) { + super(uri, conf); + } + + @Override + protected AWSCredentials createCredentials(Configuration config) throws IOException { + throw new IOException("expected error"); + } + } + + @Test + public void testConcurrentAuthenticationError() throws Throwable { + Configuration conf = createProviderConfiguration(ErrorProvider.class.getName()); + Path testFile = getCSVTestPath(conf); + + AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf); + ErrorProvider provider = (ErrorProvider) list.getProviders().get(0); + + ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS); + + List> results = new ArrayList<>(); + + try { + assertFalse("Provider not initialized. isInitialized should be false", + provider.isInitialized()); + assertFalse("Provider not initialized. hasCredentials should be false", + provider.hasCredentials()); + if (provider.getInitializationException() != null) { + throw new AssertionError( + "Provider not initialized. getInitializationException should return null", + provider.getInitializationException()); + } + + for (int i = 0; i < CONCURRENT_THREADS; i++) { + results.add(pool.submit(() -> list.getCredentials())); + } + + for (Future result : results) { + interceptFuture(CredentialInitializationException.class, + "expected error", + result + ); + } + } finally { + pool.awaitTermination(10, TimeUnit.SECONDS); + pool.shutdown(); + } + + assertTrue( + "Provider initialization failed. isInitialized should be true", + provider.isInitialized()); + assertFalse( + "Provider initialization failed. hasCredentials should be false", + provider.hasCredentials()); + assertTrue( + "Provider initialization failed. getInitializationException should contain the error", + provider.getInitializationException().getMessage().contains("expected error")); + } }