HADOOP-16900. Very large files can be truncated when written through the S3A FileSystem.
Contributed by Mukund Thakur and Steve Loughran.

This patch ensures that writes to S3A fail when more than 10,000 blocks are
written. That upper bound still exists. To write massive files, make sure that
the value of fs.s3a.multipart.size is set to a size which is large enough to
upload the files in fewer than 10,000 blocks.

Change-Id: Icec604e2a357ffd38d7ae7bc3f887ff55f2d721a
parent: cef0756929
commit: 29b19cd592
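
To apply the guidance in the commit message, pick a multipart size from the largest file you expect to write. The sketch below is illustrative only and is not part of this change; the 1 TB target size and the class name are assumptions.

```java
// Sketch: choosing fs.s3a.multipart.size so a file of a known maximum size
// needs fewer than 10,000 parts. The 1 TB target and the class name are
// illustrative assumptions, not values from this commit.
import org.apache.hadoop.conf.Configuration;

public class S3AMultipartSizeExample {
  public static void main(String[] args) {
    final long partLimit = 10_000L;            // S3 multipart part count limit
    final long maxFileSize = 1L << 40;         // assume 1 TB is the largest file
    // round up so partLimit parts always cover the file
    long partSize = (maxFileSize + partLimit - 1) / partLimit;
    // S3 also requires every part except the last to be at least 5 MB
    partSize = Math.max(partSize, 5L * 1024 * 1024);

    Configuration conf = new Configuration();
    conf.setLong("fs.s3a.multipart.size", partSize);  // ~105 MB for 1 TB files
    System.out.println("fs.s3a.multipart.size = " + partSize);
  }
}
```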
org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

@@ -28,7 +28,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.amazonaws.AmazonClientException;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
@@ -46,6 +45,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
@@ -304,8 +304,8 @@ public synchronized void write(byte[] source, int offset, int len)
 
   /**
    * Start an asynchronous upload of the current block.
-   * @throws IOException Problems opening the destination for upload
-   * or initializing the upload.
+   * @throws IOException Problems opening the destination for upload,
+   * initializing the upload, or if a previous operation has failed.
    */
   private synchronized void uploadCurrentBlock() throws IOException {
     Preconditions.checkState(hasActiveBlock(), "No active block");
@@ -394,6 +394,13 @@ public void close() throws IOException {
       }
       LOG.debug("Upload complete to {} by {}", key, writeOperationHelper);
     } catch (IOException ioe) {
+      // the operation failed.
+      // if this happened during a multipart upload, abort the
+      // operation, so as to not leave (billable) data
+      // pending on the bucket
+      if (multiPartUpload != null) {
+        multiPartUpload.abort();
+      }
       writeOperationHelper.writeFailed(ioe);
       throw ioe;
     } finally {
@@ -528,6 +535,13 @@ private class MultiPartUpload {
     private int partsUploaded;
     private long bytesSubmitted;
 
+    /**
+     * Any IOException raised during block upload.
+     * if non-null, then close() MUST NOT complete
+     * the file upload.
+     */
+    private IOException blockUploadFailure;
+
     MultiPartUpload(String key) throws IOException {
       this.uploadId = writeOperationHelper.initiateMultiPartUpload(key);
       this.partETagsFutures = new ArrayList<>(2);
@@ -568,31 +582,60 @@ public long getBytesSubmitted() {
       return bytesSubmitted;
     }
 
+    /**
+     * A block upload has failed.
+     * Recorded it if there has been no previous failure.
+     * @param e error
+     */
+    public void noteUploadFailure(final IOException e) {
+      if (blockUploadFailure == null) {
+        blockUploadFailure = e;
+      }
+    }
+
+    /**
+     * If there is a block upload failure -throw it.
+     * @throws IOException if one has already been caught.
+     */
+    public void maybeRethrowUploadFailure() throws IOException {
+      if (blockUploadFailure != null) {
+        throw blockUploadFailure;
+      }
+    }
+
     /**
      * Upload a block of data.
      * This will take the block
      * @param block block to upload
      * @throws IOException upload failure
+     * @throws PathIOException if too many blocks were written
      */
     private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
         throws IOException {
       LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
       Preconditions.checkNotNull(uploadId, "Null uploadId");
+      maybeRethrowUploadFailure();
       partsSubmitted++;
       final int size = block.dataSize();
       bytesSubmitted += size;
-      final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
       final int currentPartNumber = partETagsFutures.size() + 1;
-      final UploadPartRequest request =
-          writeOperationHelper.newUploadPartRequest(
-              key,
-              uploadId,
-              currentPartNumber,
-              size,
-              uploadData.getUploadStream(),
-              uploadData.getFile(),
-              0L);
+      final UploadPartRequest request;
+      final S3ADataBlocks.BlockUploadData uploadData;
+      try {
+        uploadData = block.startUpload();
+        request = writeOperationHelper.newUploadPartRequest(
+            key,
+            uploadId,
+            currentPartNumber,
+            size,
+            uploadData.getUploadStream(),
+            uploadData.getFile(),
+            0L);
+      } catch (IOException e) {
+        // failure to start the upload.
+        noteUploadFailure(e);
+        throw e;
+      }
       long transferQueueTime = now();
       BlockUploadProgress callback =
           new BlockUploadProgress(
@@ -613,6 +656,10 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
             LOG.debug("Stream statistics of {}", statistics);
             partsUploaded++;
             return partETag;
+          } catch (IOException e) {
+            // save immediately.
+            noteUploadFailure(e);
+            throw e;
           } finally {
             // close the stream and block
             cleanupWithLogger(LOG, uploadData, block);
@@ -638,10 +685,6 @@ private List<PartETag> waitForAllPartUploads() throws IOException {
         //there is no way of recovering so abort
         //cancel all partUploads
         LOG.debug("While waiting for upload completion", ee);
-        LOG.debug("Cancelling futures");
-        for (ListenableFuture<PartETag> future : partETagsFutures) {
-          future.cancel(true);
-        }
         //abort multipartupload
         this.abort();
         throw extractException("Multi-part upload with id '" + uploadId
@@ -649,6 +692,16 @@ private List<PartETag> waitForAllPartUploads() throws IOException {
       }
     }
 
+    /**
+     * Cancel all active uploads.
+     */
+    private void cancelAllActiveFutures() {
+      LOG.debug("Cancelling futures");
+      for (ListenableFuture<PartETag> future : partETagsFutures) {
+        future.cancel(true);
+      }
+    }
+
     /**
      * This completes a multipart upload.
      * Sometimes it fails; here retries are handled to avoid losing all data
@@ -658,6 +711,7 @@ private List<PartETag> waitForAllPartUploads() throws IOException {
      */
     private void complete(List<PartETag> partETags)
         throws IOException {
+      maybeRethrowUploadFailure();
       AtomicInteger errorCount = new AtomicInteger(0);
       try {
         writeOperationHelper.completeMPUwithRetries(key,
@@ -675,9 +729,9 @@ private void complete(List<PartETag> partETags)
      * IOExceptions are caught; this is expected to be run as a cleanup process.
      */
     public void abort() {
-      int retryCount = 0;
-      AmazonClientException lastException;
+      LOG.debug("Aborting upload");
       fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
+      cancelAllActiveFutures();
       try {
         writeOperationHelper.abortMultipartUpload(key, uploadId,
             (text, e, r, i) -> statistics.exceptionInMultipartAbort());
org/apache/hadoop/fs/s3a/WriteOperationHelper.java

@@ -49,6 +49,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3a.select.SelectBinding;
@@ -57,6 +58,9 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.s3a.Invoker.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.longOption;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
 
 /**
  * Helper for low-level operations against an S3 Bucket for writing data,
@@ -384,6 +388,7 @@ public void abortMultipartCommit(String destKey, String uploadId)
    * A subset of the file may be posted, by providing the starting point
    * in {@code offset} and a length of block in {@code size} equal to
    * or less than the remaining bytes.
+   * The part number must be less than 10000.
    * @param destKey destination key of ongoing operation
    * @param uploadId ID of ongoing upload
    * @param partNumber current part number of the upload
@@ -392,6 +397,8 @@ public void abortMultipartCommit(String destKey, String uploadId)
    * @param sourceFile optional source file.
    * @param offset offset in file to start reading.
    * @return the request.
+   * @throws IllegalArgumentException if the parameters are invalid -including
+   * @throws PathIOException if the part number is out of range.
    */
   public UploadPartRequest newUploadPartRequest(
       String destKey,
@@ -400,18 +407,32 @@ public UploadPartRequest newUploadPartRequest(
       int size,
       InputStream uploadStream,
       File sourceFile,
-      Long offset) {
+      Long offset) throws PathIOException {
     checkNotNull(uploadId);
     // exactly one source must be set; xor verifies this
     checkArgument((uploadStream != null) ^ (sourceFile != null),
         "Data source");
     checkArgument(size >= 0, "Invalid partition size %s", size);
-    checkArgument(partNumber > 0 && partNumber <= 10000,
-        "partNumber must be between 1 and 10000 inclusive, but is %s",
-        partNumber);
+    checkArgument(partNumber > 0,
+        "partNumber must be between 1 and %s inclusive, but is %s",
+        DEFAULT_UPLOAD_PART_COUNT_LIMIT, partNumber);
 
     LOG.debug("Creating part upload request for {} #{} size {}",
         uploadId, partNumber, size);
+    long partCountLimit = longOption(conf,
+        UPLOAD_PART_COUNT_LIMIT,
+        DEFAULT_UPLOAD_PART_COUNT_LIMIT,
+        1);
+    if (partCountLimit != DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
+      LOG.warn("Configuration property {} shouldn't be overridden by client",
+          UPLOAD_PART_COUNT_LIMIT);
+    }
+    final String pathErrorMsg = "Number of parts in multipart upload exceeded."
+        + " Current part count = %s, Part count limit = %s ";
+    if (partNumber > partCountLimit) {
+      throw new PathIOException(destKey,
+          String.format(pathErrorMsg, partNumber, partCountLimit));
+    }
     UploadPartRequest request = new UploadPartRequest()
         .withBucketName(bucket)
         .withKey(destKey)
org/apache/hadoop/fs/s3a/commit/CommitOperations.java

@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
@@ -50,6 +51,7 @@
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DurationInfo;
@@ -481,6 +483,15 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
       if (numParts == 0) {
         numParts = 1;
       }
+      if (numParts > InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
+        // fail if the file is too big.
+        // it would be possible to be clever here and recalculate the part size,
+        // but this is not currently done.
+        throw new PathIOException(destPath.toString(),
+            String.format("File to upload (size %d)"
+                + " is too big to be uploaded in parts of size %d",
+                numParts, length));
+      }
 
       List<PartETag> parts = new ArrayList<>((int) numParts);
 
@@ -510,7 +521,6 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
       return commitData;
     } finally {
       if (threw && uploadId != null) {
-        statistics.commitAborted();
         try {
           abortMultipartCommit(destKey, uploadId);
         } catch (IOException e) {
org/apache/hadoop/fs/s3a/impl/InternalConstants.java

@@ -23,6 +23,8 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.s3a.Constants;
 
@@ -93,4 +95,19 @@ private InternalConstants() {
   /** Directory marker attribute: see HADOOP-16613. Value: {@value}. */
   public static final String X_DIRECTORY =
       "application/x-directory";
+
+  /**
+   * A configuration option for test use only: maximum
+   * part count on block writes/uploads.
+   * Value: {@value}.
+   */
+  @VisibleForTesting
+  public static final String UPLOAD_PART_COUNT_LIMIT =
+      "fs.s3a.internal.upload.part.count.limit";
+
+  /**
+   * Maximum entries you can upload in a single file write/copy/upload.
+   * Value: {@value}.
+   */
+  public static final int DEFAULT_UPLOAD_PART_COUNT_LIMIT = 10000;
 }
|
|||||||
Note: the threshold when data is read rather than the stream aborted can be tuned
|
Note: the threshold when data is read rather than the stream aborted can be tuned
|
||||||
by `fs.s3a.readahead.range`; seek policy in `fs.s3a.experimental.input.fadvise`.
|
by `fs.s3a.readahead.range`; seek policy in `fs.s3a.experimental.input.fadvise`.
|
||||||
|
|
||||||
|
### <a name="upload_failure"></a> `PathIOException` Number of parts in multipart upload exceeded.
|
||||||
|
|
||||||
|
Number of parts in multipart upload exceeded
|
||||||
|
|
||||||
|
```
|
||||||
|
org.apache.hadoop.fs.PathIOException: `test/testMultiPartUploadFailure': Number of parts in multipart upload exceeded. Current part count = X, Part count limit = Y
|
||||||
|
|
||||||
|
at org.apache.hadoop.fs.s3a.WriteOperationHelper.newUploadPartRequest(WriteOperationHelper.java:432)
|
||||||
|
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.uploadBlockAsync(S3ABlockOutputStream.java:627)
|
||||||
|
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$000(S3ABlockOutputStream.java:532)
|
||||||
|
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.uploadCurrentBlock(S3ABlockOutputStream.java:316)
|
||||||
|
at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.write(S3ABlockOutputStream.java:301)
|
||||||
|
```
|
||||||
|
|
||||||
|
This is a known issue where upload fails if number of parts
|
||||||
|
is more than 10000 (specified by aws SDK). You can configure
|
||||||
|
`fs.s3a.multipart.size` to reduce the number of parts.
|
||||||
|
|
||||||
### <a name="no_such_bucket"></a> `UnknownStoreException` Bucket does not exist.
|
### <a name="no_such_bucket"></a> `UnknownStoreException` Bucket does not exist.
|
||||||
|
|
||||||
The bucket does not exist.
|
The bucket does not exist.
|
||||||
|
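
A quick way to anticipate the error documented in the troubleshooting entry above is to estimate the part count before writing. This is an illustrative sketch, not code from this patch; the class name and sizes are assumed examples.

```java
// Illustrative sketch: estimate the part count for a write and compare it
// against the 10,000-part limit described in the troubleshooting entry above.
public class PartCountCheck {
  static final long PART_COUNT_LIMIT = 10_000L;   // S3 multipart hard limit

  /** Parts needed to upload fileSize bytes with the given part size. */
  static long partsFor(long fileSize, long partSize) {
    return Math.max(1L, (fileSize + partSize - 1) / partSize);
  }

  public static void main(String[] args) {
    long partSize = 64L << 20;     // assumed fs.s3a.multipart.size: 64 MB
    long fileSize = 1L << 40;      // assumed file size: 1 TB
    long parts = partsFor(fileSize, partSize);
    if (parts > PART_COUNT_LIMIT) {
      long minPartSize = (fileSize + PART_COUNT_LIMIT - 1) / PART_COUNT_LIMIT;
      System.out.printf("Needs %d parts; raise fs.s3a.multipart.size to >= %d bytes%n",
          parts, minPartSize);
    } else {
      System.out.printf("Fits in %d parts%n", parts);
    }
  }
}
```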
org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java

@@ -18,18 +18,23 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 /**
  * Unit tests for {@link S3ABlockOutputStream}.
@@ -57,10 +62,24 @@ public void setUp() throws Exception {
   public void testFlushNoOpWhenStreamClosed() throws Exception {
     doThrow(new IOException()).when(stream).checkOpen();
 
-    try {
-      stream.flush();
-    } catch (Exception e){
-      fail("Should not have any exception.");
-    }
+    stream.flush();
+  }
+
+  @Test
+  public void testWriteOperationHelperPartLimits() throws Throwable {
+    S3AFileSystem s3a = mock(S3AFileSystem.class);
+    when(s3a.getBucket()).thenReturn("bucket");
+    WriteOperationHelper woh = new WriteOperationHelper(s3a,
+        new Configuration());
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(
+        "a".getBytes());
+    // first one works
+    String key = "destKey";
+    woh.newUploadPartRequest(key,
+        "uploadId", 1, 1024, inputStream, null, 0L);
+    // but ask past the limit and a PathIOE is raised
+    intercept(PathIOException.class, key,
+        () -> woh.newUploadPartRequest(key,
+            "uploadId", 50000, 1024, inputStream, null, 0L));
   }
 }
org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java (new file)

@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.File;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
+import org.apache.hadoop.fs.s3a.commit.CommitOperations;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Testing S3 multipart upload for s3.
+ */
+public class ITestS3AMultipartUploadSizeLimits extends S3AScaleTestBase {
+
+  public static final int MPU_SIZE = 5 * _1MB;
+
+  @Override
+  protected Configuration createScaleConfiguration() {
+    Configuration configuration = super.createScaleConfiguration();
+    removeBaseAndBucketOverrides(configuration,
+        MULTIPART_SIZE,
+        UPLOAD_PART_COUNT_LIMIT);
+    configuration.setLong(MULTIPART_SIZE, MPU_SIZE);
+    // Setting the part count limit to 2 such that we
+    // failures.
+    configuration.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+    return configuration;
+  }
+
+  /**
+   * Uploads under the limit are valid.
+   */
+  @Test
+  public void testTwoPartUpload() throws Throwable {
+    Path file = path(getMethodName());
+    // Creating a file having parts less than configured
+    // part count will succeed.
+    createFile(getFileSystem(), file, true,
+        dataset(6 * _1MB, 'a', 'z' - 'a'));
+  }
+
+  /**
+   * Tests to validate that exception is thrown during a
+   * multi part upload when the number of parts is greater
+   * than the allowed limit.
+   */
+  @Test
+  public void testUploadOverLimitFailure() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Path file = path(getMethodName());
+    // Creating a file with more than configured part count should
+    // throw a PathIOE
+    intercept(PathIOException.class,
+        () -> createFile(fs,
+            file,
+            false,
+            dataset(15 * _1MB, 'a', 'z' - 'a')));
+    // and the path does not exist
+    assertPathDoesNotExist("upload must not have completed", file);
+  }
+
+  @Test
+  public void testCommitLimitFailure() throws Throwable {
+    describe("verify commit uploads fail-safe when MPU limits exceeded");
+    S3AFileSystem fs = getFileSystem();
+    CommitOperations actions = new CommitOperations(fs);
+    File tempFile = File.createTempFile("commit", ".txt");
+    FileUtils.writeByteArrayToFile(tempFile,
+        dataset(15 * _1MB, 'a', 'z' - 'a'));
+    Path dest = methodPath();
+    final S3AInstrumentation instrumentation = fs.getInstrumentation();
+    final long initial = instrumentation.getCounterValue(
+        Statistic.COMMITTER_COMMITS_ABORTED);
+
+    intercept(PathIOException.class, () ->
+        actions.uploadFileToPendingCommit(tempFile,
+            dest,
+            null,
+            MPU_SIZE,
+            new ProgressCounter()));
+    assertPathDoesNotExist("upload must not have completed", dest);
+    final long after = instrumentation.getCounterValue(
+        Statistic.COMMITTER_COMMITS_ABORTED);
+    Assertions.assertThat(after).
+        describedAs("commit abort count")
+        .isEqualTo(initial + 1);
+  }
+}