HADOOP-18997. S3A: Add option fs.s3a.s3express.create.session to disable S3Express CreateSession (#6316)
Adds a new option fs.s3a.s3express.create.session; default is true.

When false, this disables the CreateSession call used to create/refresh temporary session credentials when working with an Amazon S3 Express store. This avoids having to grant the caller the new IAM permission (s3express:CreateSession), at the expense of every remote call on the S3 Express store incurring the latency of an IAM permissions check.

* fs.s3a.s3express.create.session is set to false in tests which generate new role permissions and call AssumeRole.
* The ApiCallTimeoutException logic is moved to after SDK exceptions are translated to IOEs; this lines the code up for any future AWS SDK change which surfaces an underlying cause there.
* Tests automatically skip ACL, storage class, S3 Select and encryption tests when the target FS is S3 Express; the same applies to the out-of-order multipart uploader test cases and v1 listing.
* The bucket tool S3 test treats an invalid-location error as a successful invocation of the create-bucket attempt.

Contributed by Steve Loughran
parent 607c981042
commit 25089dc9ee
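For illustration, a minimal sketch of switching the new option off through the Java Configuration API. The class name, bucket and path here are hypothetical; only the fs.s3a.s3express.create.session key and its semantics come from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DisableCreateSessionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Skip the CreateSession call: every request is then authorized
    // directly against IAM, trading per-request latency for not needing
    // the s3express:CreateSession permission on the caller's role.
    conf.setBoolean("fs.s3a.s3express.create.session", false);

    // Hypothetical S3 Express One Zone ("directory") bucket.
    Path path = new Path("s3a://example--usw2-az1--x-s3/data/example.txt");
    try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf)) {
      System.out.println("exists: " + fs.exists(path));
    }
  }
}

Since the tests below strip it with removeBaseAndBucketOverrides(), the option is also expected to respond to the usual per-bucket override pattern.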
@@ -1525,4 +1525,22 @@ private Constants() {
   public static final String STORE_CAPABILITY_AWS_V2 =
       "fs.s3a.capability.aws.v2";
 
+  /**
+   * Use the S3 Express createSession() operation to authenticate with
+   * S3Express storage?
+   * <p>
+   * Value: {@value}.
+   * <p>
+   * This is preferred as it is faster, but it does require extra IAM
+   * permissions and is not suited to some deployments, including some
+   * of the hadoop-aws test suites.
+   */
+  public static final String S3EXPRESS_CREATE_SESSION =
+      "fs.s3a.s3express.create.session";
+
+  /**
+   * Default value of {@link #S3EXPRESS_CREATE_SESSION}.
+   * Value: {@value}.
+   */
+  public static final boolean S3EXPRESS_CREATE_SESSION_DEFAULT = true;
 }
@@ -144,7 +144,9 @@ public S3TransferManager createS3TransferManager(final S3AsyncClient s3AsyncClie
 
   /**
    * Configure a sync or async S3 client builder.
-   * This method handles all shared configuration.
+   * This method handles all shared configuration, including
+   * path style access, credentials and whether or not to use S3Express
+   * CreateSession.
    * @param builder S3 client builder
    * @param parameters parameter object
    * @param conf configuration object
@@ -166,6 +168,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
     return builder
         .overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
         .credentialsProvider(parameters.getCredentialSet())
+        .disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
         .serviceConfiguration(serviceConfiguration);
   }
 
@@ -1045,7 +1045,9 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
             .withMultipartCopyEnabled(isMultipartCopyEnabled)
             .withMultipartThreshold(multiPartThreshold)
             .withTransferManagerExecutor(unboundedThreadPool)
-            .withRegion(configuredRegion);
+            .withRegion(configuredRegion)
+            .withExpressCreateSession(
+                conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT));
 
     S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
     s3Client = clientFactory.createS3Client(getUri(), parameters);
@@ -4498,6 +4500,9 @@ public List<RoleModel.Statement> listAWSPolicyRules(
       // way to predict read keys, and not worried about granting
       // too much encryption access.
       statements.add(STATEMENT_ALLOW_KMS_RW);
+      if (s3ExpressStore) {
+        LOG.warn("S3Express store policies not yet implemented");
+      }
 
     return statements;
   }
@@ -5440,7 +5445,9 @@ public boolean hasPathCapability(final Path path, final String capability)
     case SelectConstants.S3_SELECT_CAPABILITY:
       // select is only supported if enabled and client side encryption is
       // disabled.
-      return !isCSEEnabled && SelectBinding.isSelectEnabled(getConf());
+      return !isCSEEnabled
+          && SelectBinding.isSelectEnabled(getConf())
+          && !s3ExpressStore;
 
     case CommonPathCapabilities.FS_CHECKSUMS:
       // capability depends on FS configuration
@@ -174,20 +174,6 @@ public static IOException translateException(@Nullable String operation,
         StringUtils.isNotEmpty(path)? (" on " + path) : "",
         exception);
 
-    // timeout issues
-    // ApiCallAttemptTimeoutException: a single HTTP request attempt failed.
-    // ApiCallTimeoutException: a request with any configured retries failed.
-    // The ApiCallTimeoutException exception should be the only one seen in
-    // the S3A code, but for due diligence both are handled and mapped to
-    // our own AWSApiCallTimeoutException.
-    if (exception instanceof ApiCallTimeoutException
-        || exception instanceof ApiCallAttemptTimeoutException) {
-      // An API call to an AWS service timed out.
-      // This is a subclass of ConnectTimeoutException so
-      // all retry logic for that exception is handled without
-      // having to look down the stack for a
-      return new AWSApiCallTimeoutException(message, exception);
-    }
     if (!(exception instanceof AwsServiceException)) {
       // exceptions raised client-side: connectivity, auth, network problems...
       Exception innerCause = containsInterruptedException(exception);
@@ -214,6 +200,20 @@ public static IOException translateException(@Nullable String operation,
       if (ioe != null) {
         return ioe;
       }
+      // timeout issues
+      // ApiCallAttemptTimeoutException: a single HTTP request attempt failed.
+      // ApiCallTimeoutException: a request with any configured retries failed.
+      // The ApiCallTimeoutException exception should be the only one seen in
+      // the S3A code, but for due diligence both are handled and mapped to
+      // our own AWSApiCallTimeoutException.
+      if (exception instanceof ApiCallTimeoutException
+          || exception instanceof ApiCallAttemptTimeoutException) {
+        // An API call to an AWS service timed out.
+        // This is a subclass of ConnectTimeoutException so
+        // all retry logic for that exception is handled without
+        // having to look down the stack for a
+        return new AWSApiCallTimeoutException(message, exception);
+      }
       // no custom handling.
       return new AWSClientIOException(message, exception);
    } else {
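The net effect of these two hunks is that timeout mapping now runs only on the client-side branch, after service exceptions have been screened out and inner causes extracted. A self-contained toy model of that ordering, using stand-in exception types rather than the real SDK and S3A classes:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

public class TranslateOrderSketch {

  // Stand-ins for the SDK types (hypothetical; the real code checks
  // AwsServiceException and ApiCall(Attempt)TimeoutException).
  static class ServiceException extends RuntimeException { }
  static class CallTimeoutException extends RuntimeException { }

  static IOException translate(Exception e) {
    if (!(e instanceof ServiceException)) {
      // client-side failures: inner causes are unwrapped first...
      if (e.getCause() instanceof InterruptedException) {
        return new InterruptedIOException(e.toString());
      }
      // ...and only then are timeouts mapped, mirroring the moved block.
      if (e instanceof CallTimeoutException) {
        return new SocketTimeoutException(e.toString());
      }
      return new IOException(e);
    }
    // service exceptions get status-code-specific translation (elided).
    return new IOException("service failure", e);
  }

  public static void main(String[] args) {
    System.out.println(translate(new CallTimeoutException()));
    System.out.println(translate(new ServiceException()));
  }
}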
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk;
 
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION_DEFAULT;
 
 /**
  * Factory for creation of {@link S3Client} client instances.
@@ -170,6 +171,10 @@ final class S3ClientCreationParameters {
      */
     private String region;
 
+    /**
+     * Enable S3Express create session.
+     */
+    private boolean expressCreateSession = S3EXPRESS_CREATE_SESSION_DEFAULT;
 
     /**
      * List of execution interceptors to include in the chain
@@ -422,5 +427,39 @@ public S3ClientCreationParameters withRegion(
     public String getRegion() {
       return region;
     }
 
+    /**
+     * Should s3express createSession be called?
+     * @return true if the client should enable createSession.
+     */
+    public boolean isExpressCreateSession() {
+      return expressCreateSession;
+    }
+
+    /**
+     * Set builder value.
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withExpressCreateSession(final boolean value) {
+      expressCreateSession = value;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "S3ClientCreationParameters{" +
+          "endpoint='" + endpoint + '\'' +
+          ", pathStyleAccess=" + pathStyleAccess +
+          ", requesterPays=" + requesterPays +
+          ", userAgentSuffix='" + userAgentSuffix + '\'' +
+          ", pathUri=" + pathUri +
+          ", minimumPartSize=" + minimumPartSize +
+          ", multiPartThreshold=" + multiPartThreshold +
+          ", multipartCopy=" + multipartCopy +
+          ", region='" + region + '\'' +
+          ", expressCreateSession=" + expressCreateSession +
+          '}';
+    }
   }
 }
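The new accessor pair follows the existing with()/get() builder convention of S3ClientCreationParameters, so a client factory can chain it. A small usage sketch, assuming hadoop-aws with this patch on the classpath; the literal value stands in for the configuration lookup shown in the S3AFileSystem hunk above:

import org.apache.hadoop.fs.s3a.S3ClientFactory.S3ClientCreationParameters;

public class CreateSessionParametersSketch {
  public static void main(String[] args) {
    // Mirror what S3AFileSystem.bindAWSClient() does, with a literal flag.
    S3ClientCreationParameters parameters = new S3ClientCreationParameters()
        .withExpressCreateSession(false);

    // A factory implementation then inverts the flag for the SDK builder,
    // as in builder.disableS3ExpressSessionAuth(!parameters.isExpressCreateSession()).
    System.out.println("createSession enabled? " + parameters.isExpressCreateSession());
  }
}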
@@ -198,6 +198,11 @@ private RolePolicies() {
 
   public static final String S3_RESTORE_OBJECT = "s3:RestoreObject";
 
+  /**
+   * S3Express session permission; required unless sessions are disabled.
+   */
+  public static final String S3EXPRESS_CREATE_SESSION_POLICY = "s3express:CreateSession";
+
   /**
    * Actions needed to read a file in S3 through S3A, excluding
    * SSE-KMS.
@@ -219,7 +224,7 @@ private RolePolicies() {
    */
   private static final String[] S3_ROOT_READ_OPERATIONS =
       new String[]{
-          S3_ALL_GET,
+          S3_ALL_GET
      };
 
   public static final List<String> S3_ROOT_READ_OPERATIONS_LIST =
@@ -234,7 +239,7 @@ private RolePolicies() {
   public static final String[] S3_BUCKET_READ_OPERATIONS =
       new String[]{
           S3_ALL_GET,
-          S3_BUCKET_ALL_LIST,
+          S3_BUCKET_ALL_LIST
      };
 
   /**
@@ -248,7 +253,7 @@ private RolePolicies() {
           S3_PUT_OBJECT,
           S3_PUT_OBJECT_ACL,
           S3_DELETE_OBJECT,
-          S3_ABORT_MULTIPART_UPLOAD,
+          S3_ABORT_MULTIPART_UPLOAD
      }));
 
   /**
@@ -276,7 +281,7 @@ private RolePolicies() {
           S3_PUT_OBJECT,
           S3_PUT_OBJECT_ACL,
           S3_DELETE_OBJECT,
-          S3_ABORT_MULTIPART_UPLOAD,
+          S3_ABORT_MULTIPART_UPLOAD
      }));
 
   /**
@@ -57,7 +57,7 @@ private S3ExpressStorage() {
    * This may get confused against third party stores, so takes the endpoint
    * and only supports aws endpoints round the world.
    * @param bucket bucket to probe
-   * @param endpoint endpoint string.
+   * @param endpoint endpoint string. If empty, this is considered an AWS endpoint.
    * @return true if the store is S3 Express.
    */
   public static boolean isS3ExpressStore(String bucket, final String endpoint) {
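As background for the probe: S3 Express One Zone directory bucket names carry an availability-zone id and a fixed suffix, e.g. example--usw2-az2--x-s3, so a name check is possible once the endpoint is known to be an AWS one. A rough standalone sketch of that naming convention, not the actual S3ExpressStorage implementation:

public class S3ExpressNameCheck {
  // S3 Express One Zone directory buckets end with this suffix,
  // e.g. "example--usw2-az2--x-s3". Sketch only; the real probe in
  // S3ExpressStorage also considers the endpoint.
  private static final String S3EXPRESS_SUFFIX = "--x-s3";

  static boolean looksLikeS3ExpressBucket(String bucket) {
    return bucket != null && bucket.endsWith(S3EXPRESS_SUFFIX);
  }

  public static void main(String[] args) {
    System.out.println(looksLikeS3ExpressBucket("example--usw2-az2--x-s3")); // true
    System.out.println(looksLikeS3ExpressBucket("example-bucket"));          // false
  }
}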
@@ -29,6 +29,7 @@
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_SCALE_TESTS_ENABLED;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
 import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
@@ -116,4 +117,16 @@ public void testDirectoryInTheWay() throws Exception {
   public void testMultipartUploadReverseOrder() throws Exception {
     skip("skipped for speed");
   }
+
+  @Override
+  public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exception {
+    assumeNotS3ExpressFileSystem(getFileSystem());
+    super.testMultipartUploadReverseOrderNonContiguousPartNumbers();
+  }
+
+  @Override
+  public void testConcurrentUploads() throws Throwable {
+    assumeNotS3ExpressFileSystem(getFileSystem());
+    super.testConcurrentUploads();
+  }
 }
@@ -27,6 +27,7 @@
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_LIST_V1_ENABLED;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfS3ExpressBucket;
 
 /**
  * S3A contract tests for getFileStatus, using the v1 List Objects API.
@@ -52,6 +53,7 @@ protected Configuration createConfiguration() {
     disableFilesystemCaching(conf);
     skipIfNotEnabled(conf, KEY_LIST_V1_ENABLED,
         "Skipping V1 listing tests");
+    skipIfS3ExpressBucket(conf);
     conf.setInt(Constants.MAX_PAGING_KEYS, 2);
 
     // Use v1 List Objects API
@@ -39,6 +39,7 @@
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
 import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
+import org.apache.hadoop.fs.s3a.impl.S3ExpressStorage;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
@@ -76,6 +77,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
@@ -470,12 +472,24 @@ public static void disableFilesystemCaching(Configuration conf) {
   }
 
   /**
-   * Skip a test if encryption tests are disabled.
+   * Disable S3Express createSession calls.
+   * @param conf configuration to patch
+   * @return the configuration.
+   */
+  public static Configuration disableCreateSession(Configuration conf) {
+    conf.setBoolean(S3EXPRESS_CREATE_SESSION, false);
+    return conf;
+  }
+
+  /**
+   * Skip a test if encryption tests are disabled,
+   * or the bucket is an S3Express bucket.
    * @param configuration configuration to probe
    */
   public static void skipIfEncryptionTestsDisabled(
       Configuration configuration) {
     skipIfNotEnabled(configuration, KEY_ENCRYPTION_TESTS, "Skipping encryption tests");
+    skipIfS3ExpressBucket(configuration);
   }
 
   /**
@@ -493,23 +507,72 @@ public static void skipIfNotEnabled(final Configuration configuration,
   }
 
   /**
-   * Skip a test if storage class tests are disabled.
+   * Skip a test if storage class tests are disabled,
+   * or the bucket is an S3Express bucket.
    * @param configuration configuration to probe
    */
   public static void skipIfStorageClassTestsDisabled(
       Configuration configuration) {
     skipIfNotEnabled(configuration, KEY_STORAGE_CLASS_TESTS_ENABLED,
         "Skipping storage class tests");
+    skipIfS3ExpressBucket(configuration);
   }
 
   /**
-   * Skip a test if ACL class tests are disabled.
+   * Skip a test if ACL class tests are disabled,
+   * or the bucket is an S3Express bucket.
    * @param configuration configuration to probe
    */
   public static void skipIfACLTestsDisabled(
       Configuration configuration) {
     skipIfNotEnabled(configuration, KEY_ACL_TESTS_ENABLED,
         "Skipping storage class ACL tests");
+    skipIfS3ExpressBucket(configuration);
   }
 
+  /**
+   * Skip a test if the test bucket is an S3Express bucket.
+   * @param configuration configuration to probe
+   */
+  public static void skipIfS3ExpressBucket(
+      Configuration configuration) {
+    assume("Skipping test as bucket is an S3Express bucket",
+        !isS3ExpressTestBucket(configuration));
+  }
+
+  /**
+   * Is the test bucket an S3Express bucket?
+   * @param conf configuration
+   * @return true if the bucket is an S3Express bucket.
+   */
+  public static boolean isS3ExpressTestBucket(final Configuration conf) {
+    return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
+  }
+
+  /**
+   * Skip a test if the filesystem lacks a required capability.
+   * @param fs filesystem
+   * @param capability capability
+   */
+  public static void assumePathCapability(FileSystem fs, String capability) {
+    try {
+      assume("Filesystem lacks capability " + capability,
+          fs.hasPathCapability(new Path("/"), capability));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Skip a test if the filesystem has a required capability.
+   * @param fs filesystem
+   * @param capability capability
+   */
+  public static void assumePathCapabilityFalse(FileSystem fs, String capability) {
+    try {
+      assume("Filesystem has capability " + capability,
+          !fs.hasPathCapability(new Path("/"), capability));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
 
   /**
@@ -1016,16 +1079,14 @@ public static void expectErrorCode(final int code, final ExitUtil.ExitException
    * Require a test case to be against Amazon S3 Express store.
    */
   public static void assumeS3ExpressFileSystem(final FileSystem fs) throws IOException {
-    assume("store is not S3 Express: " + fs.getUri(),
-        fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE));
+    assumePathCapability(fs, STORE_CAPABILITY_S3_EXPRESS_STORAGE);
   }
 
   /**
    * Require a test case to be against a standard S3 store.
    */
-  public static void assumeNotS3ExpressFileSystem(final FileSystem fs) throws IOException {
-    assume("store is S3 Express: " + fs.getUri(),
-        !fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE));
+  public static void assumeNotS3ExpressFileSystem(final FileSystem fs) {
+    assumePathCapabilityFalse(fs, STORE_CAPABILITY_S3_EXPRESS_STORAGE);
   }
 
   /**
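A sketch of how a test suite might combine the new helpers; the test class itself is hypothetical, while the helper names are those added to S3ATestUtils above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableCreateSession;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfS3ExpressBucket;

public class ITestExampleRoleOperations extends AbstractS3ATestBase {

  @Override
  protected Configuration createConfiguration() {
    final Configuration conf = super.createConfiguration();
    // skip entirely when the test bucket is an S3 Express one...
    skipIfS3ExpressBucket(conf);
    // ...and otherwise avoid needing the s3express:CreateSession
    // permission in any generated role policy.
    return disableCreateSession(conf);
  }
}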
@@ -59,6 +59,7 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.auth.CredentialProviderListFactory.E_FORBIDDEN_AWS_PROVIDER;
 import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
@@ -103,6 +104,13 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
   protected static final String VALIDATION_ERROR
       = "ValidationError";
 
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf, S3EXPRESS_CREATE_SESSION);
+    return conf;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -181,6 +189,10 @@ protected Configuration createValidRoleConf() throws JsonProcessingException {
     conf.set(ASSUMED_ROLE_ARN, roleARN);
     conf.set(ASSUMED_ROLE_SESSION_NAME, "valid");
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
+    // disable create session so there's no need to
+    // add a role policy for it.
+    disableCreateSession(conf);
+
     bindRolePolicy(conf, RESTRICTED_POLICY);
     return conf;
   }
@@ -737,12 +749,14 @@ public void testBucketLocationForbidden() throws Throwable {
     describe("Restrict role to read only");
     Configuration conf = createAssumedRoleConfig();
     bindRolePolicyStatements(conf, STATEMENT_ALLOW_KMS_RW,
         statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
         statement(false, S3_ALL_BUCKETS, S3_GET_BUCKET_LOCATION));
     Path path = methodPath();
     roleFS = (S3AFileSystem) path.getFileSystem(conf);
+
     // getBucketLocation fails with error
+    assumeNotS3ExpressFileSystem(roleFS);
     forbidden("",
         () -> roleFS.getBucketLocation());
     S3GuardTool.BucketInfo infocmd = new S3GuardTool.BucketInfo(conf);
@@ -30,6 +30,7 @@
 
 import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableCreateSession;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
 import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
 import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.*;
@@ -53,6 +54,11 @@ public class ITestAssumedRoleCommitOperations extends ITestCommitOperations {
    */
   private S3AFileSystem roleFS;
 
+  @Override
+  protected Configuration createConfiguration() {
+    return disableCreateSession(super.createConfiguration());
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -38,6 +38,8 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableCreateSession;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.auth.RoleModel.*;
@@ -155,13 +157,16 @@ public static Configuration newAssumedRoleConfig(
         DELEGATION_TOKEN_BINDING,
         ASSUMED_ROLE_ARN,
         AWS_CREDENTIALS_PROVIDER,
-        ASSUMED_ROLE_SESSION_DURATION);
+        ASSUMED_ROLE_SESSION_DURATION,
+        S3EXPRESS_CREATE_SESSION);
 
     conf.set(AWS_CREDENTIALS_PROVIDER, AssumedRoleCredentialProvider.NAME);
     conf.set(ASSUMED_ROLE_ARN, roleARN);
     conf.set(ASSUMED_ROLE_SESSION_NAME, "test");
     conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m");
     // force in bucket resolution during startup
     conf.setInt(S3A_BUCKET_PROBE, 1);
+    disableCreateSession(conf);
     disableFilesystemCaching(conf);
     return conf;
   }
@@ -66,8 +66,10 @@
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableCreateSession;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.isS3ExpressTestBucket;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.unsetHadoopCredentialProviders;
 import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
@@ -147,40 +149,50 @@ protected Configuration createConfiguration() {
     // disable if assume role opts are off
     assumeSessionTestsEnabled(conf);
     disableFilesystemCaching(conf);
-    String s3EncryptionMethod;
-    try {
-      s3EncryptionMethod =
-          getEncryptionAlgorithm(getTestBucketName(conf), conf).getMethod();
-    } catch (IOException e) {
-      throw new UncheckedIOException("Failed to lookup encryption algorithm.",
-          e);
-    }
-    String s3EncryptionKey = getS3EncryptionKey(getTestBucketName(conf), conf);
+    final String bucket = getTestBucketName(conf);
+    final boolean isS3Express = isS3ExpressTestBucket(conf);
 
     removeBaseAndBucketOverrides(conf,
         DELEGATION_TOKEN_BINDING,
         Constants.S3_ENCRYPTION_ALGORITHM,
         Constants.S3_ENCRYPTION_KEY,
         SERVER_SIDE_ENCRYPTION_ALGORITHM,
-        SERVER_SIDE_ENCRYPTION_KEY);
+        SERVER_SIDE_ENCRYPTION_KEY,
+        S3EXPRESS_CREATE_SESSION);
     conf.set(HADOOP_SECURITY_AUTHENTICATION,
         UserGroupInformation.AuthenticationMethod.KERBEROS.name());
     enableDelegationTokens(conf, getDelegationBinding());
     conf.set(AWS_CREDENTIALS_PROVIDER, " ");
     // switch to CSE-KMS(if specified) else SSE-KMS.
-    if (conf.getBoolean(KEY_ENCRYPTION_TESTS, true)) {
+    if (!isS3Express && conf.getBoolean(KEY_ENCRYPTION_TESTS, true)) {
+      String s3EncryptionMethod;
+      try {
+        s3EncryptionMethod =
+            getEncryptionAlgorithm(bucket, conf).getMethod();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to lookup encryption algorithm.",
+            e);
+      }
+      String s3EncryptionKey = getS3EncryptionKey(bucket, conf);
+
       conf.set(Constants.S3_ENCRYPTION_ALGORITHM, s3EncryptionMethod);
       // KMS key ID a must if CSE-KMS is being tested.
       conf.set(Constants.S3_ENCRYPTION_KEY, s3EncryptionKey);
     }
     // set the YARN RM up for YARN tests.
     conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
+
+    if (conf.getBoolean(KEY_ACL_TESTS_ENABLED, false) && !isS3Express) {
+      // turn on ACLs so as to verify role DT permissions include
+      // write access.
+      conf.set(CANNED_ACL, LOG_DELIVERY_WRITE);
+    }
+    // disable create session so there's no need to
+    // add a role policy for it.
+    disableCreateSession(conf);
     return conf;
   }
 
 
   @Override
   public void setup() throws Exception {
     // clear any existing tokens from the FS
@@ -95,6 +95,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+    disableFilesystemCaching(conf);
     String bucketName = getTestBucketName(conf);
     removeBucketOverrides(bucketName, conf,
         MAGIC_COMMITTER_ENABLED,
@@ -289,8 +289,11 @@ private Configuration createAssumedRoleConfig(String roleARN) {
         roleARN);
     removeBaseAndBucketOverrides(conf,
         DELEGATION_TOKEN_BINDING,
-        ENABLE_MULTI_DELETE);
+        ENABLE_MULTI_DELETE,
+        S3EXPRESS_CREATE_SESSION);
     conf.setBoolean(ENABLE_MULTI_DELETE, multiDelete);
+    disableCreateSession(conf);
     disableFilesystemCaching(conf);
     return conf;
   }
 
@@ -61,6 +61,7 @@
 import static org.apache.hadoop.fs.s3a.S3AUtils.HIDDEN_FILE_FILTER;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.util.ToolRunner.run;
 import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
 import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromArray;
@@ -307,9 +308,18 @@ public void testDistCp() throws Throwable {
     final Path src = createDirWithUpload();
     final Path dest = new Path(base, "dest");
     file(new Path(src, "real-file"));
-    // distcp fails if uploads are visible
-    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
-        "-useiterator -update -delete -direct", getConfiguration());
+    final String options = "-useiterator -update -delete -direct";
+    if (!fs.hasPathCapability(base, DIRECTORY_LISTING_INCONSISTENT)) {
+      DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
+          options, getConfiguration());
+    } else {
+      // distcp fails if uploads are visible
+      intercept(org.junit.ComparisonFailure.class, () -> {
+        DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
+            options, getConfiguration());
+      });
+    }
+
   }
 
   @Test
@@ -321,9 +331,18 @@ public void testDistCpNoIterator() throws Throwable {
     final Path dest = new Path(base, "dest");
     file(new Path(src, "real-file"));
 
-    // distcp fails if uploads are visible
-    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
-        "-update -delete -direct", getConfiguration());
+    final String options = "-update -delete -direct";
+    if (!fs.hasPathCapability(base, DIRECTORY_LISTING_INCONSISTENT)) {
+      DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
+          options, getConfiguration());
+    } else {
+      // distcp fails if uploads are visible
+      intercept(org.junit.ComparisonFailure.class, () -> {
+        DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
+            options, getConfiguration());
+      });
+    }
+
   }
 
   /**
@@ -66,6 +66,7 @@
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfS3ExpressBucket;
 import static org.apache.hadoop.fs.s3a.select.CsvFile.ALL_QUOTES;
 import static org.apache.hadoop.fs.s3a.select.SelectBinding.expandBackslashChars;
 import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
@@ -99,6 +100,13 @@ public class ITestS3Select extends AbstractS3SelectTest {
   /** CSV file with fewer columns than expected, all fields parse badly. */
   private Path brokenCSV;
 
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    skipIfS3ExpressBucket(conf);
+    return conf;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -83,11 +83,14 @@ public void setup() throws Exception {
     selectConf = new Configuration(getConfiguration());
     localFile = getTempFilename();
     landsatSrc = getLandsatGZ().toString();
+    final S3AFileSystem landsatFS = getLandsatFS();
     ChangeDetectionPolicy changeDetectionPolicy =
-        getLandsatFS().getChangeDetectionPolicy();
+        landsatFS.getChangeDetectionPolicy();
     Assume.assumeFalse("the standard landsat bucket doesn't have versioning",
         changeDetectionPolicy.getSource() == Source.VersionId
         && changeDetectionPolicy.isRequireVersion());
+    Assume.assumeTrue("S3 Select is not enabled",
+        landsatFS.hasPathCapability(new Path("/"), S3_SELECT_CAPABILITY));
   }
 
   @Override
@@ -86,6 +86,9 @@ public class ITestBucketTool extends AbstractS3ATestBase {
    */
   public static final String USWEST_AZ_2 = "usw2-az2";
 
+  public static final String INVALID_LOCATION =
+      "Invalid Name value for Location configuration";
+
   private String bucketName;
 
   private boolean s3ExpressStore;
@@ -118,7 +121,10 @@ public void testRecreateTestBucketS3Express() throws Throwable {
         fsURI));
     if (ex instanceof AWSBadRequestException) {
       // owned error
-      assertExceptionContains(OWNED, ex);
+      if (!ex.getMessage().contains(OWNED)
+          && !ex.getMessage().contains(INVALID_LOCATION)) {
+        throw ex;
+      }
     } else if (ex instanceof UnknownHostException) {
       // endpoint region stuff, expect the error to include the s3express endpoint
       // name