HADOOP-12020. Add s3a storage class option fs.s3a.create.storage.class (#3877)

Adds a new option fs.s3a.create.storage.class which can
be used to set the storage class for files created in AWS S3.
Consult the documentation for details and instructions on how
to disable the relevant tests when testing against third-party
stores.

Contributed by Monthon Klongklaew
monthonk 2022-06-08 19:05:17 +01:00 committed by GitHub
parent 0a65883c50
commit 5ac55b405d
12 changed files with 597 additions and 6 deletions


@@ -429,6 +429,57 @@ private Constants() {
*/
public static final String CONTENT_ENCODING = "fs.s3a.object.content.encoding";
/**
* S3 storage class: standard, reduced_redundancy, intelligent_tiering etc.
* Value {@value }.
*/
public static final String STORAGE_CLASS = "fs.s3a.create.storage.class";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_STANDARD = "standard";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_REDUCED_REDUNDANCY = "reduced_redundancy";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_GLACIER = "glacier";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_STANDARD_INFREQUENT_ACCESS = "standard_ia";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_ONEZONE_INFREQUENT_ACCESS = "onezone_ia";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_INTELLIGENT_TIERING = "intelligent_tiering";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_DEEP_ARCHIVE = "deep_archive";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_OUTPOSTS = "outposts";
/**
* S3 Storage option: {@value}.
*/
public static final String STORAGE_CLASS_GLACIER_INSTANT_RETRIEVAL = "glacier_ir";
// should we try to purge old multipart uploads when starting up
public static final String PURGE_EXISTING_MULTIPART =
"fs.s3a.multipart.purge";


@@ -32,6 +32,7 @@
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -66,6 +67,7 @@
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
@@ -963,6 +965,18 @@ protected RequestFactory createRequestFactory() {
// Any encoding type
String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null);
String storageClassConf = getConf()
.getTrimmed(STORAGE_CLASS, "")
.toUpperCase(Locale.US);
StorageClass storageClass;
try {
storageClass = StorageClass.fromValue(storageClassConf);
} catch (IllegalArgumentException e) {
LOG.warn("Unknown storage class property {}: {}; falling back to default storage class",
STORAGE_CLASS, storageClassConf);
storageClass = null;
}
return RequestFactoryImpl.builder()
.withBucket(requireNonNull(bucket))
.withCannedACL(getCannedACL())
@@ -970,6 +984,7 @@ protected RequestFactory createRequestFactory() {
.withMultipartPartCountLimit(partCountLimit)
.withRequestPreparer(getAuditManager()::requestCreated)
.withContentEncoding(contentEncoding)
.withStorageClass(storageClass)
.build();
}
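The hunk above upper-cases the configured value with `Locale.US` before handing it to the AWS SDK. As an illustrative aside (not part of the patch), the sketch below shows how the lower-case option values defined in `Constants` map onto the `com.amazonaws.services.s3.model.StorageClass` enum after that conversion, and how an unrecognised value hits the `IllegalArgumentException` fallback path; the class name is purely hypothetical.

```java
import java.util.Locale;
import com.amazonaws.services.s3.model.StorageClass;

// Illustrative only: mirrors the parsing done in createRequestFactory().
public class StorageClassMappingDemo {
  public static void main(String[] args) {
    String[] options = {
        "standard", "reduced_redundancy", "intelligent_tiering",
        "standard_ia", "onezone_ia", "glacier", "glacier_ir",
        "deep_archive", "outposts", "not-a-storage-class"};
    for (String option : options) {
      try {
        // same conversion as the patch: upper-case with Locale.US, then parse
        StorageClass sc = StorageClass.fromValue(option.toUpperCase(Locale.US));
        System.out.println(option + " -> " + sc);
      } catch (IllegalArgumentException e) {
        // S3AFileSystem logs a warning and falls back to the default (null) class
        System.out.println(option + " -> unknown, default storage class will be used");
      }
    }
  }
}
```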


@@ -44,6 +44,7 @@
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.fs.PathIOException;
@@ -106,6 +107,12 @@ public interface RequestFactory {
*/
String getContentEncoding();
/**
* Get the object storage class, return null if none.
* @return storage class
*/
StorageClass getStorageClass();
/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:


@@ -46,6 +46,7 @@
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
@@ -116,6 +117,11 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final String contentEncoding;
/**
* Storage class.
*/
private final StorageClass storageClass;
/**
* Constructor.
* @param builder builder with all the configuration.
@@ -128,6 +134,7 @@ protected RequestFactoryImpl(
this.multipartPartCountLimit = builder.multipartPartCountLimit;
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
this.storageClass = builder.storageClass;
}
/**
@@ -200,6 +207,15 @@ public String getContentEncoding() {
return contentEncoding;
}
/**
* Get the object storage class, return null if none.
* @return storage class
*/
@Override
public StorageClass getStorageClass() {
return storageClass;
}
/**
* Sets server side encryption parameters to the part upload
* request when encryption is enabled.
@@ -343,7 +359,7 @@ protected void copyEncryptionParameters(
}
/**
* Create a putObject request.
* Adds the ACL, storage class and metadata
* @param key key of object
* @param metadata metadata header
* @param srcfile source file
@@ -357,6 +373,9 @@ public PutObjectRequest newPutObjectRequest(String key,
srcfile);
setOptionalPutRequestParameters(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
if (storageClass != null) {
putObjectRequest.setStorageClass(storageClass);
}
putObjectRequest.setMetadata(metadata);
return prepareRequest(putObjectRequest);
}
@@ -431,6 +450,9 @@ public InitiateMultipartUploadRequest newMultipartUploadRequest(
destKey,
newObjectMetadata(-1));
initiateMPURequest.setCannedACL(getCannedACL());
if (getStorageClass() != null) {
initiateMPURequest.withStorageClass(getStorageClass());
}
setOptionalMultipartUploadRequestParameters(initiateMPURequest);
return prepareRequest(initiateMPURequest);
}
@@ -610,6 +632,11 @@ public static final class RequestFactoryBuilder {
/** Content Encoding. */
private String contentEncoding;
/**
* Storage class.
*/
private StorageClass storageClass;
/**
* Multipart limit.
*/
@@ -641,6 +668,16 @@ public RequestFactoryBuilder withContentEncoding(final String value) {
return this;
}
/**
* Storage class.
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withStorageClass(final StorageClass value) {
storageClass = value;
return this;
}
/**
* Target bucket.
* @param value new value


@@ -1079,6 +1079,17 @@ options are covered in [Testing](./testing.md).
</description>
</property>
<property>
<name>fs.s3a.create.storage.class</name>
<value></value>
<description>
Storage class: standard, reduced_redundancy, intelligent_tiering, etc.
Specify the storage class for S3A PUT object requests.
If not set, the storage class will be null
and mapped to the default Standard class on S3.
</description>
</property>
```
## <a name="retry_and_recovery"></a>Retry and Recovery
@@ -1650,6 +1661,26 @@ To enable this feature within S3A, configure the `fs.s3a.requester.pays.enabled`
</property>
```
## <a name="storage_classes"></a>Storage Classes
Amazon S3 offers a range of [Storage Classes](https://aws.amazon.com/s3/storage-classes/)
that you can choose from based on the behavior of your applications. By using the right
storage class, you can reduce the cost of your bucket.
S3A uses the Standard storage class for PUT object requests by default, which is suitable for
general use cases. To use a specific storage class, set the `fs.s3a.create.storage.class` property
to the storage class you want.
```xml
<property>
<name>fs.s3a.create.storage.class</name>
<value>intelligent_tiering</value>
</property>
```
Please note that S3A does not support reading from archive storage classes at the moment.
An `AccessDeniedException` with error code InvalidObjectState will be thrown if you try to do so.
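Besides the XML setting above, the option can also be set programmatically on the `Configuration` used to create the filesystem. The following is only a minimal sketch, not taken from the committed docs: the bucket URI is a placeholder, and the result is checked through the `HeaderProcessing.XA_STORAGE_CLASS` XAttr in the same way the new integration tests do.

```java
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_INTELLIGENT_TIERING;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;

// Hypothetical class name; the bucket below is a placeholder.
public class StorageClassUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // request Intelligent-Tiering for all objects created by this FS instance
    conf.set(STORAGE_CLASS, STORAGE_CLASS_INTELLIGENT_TIERING);

    Path file = new Path("s3a://example-bucket/demo/file1");
    try (FileSystem fs = FileSystem.newInstance(file.toUri(), conf)) {
      fs.create(file, true).close();
      // the storage class of an object is surfaced as an XAttr header
      Map<String, byte[]> xAttrs = fs.getXAttrs(file);
      System.out.println("storage class: " + decodeBytes(xAttrs.get(XA_STORAGE_CLASS)));
    }
  }
}
```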
## <a name="upload"></a>How S3A writes data to S3 ## <a name="upload"></a>How S3A writes data to S3
The original S3A client implemented file writes by The original S3A client implemented file writes by


@@ -572,6 +572,18 @@ can be turned off.
Encryption is only used for those specific test suites with `Encryption` in
their classname.
### Disabling the storage class tests
When running the storage class tests against a third-party object store that doesn't support
S3 storage classes, the tests may fail. They can be disabled.
```xml
<property>
<name>test.fs.s3a.create.storage.class.enabled</name>
<value>false</value>
</property>
```
### Configuring the CSV file read tests
To test on alternate infrastructures supporting


@@ -561,17 +561,34 @@ Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: myl
To enable requester pays, set `fs.s3a.requester.pays.enabled` property to `true`.
### <a name="access_denied_archive_storage_class"></a>`AccessDeniedException` "InvalidObjectState" when trying to read files
```
java.nio.file.AccessDeniedException: file1: copyFile(file1, file2) on file1: com.amazonaws.services.s3.model.AmazonS3Exception: Operation is not valid for the source object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: SK9EMPC1YRX75VZR; S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=; Proxy: null), S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=:InvalidObjectState
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Operation is not valid for the source object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: SK9EMPC1YRX75VZR; S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=; Proxy: null), S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=
```
This happens when you try to read or copy files whose storage class is an archive class such as
Glacier.
If you want to access files with S3A after writing them, do not set `fs.s3a.create.storage.class` to `glacier` or `deep_archive`.
### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials. ### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials.
Region must be provided when requesting session credentials, or an exception will be thrown with the message: Region must be provided when requesting session credentials, or an exception will be thrown with the
message:
``` ```
com.amazonaws.SdkClientException: Unable to find a region via the region provider com.amazonaws.SdkClientException: Unable to find a region via the region provider
chain. Must provide an explicit region in the builder or setup environment to supply a region. chain. Must provide an explicit region in the builder or setup environment to supply a region.
``` ```
In this case you have to set the `fs.s3a.assumed.role.sts.endpoint` property to a valid
S3 sts endpoint and region like the following: In this case you have to set the `fs.s3a.assumed.role.sts.endpoint` property to a valid S3 sts
endpoint and region like the following:
```xml ```xml
<property> <property>
<name>fs.s3a.assumed.role.sts.endpoint</name> <name>fs.s3a.assumed.role.sts.endpoint</name>
<value>${sts.endpoint}</value> <value>${sts.endpoint}</value>


@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.nio.file.AccessDeniedException;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_GLACIER;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfStorageClassTestsDisabled;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Tests of storage class.
*/
public class ITestS3AStorageClass extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
disableFilesystemCaching(conf);
removeBaseAndBucketOverrides(conf, STORAGE_CLASS);
return conf;
}
@Override
public void setup() throws Exception {
super.setup();
skipIfStorageClassTestsDisabled(getConfiguration());
}
/*
* This test ensures the default storage class configuration (no config or null)
* works well with create and copy operations
*/
@Test
public void testCreateAndCopyObjectWithStorageClassDefault() throws Throwable {
Configuration conf = this.createConfiguration();
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
FileSystem fs = contract.getTestFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
assertObjectHasNoStorageClass(dir);
Path path = new Path(dir, "file1");
ContractTestUtils.touch(fs, path);
assertObjectHasNoStorageClass(path);
Path path2 = new Path(dir, "file1");
fs.rename(path, path2);
assertObjectHasNoStorageClass(path2);
}
/*
* Verify object can be created and copied correctly
* with specified storage class
*/
@Test
public void testCreateAndCopyObjectWithStorageClassReducedRedundancy() throws Throwable {
Configuration conf = this.createConfiguration();
conf.set(STORAGE_CLASS, STORAGE_CLASS_REDUCED_REDUNDANCY);
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
FileSystem fs = contract.getTestFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
// even with storage class specified
// directories do not have storage class
assertObjectHasNoStorageClass(dir);
Path path = new Path(dir, "file1");
ContractTestUtils.touch(fs, path);
assertObjectHasStorageClass(path, STORAGE_CLASS_REDUCED_REDUNDANCY);
Path path2 = new Path(dir, "file1");
fs.rename(path, path2);
assertObjectHasStorageClass(path2, STORAGE_CLASS_REDUCED_REDUNDANCY);
}
/*
* Archive storage classes have different behavior
* from general storage classes
*/
@Test
public void testCreateAndCopyObjectWithStorageClassGlacier() throws Throwable {
Configuration conf = this.createConfiguration();
conf.set(STORAGE_CLASS, STORAGE_CLASS_GLACIER);
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
FileSystem fs = contract.getTestFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
// even with storage class specified
// directories do not have storage class
assertObjectHasNoStorageClass(dir);
Path path = new Path(dir, "file1");
ContractTestUtils.touch(fs, path);
assertObjectHasStorageClass(path, STORAGE_CLASS_GLACIER);
Path path2 = new Path(dir, "file2");
// this is the current behavior
// object with archive storage class can't be read directly
// when trying to read it, AccessDeniedException will be thrown
// with message InvalidObjectState
intercept(AccessDeniedException.class, "InvalidObjectState", () -> fs.rename(path, path2));
}
/*
* Verify object can be created and copied correctly
* with completely invalid storage class
*/
@Test
public void testCreateAndCopyObjectWithStorageClassInvalid() throws Throwable {
Configuration conf = this.createConfiguration();
conf.set(STORAGE_CLASS, "testing");
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
FileSystem fs = contract.getTestFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
// even with storage class specified
// directories do not have storage class
assertObjectHasNoStorageClass(dir);
Path path = new Path(dir, "file1");
ContractTestUtils.touch(fs, path);
assertObjectHasNoStorageClass(path);
Path path2 = new Path(dir, "file1");
fs.rename(path, path2);
assertObjectHasNoStorageClass(path2);
}
/*
* Verify object can be created and copied correctly
* with empty string configuration
*/
@Test
public void testCreateAndCopyObjectWithStorageClassEmpty() throws Throwable {
Configuration conf = this.createConfiguration();
conf.set(STORAGE_CLASS, "");
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
FileSystem fs = contract.getTestFileSystem();
Path dir = methodPath();
fs.mkdirs(dir);
// even with storage class specified
// directories do not have storage class
assertObjectHasNoStorageClass(dir);
Path path = new Path(dir, "file1");
ContractTestUtils.touch(fs, path);
assertObjectHasNoStorageClass(path);
Path path2 = new Path(dir, "file1");
fs.rename(path, path2);
assertObjectHasNoStorageClass(path2);
}
/**
* Assert that a given object has no storage class specified.
*
* @param path path
*/
protected void assertObjectHasNoStorageClass(Path path) throws Throwable {
S3AFileSystem fs = getFileSystem();
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
String storageClass = decodeBytes(xAttrs.get(XA_STORAGE_CLASS));
Assertions.assertThat(storageClass).describedAs("Storage class of object %s", path).isNull();
}
/**
* Assert that a given object has the given storage class specified.
*
* @param path path
* @param expectedStorageClass expected storage class for the object
*/
protected void assertObjectHasStorageClass(Path path, String expectedStorageClass)
throws Throwable {
S3AFileSystem fs = getFileSystem();
Map<String, byte[]> xAttrs = fs.getXAttrs(path);
String actualStorageClass = decodeBytes(xAttrs.get(XA_STORAGE_CLASS));
Assertions.assertThat(actualStorageClass).describedAs("Storage class of object %s", path)
.isEqualToIgnoringCase(expectedStorageClass);
}
}


@@ -53,6 +53,11 @@ public interface S3ATestConstants {
*/
String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled";
/**
* A property set to true if storage class tests are enabled: {@value }.
*/
String KEY_STORAGE_CLASS_TESTS_ENABLED = TEST_FS_S3A + "create.storage.class.enabled";
/**
* Tell tests that they are being executed in parallel: {@value}.
*/


@@ -465,6 +465,17 @@ public static void skipIfEncryptionTestsDisabled(
}
}
/**
* Skip a test if storage class tests are disabled.
* @param configuration configuration to probe
*/
public static void skipIfStorageClassTestsDisabled(
Configuration configuration) {
if (!configuration.getBoolean(KEY_STORAGE_CLASS_TESTS_ENABLED, true)) {
skip("Skipping storage class tests");
}
}
/**
* Create a test path, using the value of
* {@link S3ATestConstants#TEST_UNIQUE_FORK_ID} if it is set.


@@ -71,6 +71,8 @@
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_INTELLIGENT_TIERING;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.E_SELF_GENERATED_JOB_UUID;
@@ -695,6 +697,22 @@ private void validateContent(Path dir,
expectedOutput.toString(), output);
}
/**
* Verify storage class of output file matches the expected storage class.
* @param dir output directory.
* @param expectedStorageClass expected storage class value.
* @throws Exception failure.
*/
private void validateStorageClass(Path dir, String expectedStorageClass) throws Exception {
Path expectedFile = getPart0000(dir);
S3AFileSystem fs = getFileSystem();
String actualStorageClass = fs.getObjectMetadata(expectedFile).getStorageClass();
Assertions.assertThat(actualStorageClass)
.describedAs("Storage class of object %s", expectedFile)
.isEqualToIgnoringCase(expectedStorageClass);
}
/**
* Identify any path under the directory which begins with the
* {@code "part-m-00000"} sequence.
@@ -796,6 +814,41 @@ public void testCommitLifecycle() throws Exception {
assertNoMultipartUploadsPending(outDir);
}
@Test
public void testCommitWithStorageClassConfig() throws Exception {
describe("Commit with specific storage class configuration;" +
" expect the final file has correct storage class.");
Configuration conf = getConfiguration();
skipIfStorageClassTestsDisabled(conf);
conf.set(STORAGE_CLASS, STORAGE_CLASS_INTELLIGENT_TIERING);
JobData jobData = startJob(false);
JobContext jContext = jobData.jContext;
TaskAttemptContext tContext = jobData.tContext;
AbstractS3ACommitter committer = jobData.committer;
validateTaskAttemptWorkingDirectory(committer, tContext);
// write output
writeTextOutput(tContext);
// commit task
dumpMultipartUploads();
commitTask(committer, tContext);
// commit job
assertMultipartUploadsPending(outDir);
commitJob(committer, jContext);
// validate output
validateContent(outDir, shouldExpectSuccessMarker(),
committer.getUUID());
assertNoMultipartUploadsPending(outDir);
// validate storage class
validateStorageClass(outDir, STORAGE_CLASS_INTELLIGENT_TIERING);
}
@Test
public void testCommitterWithDuplicatedCommit() throws Exception {
describe("Call a task then job commit twice;" +


@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import static org.apache.hadoop.fs.contract.ContractTestUtils.bandwidth;
import static org.apache.hadoop.fs.contract.ContractTestUtils.toHuman;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfStorageClassTestsDisabled;
/**
* Class to verify that {@link Constants#STORAGE_CLASS} is set correctly
* for creating and renaming huge files with multipart upload requests.
*/
public class ITestS3AHugeFilesStorageClass extends AbstractSTestS3AHugeFiles {
private static final Logger LOG = LoggerFactory.getLogger(ITestS3AHugeFilesStorageClass.class);
@Override
public void setup() throws Exception {
super.setup();
skipIfStorageClassTestsDisabled(getConfiguration());
}
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
disableFilesystemCaching(conf);
removeBaseAndBucketOverrides(conf, STORAGE_CLASS);
conf.set(STORAGE_CLASS, STORAGE_CLASS_REDUCED_REDUNDANCY);
return conf;
}
@Override
protected String getBlockOutputBufferName() {
return Constants.FAST_UPLOAD_BUFFER_ARRAY;
}
@Override
public void test_010_CreateHugeFile() throws IOException {
super.test_010_CreateHugeFile();
assertStorageClass(getPathOfFileToCreate());
}
@Override
public void test_030_postCreationAssertions() throws Throwable {
super.test_030_postCreationAssertions();
assertStorageClass(getPathOfFileToCreate());
}
@Override
public void test_040_PositionedReadHugeFile() throws Throwable {
skipQuietly("PositionedReadHugeFile");
}
@Override
public void test_050_readHugeFile() throws Throwable {
skipQuietly("readHugeFile");
}
@Override
public void test_090_verifyRenameSourceEncryption() throws IOException {
skipQuietly("verifyRenameSourceEncryption");
}
@Override
public void test_100_renameHugeFile() throws Throwable {
Path hugefile = getHugefile();
Path hugefileRenamed = getHugefileRenamed();
assumeHugeFileExists();
describe("renaming %s to %s", hugefile, hugefileRenamed);
S3AFileSystem fs = getFileSystem();
FileStatus status = fs.getFileStatus(hugefile);
long size = status.getLen();
fs.delete(hugefileRenamed, false);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
fs.rename(hugefile, hugefileRenamed);
long mb = Math.max(size / _1MB, 1);
timer.end("time to rename file of %d MB", mb);
LOG.info("Time per MB to rename = {} nS", toHuman(timer.nanosPerOperation(mb)));
bandwidth(timer, size);
FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
assertEquals(size, destFileStatus.getLen());
assertStorageClass(hugefileRenamed);
}
@Override
public void test_110_verifyRenameDestEncryption() throws IOException {
skipQuietly("verifyRenameDestEncryption");
}
private void skipQuietly(String text) {
describe("Skipping: %s", text);
}
protected void assertStorageClass(Path hugeFile) throws IOException {
S3AFileSystem fs = getFileSystem();
String actual = fs.getObjectMetadata(hugeFile).getStorageClass();
assertTrue(
"Storage class of object is " + actual + ", expected " + STORAGE_CLASS_REDUCED_REDUNDANCY,
STORAGE_CLASS_REDUCED_REDUNDANCY.equalsIgnoreCase(actual));
}
}