HADOOP-19004. S3A: Support Authentication through HttpSigner API (#6324)
Move to the new auth flow based signers for aws. * Implement a new Signer Initialization Chain * Add a new instantiation method * Add a new test * Fix Reflection Code for SignerInitialization Contributed by Harshit Gupta
This commit is contained in:
parent
453e264eb4
commit
2f1e1558b6
@ -1543,4 +1543,20 @@ private Constants() {
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final boolean S3EXPRESS_CREATE_SESSION_DEFAULT = true;
|
||||
|
||||
/**
|
||||
* Flag to switch to a v2 SDK HTTP signer. Value {@value}.
|
||||
*/
|
||||
public static final String HTTP_SIGNER_ENABLED = "fs.s3a.http.signer.enabled";
|
||||
|
||||
/**
|
||||
* Default value of {@link #HTTP_SIGNER_ENABLED}: {@value}.
|
||||
*/
|
||||
public static final boolean HTTP_SIGNER_ENABLED_DEFAULT = false;
|
||||
|
||||
/**
|
||||
* Classname of the http signer to use when {@link #HTTP_SIGNER_ENABLED}
|
||||
* is true: {@value}.
|
||||
*/
|
||||
public static final String HTTP_SIGNER_CLASS_NAME = "fs.s3a.http.signer.class";
|
||||
}
|
||||
|
@ -32,7 +32,9 @@
|
||||
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
|
||||
import software.amazon.awssdk.core.retry.RetryPolicy;
|
||||
import software.amazon.awssdk.http.apache.ApacheHttpClient;
|
||||
import software.amazon.awssdk.http.auth.spi.scheme.AuthScheme;
|
||||
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
|
||||
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
|
||||
@ -52,10 +54,15 @@
|
||||
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
|
||||
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
|
||||
import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner;
|
||||
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
|
||||
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4;
|
||||
|
||||
|
||||
/**
|
||||
@ -165,11 +172,19 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
|
||||
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
|
||||
.build();
|
||||
|
||||
return builder
|
||||
S3BaseClientBuilder s3BaseClientBuilder = builder
|
||||
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
|
||||
.credentialsProvider(parameters.getCredentialSet())
|
||||
.disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
|
||||
.serviceConfiguration(serviceConfiguration);
|
||||
|
||||
if (conf.getBoolean(HTTP_SIGNER_ENABLED, HTTP_SIGNER_ENABLED_DEFAULT)) {
|
||||
// use an http signer through an AuthScheme
|
||||
final AuthScheme<AwsCredentialsIdentity> signer =
|
||||
createHttpSigner(conf, AUTH_SCHEME_AWS_SIGV_4, HTTP_SIGNER_CLASS_NAME);
|
||||
builder.putAuthScheme(signer);
|
||||
}
|
||||
return (BuilderT) s3BaseClientBuilder;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -177,6 +192,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
|
||||
* @param parameters parameter object
|
||||
* @param conf configuration object
|
||||
* @throws IOException any IOE raised, or translated exception
|
||||
* @throws RuntimeException some failures creating an http signer
|
||||
* @return the override configuration
|
||||
*/
|
||||
protected ClientOverrideConfiguration createClientOverrideConfiguration(
|
||||
|
@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.auth;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
|
||||
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
|
||||
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
|
||||
import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
|
||||
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
|
||||
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
|
||||
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
|
||||
|
||||
/**
|
||||
* Custom signer that delegates to the AWS V4 signer.
|
||||
* Logs at TRACE the string value of any request.
|
||||
* This is in the production code to support testing the signer plugin mechansim.
|
||||
* To use
|
||||
* <pre>
|
||||
* fs.s3a.http.signer.enabled = true
|
||||
* fs.s3a.http.signer.class = org.apache.hadoop.fs.s3a.auth.CustomHttpSigner
|
||||
* </pre>
|
||||
*/
|
||||
public final class CustomHttpSigner implements HttpSigner<AwsCredentialsIdentity> {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(CustomHttpSigner.class);
|
||||
|
||||
/**
|
||||
* The delegate signer.
|
||||
*/
|
||||
private final HttpSigner<AwsCredentialsIdentity> delegateSigner;
|
||||
|
||||
public CustomHttpSigner() {
|
||||
delegateSigner = AwsV4HttpSigner.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SignedRequest sign(SignRequest<? extends AwsCredentialsIdentity>
|
||||
request) {
|
||||
LOG.trace("Signing request:{}", request.request());
|
||||
return delegateSigner.sign(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<AsyncSignedRequest> signAsync(
|
||||
final AsyncSignRequest<? extends AwsCredentialsIdentity> request) {
|
||||
|
||||
LOG.trace("Signing async request:{}", request.request());
|
||||
return delegateSigner.signAsync(request);
|
||||
}
|
||||
}
|
@ -29,12 +29,20 @@
|
||||
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
|
||||
import software.amazon.awssdk.core.signer.NoOpSigner;
|
||||
import software.amazon.awssdk.core.signer.Signer;
|
||||
import software.amazon.awssdk.http.auth.spi.scheme.AuthScheme;
|
||||
import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
|
||||
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
|
||||
import software.amazon.awssdk.identity.spi.IdentityProvider;
|
||||
import software.amazon.awssdk.identity.spi.IdentityProviders;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.s3a.S3AUtils;
|
||||
import org.apache.hadoop.fs.s3a.impl.InstantiationIOException;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
|
||||
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
|
||||
import static org.apache.hadoop.util.Preconditions.checkArgument;
|
||||
import static org.apache.hadoop.util.Preconditions.checkState;
|
||||
|
||||
/**
|
||||
* Signer factory used to register and create signers.
|
||||
@ -119,4 +127,64 @@ public static Signer createSigner(String signerType, String configKey) throws IO
|
||||
|
||||
return signer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an auth scheme instance from an ID and a signer.
|
||||
* @param schemeId scheme id
|
||||
* @param signer signer
|
||||
* @return the auth scheme
|
||||
*/
|
||||
public static AuthScheme<AwsCredentialsIdentity> createAuthScheme(
|
||||
String schemeId,
|
||||
HttpSigner<AwsCredentialsIdentity> signer) {
|
||||
|
||||
return new AuthScheme<AwsCredentialsIdentity>() {
|
||||
@Override
|
||||
public String schemeId() {
|
||||
return schemeId;
|
||||
}
|
||||
@Override
|
||||
public IdentityProvider<AwsCredentialsIdentity> identityProvider(
|
||||
IdentityProviders providers) {
|
||||
return providers.identityProvider(AwsCredentialsIdentity.class);
|
||||
}
|
||||
@Override
|
||||
public HttpSigner<AwsCredentialsIdentity> signer() {
|
||||
return signer;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an auth scheme by looking up the signer class in the configuration,
|
||||
* loading and instantiating it.
|
||||
* @param conf configuration
|
||||
* @param scheme scheme to bond to
|
||||
* @param configKey configuration key
|
||||
* @return the auth scheme
|
||||
* @throws InstantiationIOException failure to instantiate
|
||||
* @throws IllegalStateException if the signer class is not defined
|
||||
* @throws RuntimeException other configuration problems
|
||||
*/
|
||||
public static AuthScheme<AwsCredentialsIdentity> createHttpSigner(
|
||||
Configuration conf, String scheme, String configKey) throws IOException {
|
||||
|
||||
final Class<? extends HttpSigner> clazz = conf.getClass(HTTP_SIGNER_CLASS_NAME,
|
||||
null, HttpSigner.class);
|
||||
checkState(clazz != null, "No http signer class defined in %s", configKey);
|
||||
LOG.debug("Creating http signer {} from {}", clazz, configKey);
|
||||
try {
|
||||
return createAuthScheme(scheme, clazz.newInstance());
|
||||
|
||||
} catch (InstantiationException | IllegalAccessException e) {
|
||||
throw new InstantiationIOException(
|
||||
InstantiationIOException.Kind.InstantiationFailure,
|
||||
null,
|
||||
clazz.getName(),
|
||||
HTTP_SIGNER_CLASS_NAME,
|
||||
e.toString(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -105,6 +105,7 @@ private AWSClientConfig() {
|
||||
* @param awsServiceIdentifier service
|
||||
* @return the builder inited with signer, timeouts and UA.
|
||||
* @throws IOException failure.
|
||||
* @throws RuntimeException some failures creating an http signer
|
||||
*/
|
||||
public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Configuration conf,
|
||||
String awsServiceIdentifier) throws IOException {
|
||||
|
@ -286,4 +286,10 @@ private InternalConstants() {
|
||||
FS_S3A_CREATE_PERFORMANCE_ENABLED,
|
||||
DIRECTORY_OPERATIONS_PURGE_UPLOADS,
|
||||
ENABLE_MULTI_DELETE));
|
||||
|
||||
/**
|
||||
* AWS V4 Auth Scheme to use when creating signers: {@value}.
|
||||
*/
|
||||
public static final String AUTH_SCHEME_AWS_SIGV_4 = "aws.auth#sigv4";
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,151 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a.auth;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
|
||||
import org.apache.hadoop.fs.s3a.Constants;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
|
||||
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||
|
||||
/**
|
||||
* Test the HTTP signer SPI.
|
||||
* Two different UGIs are created; ths simplifies cleanup.
|
||||
*/
|
||||
public class ITestHttpSigner extends AbstractS3ATestBase {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(ITestHttpSigner.class);
|
||||
|
||||
private static final String TEST_ID_KEY = "TEST_ID_KEY";
|
||||
private static final String TEST_REGION_KEY = "TEST_REGION_KEY";
|
||||
|
||||
private final UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1");
|
||||
|
||||
private final UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2");
|
||||
|
||||
private String regionName;
|
||||
|
||||
private String endpoint;
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
final S3AFileSystem fs = getFileSystem();
|
||||
final Configuration conf = fs.getConf();
|
||||
// determine the endpoint -skipping the test.
|
||||
endpoint = conf.getTrimmed(Constants.ENDPOINT, Constants.CENTRAL_ENDPOINT);
|
||||
LOG.debug("Test endpoint is {}", endpoint);
|
||||
regionName = conf.getTrimmed(Constants.AWS_REGION, "");
|
||||
if (regionName.isEmpty()) {
|
||||
regionName = determineRegion(fs.getBucket());
|
||||
}
|
||||
LOG.debug("Determined region name to be [{}] for bucket [{}]", regionName,
|
||||
fs.getBucket());
|
||||
}
|
||||
|
||||
private String determineRegion(String bucketName) throws IOException {
|
||||
return getS3AInternals().getBucketLocation(bucketName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void teardown() throws Exception {
|
||||
super.teardown();
|
||||
FileSystem.closeAllForUGI(ugi1);
|
||||
FileSystem.closeAllForUGI(ugi2);
|
||||
}
|
||||
|
||||
private Configuration createTestConfig(String identifier) {
|
||||
Configuration conf = createConfiguration();
|
||||
|
||||
removeBaseAndBucketOverrides(conf,
|
||||
CUSTOM_SIGNERS,
|
||||
SIGNING_ALGORITHM_S3);
|
||||
|
||||
conf.setBoolean(HTTP_SIGNER_ENABLED, true);
|
||||
conf.set(HTTP_SIGNER_CLASS_NAME, CustomHttpSigner.class.getName());
|
||||
|
||||
conf.set(TEST_ID_KEY, identifier);
|
||||
conf.set(TEST_REGION_KEY, regionName);
|
||||
|
||||
// make absolutely sure there is no caching.
|
||||
disableFilesystemCaching(conf);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomSignerAndInitializer()
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
final Path basePath = path(getMethodName());
|
||||
FileSystem fs1 = runStoreOperationsAndVerify(ugi1,
|
||||
new Path(basePath, "customsignerpath1"), "id1");
|
||||
|
||||
FileSystem fs2 = runStoreOperationsAndVerify(ugi2,
|
||||
new Path(basePath, "customsignerpath2"), "id2");
|
||||
}
|
||||
|
||||
private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi,
|
||||
Path finalPath, String identifier)
|
||||
throws IOException, InterruptedException {
|
||||
Configuration conf = createTestConfig(identifier);
|
||||
return ugi.doAs((PrivilegedExceptionAction<S3AFileSystem>) () -> {
|
||||
S3AFileSystem fs = (S3AFileSystem)finalPath.getFileSystem(conf);
|
||||
|
||||
fs.mkdirs(finalPath);
|
||||
|
||||
// now do some more operations to make sure all is good.
|
||||
final Path subdir = new Path(finalPath, "year=1970/month=1/day=1");
|
||||
fs.mkdirs(subdir);
|
||||
|
||||
final Path file1 = new Path(subdir, "file1");
|
||||
ContractTestUtils.touch(fs, new Path(subdir, "file1"));
|
||||
fs.listStatus(subdir);
|
||||
fs.delete(file1, false);
|
||||
ContractTestUtils.touch(fs, new Path(subdir, "file1"));
|
||||
|
||||
// create a magic file.
|
||||
createMagicFile(fs, subdir);
|
||||
ContentSummary summary = fs.getContentSummary(finalPath);
|
||||
fs.getS3AInternals().abortMultipartUploads(subdir);
|
||||
fs.rename(subdir, new Path(finalPath, "renamed"));
|
||||
fs.delete(finalPath, true);
|
||||
return fs;
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user