From 9dffa65021a7aabac7a177f05fa8c4e87f55ab3b Mon Sep 17 00:00:00 2001
From: monthonk <47974768+monthonk@users.noreply.github.com>
Date: Thu, 1 Sep 2022 18:14:32 +0100
Subject: [PATCH] HADOOP-18339. S3A storage class option only picked up when
 buffering writes to disk. (#4669)

Follow-up to HADOOP-12020 Support configuration of different S3 storage classes;
S3 storage class is now set when buffering to heap/bytebuffers, and
when creating directory markers

Contributed by Monthon Klongklaew
---
 .../fs/s3a/impl/RequestFactoryImpl.java       | 20 ++++++++-----
 .../hadoop/fs/s3a/ITestS3AStorageClass.java   | 30 ++++++++++++++++++-
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index 6190bd6fb8..5d9c7bb5d1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -408,6 +408,9 @@ public PutObjectRequest newPutObjectRequest(String key,
         inputStream, metadata);
     setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
+    if (storageClass != null) {
+      putObjectRequest.setStorageClass(storageClass);
+    }
     return prepareRequest(putObjectRequest);
   }
 
@@ -416,19 +419,22 @@ public PutObjectRequest newDirectoryMarkerRequest(String directory) {
     String key = directory.endsWith("/")
         ? directory
         : (directory + "/");
-    // an input stream which is laways empty
-    final InputStream im = new InputStream() {
+    // an input stream which is always empty
+    final InputStream inputStream = new InputStream() {
       @Override
       public int read() throws IOException {
         return -1;
       }
     };
     // preparation happens in here
-    final ObjectMetadata md = createObjectMetadata(0L, true);
-    md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
-    PutObjectRequest putObjectRequest =
-        newPutObjectRequest(key, md, null, im);
-    return putObjectRequest;
+    final ObjectMetadata metadata = createObjectMetadata(0L, true);
+    metadata.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
+
+    PutObjectRequest putObjectRequest = new PutObjectRequest(getBucket(), key,
+        inputStream, metadata);
+    setOptionalPutRequestParameters(putObjectRequest);
+    putObjectRequest.setCannedAcl(cannedACL);
+    return prepareRequest(putObjectRequest);
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java
index e141ef5aa3..7c56f8d2ea 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java
@@ -19,10 +19,14 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +34,10 @@
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;
 import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
 import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_GLACIER;
 import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY;
@@ -43,13 +51,33 @@
 /**
  * Tests of storage class.
  */
+@RunWith(Parameterized.class)
 public class ITestS3AStorageClass extends AbstractS3ATestBase {
 
+  /**
+   * HADOOP-18339. Parameterized the test for different fast upload buffer types
+   * to ensure the storage class configuration works with all of them.
+   */
+  @Parameterized.Parameters(name = "fast-upload-buffer-{0}")
+  public static Collection params() {
+    return Arrays.asList(new Object[][]{
+        {FAST_UPLOAD_BUFFER_DISK},
+        {FAST_UPLOAD_BUFFER_ARRAY}
+    });
+  }
+
+  private final String fastUploadBufferType;
+
+  public ITestS3AStorageClass(String fastUploadBufferType) {
+    this.fastUploadBufferType = fastUploadBufferType;
+  }
+
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     disableFilesystemCaching(conf);
-    removeBaseAndBucketOverrides(conf, STORAGE_CLASS);
+    removeBaseAndBucketOverrides(conf, STORAGE_CLASS, FAST_UPLOAD_BUFFER);
+    conf.set(FAST_UPLOAD_BUFFER, fastUploadBufferType);
     return conf;
   }
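
Note (not part of the patch): a minimal sketch of the client-side configuration
this change is meant to honour, reusing the S3A Constants imported in the test
above. The class name, bucket URI and paths are placeholders, and acceptance of
a given storage class still depends on the bucket and the S3A release in use.

    import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
    import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
    import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
    import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY;

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Illustrative sketch only; the bucket and paths are placeholders. */
    public class StorageClassUploadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Ask S3A to create objects with a non-default storage class.
        conf.set(STORAGE_CLASS, STORAGE_CLASS_REDUCED_REDUNDANCY);
        // Buffer uploads on the heap; before this patch the storage class was
        // only applied when the buffer type was "disk".
        conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
        try (FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf)) {
          // mkdirs() PUTs an empty directory marker via newDirectoryMarkerRequest(),
          // one of the two request paths this patch changes.
          fs.mkdirs(new Path("/example/dir"));
          // A small write with the "array" buffer should go through the
          // InputStream-based newPutObjectRequest(), the other changed path.
          fs.create(new Path("/example/dir/data.txt")).close();
        }
      }
    }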