diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 66cac99de7..15f69bde3f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -28,7 +28,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.amazonaws.AmazonClientException;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
@@ -46,6 +45,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
@@ -304,8 +304,8 @@ public synchronized void write(byte[] source, int offset, int len)
 
   /**
    * Start an asynchronous upload of the current block.
-   * @throws IOException Problems opening the destination for upload
-   * or initializing the upload.
+   * @throws IOException Problems opening the destination for upload,
+   * initializing the upload, or if a previous operation has failed.
    */
   private synchronized void uploadCurrentBlock() throws IOException {
     Preconditions.checkState(hasActiveBlock(), "No active block");
@@ -394,6 +394,13 @@ public void close() throws IOException {
       }
       LOG.debug("Upload complete to {} by {}", key, writeOperationHelper);
     } catch (IOException ioe) {
+      // the operation failed.
+      // if this happened during a multipart upload, abort the
+      // operation, so as to not leave (billable) data
+      // pending on the bucket
+      if (multiPartUpload != null) {
+        multiPartUpload.abort();
+      }
       writeOperationHelper.writeFailed(ioe);
       throw ioe;
     } finally {
@@ -528,6 +535,13 @@ private class MultiPartUpload {
     private int partsUploaded;
     private long bytesSubmitted;
 
+    /**
+     * Any IOException raised during block upload.
+     * If non-null, then close() MUST NOT complete
+     * the file upload.
+     */
+    private IOException blockUploadFailure;
+
     MultiPartUpload(String key) throws IOException {
       this.uploadId = writeOperationHelper.initiateMultiPartUpload(key);
       this.partETagsFutures = new ArrayList<>(2);
@@ -568,31 +582,60 @@ public long getBytesSubmitted() {
       return bytesSubmitted;
     }
 
+    /**
+     * A block upload has failed.
+     * Record it if there has been no previous failure.
+     * @param e error
+     */
+    public void noteUploadFailure(final IOException e) {
+      if (blockUploadFailure == null) {
+        blockUploadFailure = e;
+      }
+    }
+
+    /**
+     * If there is a block upload failure, throw it.
+     * @throws IOException if one has already been caught.
+     */
+    public void maybeRethrowUploadFailure() throws IOException {
+      if (blockUploadFailure != null) {
+        throw blockUploadFailure;
+      }
+    }
+
     /**
      * Upload a block of data.
     * This will take the block
     * @param block block to upload
     * @throws IOException upload failure
+     * @throws PathIOException if too many blocks were written
     */
    private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
        throws IOException {
      LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
      Preconditions.checkNotNull(uploadId, "Null uploadId");
+      maybeRethrowUploadFailure();
      partsSubmitted++;
      final int size = block.dataSize();
      bytesSubmitted += size;
-      final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
      final int currentPartNumber = partETagsFutures.size() + 1;
-      final UploadPartRequest request =
-          writeOperationHelper.newUploadPartRequest(
-              key,
-              uploadId,
-              currentPartNumber,
-              size,
-              uploadData.getUploadStream(),
-              uploadData.getFile(),
-              0L);
-
+      final UploadPartRequest request;
+      final S3ADataBlocks.BlockUploadData uploadData;
+      try {
+        uploadData = block.startUpload();
+        request = writeOperationHelper.newUploadPartRequest(
+            key,
+            uploadId,
+            currentPartNumber,
+            size,
+            uploadData.getUploadStream(),
+            uploadData.getFile(),
+            0L);
+      } catch (IOException e) {
+        // failure to start the upload.
+        noteUploadFailure(e);
+        throw e;
+      }
      long transferQueueTime = now();
      BlockUploadProgress callback =
          new BlockUploadProgress(
@@ -613,6 +656,10 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
            LOG.debug("Stream statistics of {}", statistics);
            partsUploaded++;
            return partETag;
+          } catch (IOException e) {
+            // save immediately.
+            noteUploadFailure(e);
+            throw e;
          } finally {
            // close the stream and block
            cleanupWithLogger(LOG, uploadData, block);
@@ -638,10 +685,6 @@ private List<PartETag> waitForAllPartUploads() throws IOException {
        //there is no way of recovering so abort
        //cancel all partUploads
        LOG.debug("While waiting for upload completion", ee);
-        LOG.debug("Cancelling futures");
-        for (ListenableFuture<PartETag> future : partETagsFutures) {
-          future.cancel(true);
-        }
        //abort multipartupload
        this.abort();
        throw extractException("Multi-part upload with id '" + uploadId
@@ -649,6 +692,16 @@ private List<PartETag> waitForAllPartUploads() throws IOException {
      }
    }
 
+    /**
+     * Cancel all active uploads.
+     */
+    private void cancelAllActiveFutures() {
+      LOG.debug("Cancelling futures");
+      for (ListenableFuture<PartETag> future : partETagsFutures) {
+        future.cancel(true);
+      }
+    }
+
    /**
     * This completes a multipart upload.
     * Sometimes it fails; here retries are handled to avoid losing all data
@@ -658,6 +711,7 @@ private List<PartETag> waitForAllPartUploads() throws IOException {
     */
    private void complete(List<PartETag> partETags)
        throws IOException {
+      maybeRethrowUploadFailure();
      AtomicInteger errorCount = new AtomicInteger(0);
      try {
        writeOperationHelper.completeMPUwithRetries(key,
@@ -675,9 +729,9 @@ private void complete(List<PartETag> partETags)
     * IOExceptions are caught; this is expected to be run as a cleanup process.
     */
    public void abort() {
-      int retryCount = 0;
-      AmazonClientException lastException;
+      LOG.debug("Aborting upload");
      fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
+      cancelAllActiveFutures();
      try {
        writeOperationHelper.abortMultipartUpload(key, uploadId,
            (text, e, r, i) -> statistics.exceptionInMultipartAbort());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 602732b6d3..ab53486d64 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3a.select.SelectBinding;
@@ -57,6 +58,9 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.longOption;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
 
 /**
  * Helper for low-level operations against an S3 Bucket for writing data,
@@ -384,6 +388,7 @@ public void abortMultipartCommit(String destKey, String uploadId)
   * A subset of the file may be posted, by providing the starting point
   * in {@code offset} and a length of block in {@code size} equal to
   * or less than the remaining bytes.
+   * The part number must be less than or equal to 10000.
   * @param destKey destination key of ongoing operation
   * @param uploadId ID of ongoing upload
   * @param partNumber current part number of the upload
@@ -392,6 +397,8 @@ public void abortMultipartCommit(String destKey, String uploadId)
   * @param size amount of data
   * @param uploadStream source of data to upload
   * @param sourceFile optional source file.
   * @param offset offset in file to start reading.
   * @return the request.
+   * @throws IllegalArgumentException if the parameters are invalid.
+   * @throws PathIOException if the part number is out of range.
   */
  public UploadPartRequest newUploadPartRequest(
      String destKey,
@@ -400,18 +407,32 @@ public UploadPartRequest newUploadPartRequest(
      int size,
      InputStream uploadStream,
      File sourceFile,
-      Long offset) {
+      Long offset) throws PathIOException {
    checkNotNull(uploadId);
    // exactly one source must be set; xor verifies this
    checkArgument((uploadStream != null) ^ (sourceFile != null),
        "Data source");
    checkArgument(size >= 0, "Invalid partition size %s", size);
-    checkArgument(partNumber > 0 && partNumber <= 10000,
-        "partNumber must be between 1 and 10000 inclusive, but is %s",
-        partNumber);
+    checkArgument(partNumber > 0,
+        "partNumber must be between 1 and %s inclusive, but is %s",
+        DEFAULT_UPLOAD_PART_COUNT_LIMIT, partNumber);
    LOG.debug("Creating part upload request for {} #{} size {}",
        uploadId, partNumber, size);
+    long partCountLimit = longOption(conf,
+        UPLOAD_PART_COUNT_LIMIT,
+        DEFAULT_UPLOAD_PART_COUNT_LIMIT,
+        1);
+    if (partCountLimit != DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
+      LOG.warn("Configuration property {} shouldn't be overridden by client",
+          UPLOAD_PART_COUNT_LIMIT);
+    }
+    final String pathErrorMsg = "Number of parts in multipart upload exceeded."
+        + " Current part count = %s, Part count limit = %s ";
+    if (partNumber > partCountLimit) {
+      throw new PathIOException(destKey,
+          String.format(pathErrorMsg, partNumber, partCountLimit));
+    }
 
    UploadPartRequest request = new UploadPartRequest()
      .withBucketName(bucket)
      .withKey(destKey)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index 8592ad4901..155e86a3d4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
@@ -50,6 +51,7 @@
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DurationInfo;
@@ -481,6 +483,15 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
      if (numParts == 0) {
        numParts = 1;
      }
+      if (numParts > InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
+        // fail if the file is too big.
+        // it would be possible to be clever here and recalculate the part size,
+        // but this is not currently done.
+        throw new PathIOException(destPath.toString(),
+            String.format("File to upload (size %d)"
+                + " needs %d parts, which exceeds the part count limit",
+                length, numParts));
+      }
      List<PartETag> parts = new ArrayList<>((int) numParts);
@@ -510,7 +521,6 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
      return commitData;
    } finally {
      if (threw && uploadId != null) {
-        statistics.commitAborted();
        try {
          abortMultipartCommit(destKey, uploadId);
        } catch (IOException e) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index c73580d19f..4d944a1721 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -23,6 +23,8 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.s3a.Constants;
 
@@ -93,4 +95,19 @@ private InternalConstants() {
 
   /** Directory marker attribute: see HADOOP-16613. Value: {@value}. */
   public static final String X_DIRECTORY = "application/x-directory";
+
+  /**
+   * A configuration option for test use only: maximum
+   * part count on block writes/uploads.
+   * Value: {@value}.
+   */
+  @VisibleForTesting
+  public static final String UPLOAD_PART_COUNT_LIMIT =
+      "fs.s3a.internal.upload.part.count.limit";
+
+  /**
+   * Maximum entries you can upload in a single file write/copy/upload.
+   * Value: {@value}.
+   */
+  public static final int DEFAULT_UPLOAD_PART_COUNT_LIMIT = 10000;
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index c05641b2b4..056ad66832 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -1216,6 +1216,24 @@ a new one than read to the end of a large file.
 Note: the threshold when data is read rather than the stream aborted can be tuned by
 `fs.s3a.readahead.range`; seek policy in `fs.s3a.experimental.input.fadvise`.
 
+### `PathIOException` Number of parts in multipart upload exceeded.
+
+Number of parts in multipart upload exceeded
+
+```
+org.apache.hadoop.fs.PathIOException: `test/testMultiPartUploadFailure': Number of parts in multipart upload exceeded. Current part count = X, Part count limit = Y
+
+  at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432)
+  at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627)
+  at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532)
+  at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316)
+  at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301)
+```
+
+The upload failed because the number of parts would exceed the limit
+of 10000 imposed by the S3 API. Increase `fs.s3a.multipart.size`
+to reduce the number of parts.
+
 ### `UnknownStoreException` Bucket does not exist.
 
 The bucket does not exist.
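A minimal sketch of the tuning recommended in the troubleshooting entry above; this is an illustration and not part of the patch. The bucket URI and the 512M value are assumptions: larger parts mean fewer parts per object, so the 10000-part ceiling is only reached at roughly 10000 times the part size.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class MultipartSizeTuning {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // With 512 MB parts, the 10000-part limit is reached only around 5 TB
    // per object, so large single-file writes stay under the limit.
    conf.set("fs.s3a.multipart.size", "512M");
    // Hypothetical bucket; any S3A URI reachable with these settings works.
    try (FileSystem fs = FileSystem.newInstance(
        new URI("s3a://example-bucket/"), conf)) {
      // Streams opened via fs.create(...) now upload in 512 MB parts.
    }
  }
}
```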
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
index ff176f58da..60904d7ae8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
@@ -18,18 +18,23 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 /**
  * Unit tests for {@link S3ABlockOutputStream}.
@@ -57,10 +62,24 @@ public void setUp() throws Exception {
   public void testFlushNoOpWhenStreamClosed() throws Exception {
     doThrow(new IOException()).when(stream).checkOpen();
-    try {
-      stream.flush();
-    } catch (Exception e){
-      fail("Should not have any exception.");
-    }
+    stream.flush();
+  }
+
+  @Test
+  public void testWriteOperationHelperPartLimits() throws Throwable {
+    S3AFileSystem s3a = mock(S3AFileSystem.class);
+    when(s3a.getBucket()).thenReturn("bucket");
+    WriteOperationHelper woh = new WriteOperationHelper(s3a,
+        new Configuration());
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(
+        "a".getBytes());
+    // first one works
+    String key = "destKey";
+    woh.newUploadPartRequest(key,
+        "uploadId", 1, 1024, inputStream, null, 0L);
+    // but ask past the limit and a PathIOE is raised
+    intercept(PathIOException.class, key,
+        () -> woh.newUploadPartRequest(key,
+            "uploadId", 50000, 1024, inputStream, null, 0L));
   }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
new file mode 100644
index 0000000000..4a348be8db
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.File;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
+import org.apache.hadoop.fs.s3a.commit.CommitOperations;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests for the S3A multipart upload part count limits.
+ */
+public class ITestS3AMultipartUploadSizeLimits extends S3AScaleTestBase {
+
+  public static final int MPU_SIZE = 5 * _1MB;
+
+  @Override
+  protected Configuration createScaleConfiguration() {
+    Configuration configuration = super.createScaleConfiguration();
+    removeBaseAndBucketOverrides(configuration,
+        MULTIPART_SIZE,
+        UPLOAD_PART_COUNT_LIMIT);
+    configuration.setLong(MULTIPART_SIZE, MPU_SIZE);
+    // Setting the part count limit to 2 so that uploads
+    // of more than two parts fail.
+    configuration.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+    return configuration;
+  }
+
+  /**
+   * Uploads under the limit are valid.
+   */
+  @Test
+  public void testTwoPartUpload() throws Throwable {
+    Path file = path(getMethodName());
+    // Creating a file having parts less than configured
+    // part count will succeed.
+    createFile(getFileSystem(), file, true,
+        dataset(6 * _1MB, 'a', 'z' - 'a'));
+  }
+
+  /**
+   * Tests to validate that an exception is thrown during a
+   * multipart upload when the number of parts is greater
+   * than the allowed limit.
+   */
+  @Test
+  public void testUploadOverLimitFailure() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Path file = path(getMethodName());
+    // Creating a file with more than configured part count should
+    // throw a PathIOE
+    intercept(PathIOException.class,
+        () -> createFile(fs,
+            file,
+            false,
+            dataset(15 * _1MB, 'a', 'z' - 'a')));
+    // and the path does not exist
+    assertPathDoesNotExist("upload must not have completed", file);
+  }
+
+  @Test
+  public void testCommitLimitFailure() throws Throwable {
+    describe("verify commit uploads fail-safe when MPU limits exceeded");
+    S3AFileSystem fs = getFileSystem();
+    CommitOperations actions = new CommitOperations(fs);
+    File tempFile = File.createTempFile("commit", ".txt");
+    FileUtils.writeByteArrayToFile(tempFile,
+        dataset(15 * _1MB, 'a', 'z' - 'a'));
+    Path dest = methodPath();
+    final S3AInstrumentation instrumentation = fs.getInstrumentation();
+    final long initial = instrumentation.getCounterValue(
+        Statistic.COMMITTER_COMMITS_ABORTED);
+
+    intercept(PathIOException.class, () ->
+        actions.uploadFileToPendingCommit(tempFile,
+            dest,
+            null,
+            MPU_SIZE,
+            new ProgressCounter()));
+    assertPathDoesNotExist("upload must not have completed", dest);
+    final long after = instrumentation.getCounterValue(
+        Statistic.COMMITTER_COMMITS_ABORTED);
+    Assertions.assertThat(after)
+        .describedAs("commit abort count")
+        .isEqualTo(initial + 1);
+  }
+}
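To make the arithmetic behind the new limit concrete, here is a self-contained sketch, not part of the patch, of how the part count follows from file length and part size. The rounded-up division approximates the part count the committer derives; the class and values are illustrative only.

```java
public final class PartCountCheck {

  /** Same value as InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT. */
  private static final long PART_COUNT_LIMIT = 10000;

  /** Parts needed for a file of the given length at the given part size. */
  static long partsNeeded(long fileLength, long partSize) {
    return (fileLength + partSize - 1) / partSize;   // round up
  }

  public static void main(String[] args) {
    long partSize = 64L * 1024 * 1024;                 // 64 MB parts
    long fileLength = 1024L * 1024 * 1024 * 1024;      // a 1 TB file
    long parts = partsNeeded(fileLength, partSize);    // 16384 parts
    // 16384 > 10000: with this patch such a write fails fast with a
    // PathIOException rather than failing late inside the upload.
    System.out.println(parts + " parts needed; limit is " + PART_COUNT_LIMIT);
  }
}
```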