diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index 98c72d2766..c85263f190 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -112,7 +112,7 @@ public S3AsyncClient createS3AsyncClient(
     return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
         .httpClientBuilder(httpClientBuilder)
         .multipartConfiguration(multipartConfiguration)
-        .multipartEnabled(true)
+        .multipartEnabled(parameters.isMultipartCopy())
         .build();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e192135b9f..9307c4c265 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -440,6 +440,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
 
+  /**
+   * Should file copy operations use the S3 transfer manager?
+   * True unless multipart upload is disabled.
+   */
+  private boolean isMultipartCopyEnabled;
+
   /**
    * A cache of files that should be deleted when the FileSystem is closed
    * or the JVM is exited.
@@ -576,6 +582,9 @@ public void initialize(URI name, Configuration originalConf)
         intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
     this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
         DEFAULT_MULTIPART_UPLOAD_ENABLED);
+    // multipart copy and upload are the same; this just makes it explicit
+    this.isMultipartCopyEnabled = isMultipartUploadEnabled;
+
     initThreadPools(conf);
 
     int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@@ -982,6 +991,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
         .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
         .withExecutionInterceptors(auditManager.createExecutionInterceptors())
         .withMinimumPartSize(partSize)
+        .withMultipartCopyEnabled(isMultipartCopyEnabled)
         .withMultipartThreshold(multiPartThreshold)
         .withTransferManagerExecutor(unboundedThreadPool)
         .withRegion(region);
@@ -1468,6 +1478,11 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
       LOG.debug("Sharing credentials for: {}", purpose);
       return credentials.share();
     }
+
+    @Override
+    public boolean isMultipartCopyEnabled() {
+      return S3AFileSystem.this.isMultipartUploadEnabled;
+    }
   }
 
   /**
@@ -4436,37 +4451,56 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
           e);
     }
 
-    return readInvoker.retry(
-        action, srcKey,
-        true,
-        () -> {
-          CopyObjectRequest.Builder copyObjectRequestBuilder =
-              getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
-          changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
-          incrementStatistic(OBJECT_COPY_REQUESTS);
+    CopyObjectRequest.Builder copyObjectRequestBuilder =
+        getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
+    changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
+    CopyObjectResponse response;
 
-          Copy copy = transferManager.copy(
-              CopyRequest.builder()
-                  .copyObjectRequest(copyObjectRequestBuilder.build())
-                  .build());
+    // transfer manager is skipped if disabled or the file is too small to worry about
+    final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
+    if (useTransferManager) {
+      // use transfer manager
+      response = readInvoker.retry(
+          action, srcKey,
+          true,
+          () -> {
+            incrementStatistic(OBJECT_COPY_REQUESTS);
 
-          try {
-            CompletedCopy completedCopy = copy.completionFuture().join();
-            CopyObjectResponse result = completedCopy.response();
-            changeTracker.processResponse(result);
-            incrementWriteOperations();
-            instrumentation.filesCopied(1, size);
-            return result;
-          } catch (CompletionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof SdkException) {
-              SdkException awsException = (SdkException)cause;
-              changeTracker.processException(awsException, "copy");
-              throw awsException;
+            Copy copy = transferManager.copy(
+                CopyRequest.builder()
+                    .copyObjectRequest(copyObjectRequestBuilder.build())
+                    .build());
+
+            try {
+              CompletedCopy completedCopy = copy.completionFuture().join();
+              return completedCopy.response();
+            } catch (CompletionException e) {
+              Throwable cause = e.getCause();
+              if (cause instanceof SdkException) {
+                SdkException awsException = (SdkException)cause;
+                changeTracker.processException(awsException, "copy");
+                throw awsException;
+              }
+              throw extractException(action, srcKey, e);
             }
-            throw extractException(action, srcKey, e);
-          }
-        });
+          });
+    } else {
+      // single part copy bypasses transfer manager
+      // note: this helps with some mock testing, e.g. HBoss, as there is less to mock.
+      response = readInvoker.retry(
+          action, srcKey,
+          true,
+          () -> {
+            LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
+            incrementStatistic(OBJECT_COPY_REQUESTS);
+            return s3Client.copyObject(copyObjectRequestBuilder.build());
+          });
+    }
+
+    changeTracker.processResponse(response);
+    incrementWriteOperations();
+    instrumentation.filesCopied(1, size);
+    return response;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
index 23c4d35012..18d6c1af58 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
@@ -115,4 +115,10 @@ public interface S3AInternals {
   @AuditEntryPoint
   @Retries.RetryTranslated
   HeadBucketResponse getBucketMetadata() throws IOException;
+
+  /**
+   * Is multipart copy enabled?
+   * @return true if the transfer manager is used to copy files.
+   */
+  boolean isMultipartCopyEnabled();
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index d4504cd08d..e2e792ebfb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -156,6 +156,11 @@ final class S3ClientCreationParameters {
    */
   private long multiPartThreshold;
 
+  /**
+   * Is multipart copy enabled? Defaults to true.
+   */
+  private boolean multipartCopy = true;
+
   /**
    * Executor that the transfer manager will use to execute background tasks.
    */
@@ -399,5 +404,24 @@ public S3ClientCreationParameters withRegion(
     public Region getRegion() {
       return region;
     }
+
+    /**
+     * Set the multipart copy flag.
+     *
+     * @param value new value
+     * @return the builder
+     */
+    public S3ClientCreationParameters withMultipartCopyEnabled(final boolean value) {
+      this.multipartCopy = value;
+      return this;
+    }
+
+    /**
+     * Get the multipart copy flag.
+     * @return the multipart copy flag
+     */
+    public boolean isMultipartCopy() {
+      return multipartCopy;
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 1a30c04358..75c6efbe2a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -355,10 +355,10 @@ public long getFilesize() {
   /**
    * Is this expected to be a multipart upload?
    * Assertions will change if not.
-   * @return true by default.
+   * @return true if the filesystem reports multipart copy as enabled.
    */
   protected boolean expectMultipartUpload() {
-    return true;
+    return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
index ed300dba01..e154ab5676 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
@@ -18,13 +18,10 @@
 
 package org.apache.hadoop.fs.s3a.scale;
 
+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
-import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
@@ -33,7 +30,6 @@
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Use a single PUT for the whole upload/rename/delete workflow; include verification
@@ -41,11 +37,6 @@
  */
 public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {
 
-  /**
-   * Size to ensure MPUs don't happen in transfer manager.
-   */
-  public static final String S_1T = "1T";
-
   public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";
 
   /**
@@ -56,11 +47,23 @@ protected String getBlockOutputBufferName() {
     return Constants.FAST_UPLOAD_BUFFER_DISK;
   }
 
+  /**
+   * Multipart upload is always disabled.
+   * @return false
+   */
   @Override
   protected boolean expectMultipartUpload() {
     return false;
   }
 
+  /**
+   * Is multipart copy enabled?
+   * @return true if the transfer manager is used to copy files.
+   */
+  private boolean isMultipartCopyEnabled() {
+    return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
+  }
+
   /**
    * Create a configuration without multipart upload,
    * and a long request timeout to allow for a very slow
@@ -77,35 +80,21 @@ protected Configuration createScaleConfiguration() {
         MULTIPART_SIZE,
         REQUEST_TIMEOUT);
     conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
-    conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
-    conf.set(MULTIPART_SIZE, S_1T);
+    conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
     conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
     return conf;
   }
 
   /**
-   * After the file is created, attempt a rename with an FS
-   * instance with a small multipart threshold;
-   * this MUST be rejected.
+   * Verify multipart copy is disabled.
    */
   @Override
   public void test_030_postCreationAssertions() throws Throwable {
-    assumeHugeFileExists();
-    final Path hugefile = getHugefile();
-    final Path hugefileRenamed = getHugefileRenamed();
-    describe("renaming %s to %s", hugefile, hugefileRenamed);
-    S3AFileSystem fs = getFileSystem();
-    fs.delete(hugefileRenamed, false);
-    // create a new fs with a small multipart threshold; expect rename failure.
-    final Configuration conf = new Configuration(fs.getConf());
-    conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
-    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
-    S3ATestUtils.disableFilesystemCaching(conf);
-
-    try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
-      intercept(UnsupportedRequestException.class, () ->
-          fs2.rename(hugefile, hugefileRenamed));
-    }
+    super.test_030_postCreationAssertions();
+    Assertions.assertThat(isMultipartCopyEnabled())
+        .describedAs("Multipart copy should be disabled in %s", getFileSystem())
+        .isFalse();
   }
 }
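
Reviewer note (not part of the patch): a minimal sketch of how the new S3AInternals.isMultipartCopyEnabled() probe interacts with the existing fs.s3a.multipart.uploads.enabled option after this change. The class name MultipartCopyProbe and the command-line URI argument are illustrative assumptions, not code from the patch.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

/** Sketch: probe whether S3A will route copies through the transfer manager. */
public class MultipartCopyProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // disabling multipart uploads also disables multipart copy, so
    // rename/copy will issue a single CopyObject request per file
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
    try (FileSystem fs = FileSystem.newInstance(new URI(args[0]), conf)) {
      S3AFileSystem s3a = (S3AFileSystem) fs;
      System.out.println("multipart copy enabled: "
          + s3a.getS3AInternals().isMultipartCopyEnabled());
    }
  }
}

With the option left at its default (true), copies at or above fs.s3a.multipart.threshold still go through the transfer manager; smaller files take the new single-part copy path either way.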