Contributed by Harshit Gupta.
parent 86ad35c94c
commit f312a0c784
@@ -107,6 +107,7 @@
 <configuration>
 <forkCount>${testsThreadCount}</forkCount>
 <reuseForks>false</reuseForks>
+<trimStackTrace>false</trimStackTrace>
 <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
 <systemPropertyVariables>
 <testsThreadCount>${testsThreadCount}</testsThreadCount>
@@ -276,6 +277,7 @@
 <goal>verify</goal>
 </goals>
 <configuration>
+<trimStackTrace>false</trimStackTrace>
 <systemPropertyVariables>
 <!-- Propagate scale parameters -->
 <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
@@ -1256,4 +1256,23 @@ private Constants() {
 */
 public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;
+
+/**
+ * Option to enable or disable the multipart uploads.
+ * Value: {@value}.
+ * <p>
+ * Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}.
+ */
+public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";
+
+/**
+ * Default value for multipart uploads.
+ * {@value}
+ */
+public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true;
+
+/**
+ * Stream supports multipart uploads to the given path.
+ */
+public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
+"fs.s3a.capability.multipart.uploads.enabled";
 }
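The new `fs.s3a.multipart.uploads.enabled` option above is a plain boolean configuration key. A minimal sketch of turning it off from client code (the bucket URI is illustrative; with multipart off, disk buffering is the only consistent buffer, as the `checkDiskBuffer` change further down enforces):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DisableMultipartSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // the switch introduced by this patch; defaults to true
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
    // with multipart disabled, only disk buffering is supported
    conf.set("fs.s3a.fast.upload.buffer", "disk");
    // illustrative bucket
    try (FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf)) {
      // an open + close sequence still writes a 0-byte object, via a single PUT
      fs.create(new Path("/tmp/single-put.bin")).close();
    }
  }
}
```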
@@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements
 private final String key;

 /** Size of all blocks. */
-private final int blockSize;
+private final long blockSize;

 /** IO Statistics. */
 private final IOStatistics iostatistics;
@@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements
 /** Thread level IOStatistics Aggregator. */
 private final IOStatisticsAggregator threadIOStatisticsAggregator;

+/** Is multipart upload enabled? */
+private final boolean isMultipartUploadEnabled;
+
 /**
 * An S3A output stream which uploads partitions in a separate pool of
 * threads; different {@link S3ADataBlocks.BlockFactory}
@@ -181,7 +184,6 @@ class S3ABlockOutputStream extends OutputStream implements
 this.builder = builder;
 this.key = builder.key;
 this.blockFactory = builder.blockFactory;
-this.blockSize = (int) builder.blockSize;
 this.statistics = builder.statistics;
 // test instantiations may not provide statistics;
 this.iostatistics = statistics.getIOStatistics();
@@ -195,17 +197,26 @@ class S3ABlockOutputStream extends OutputStream implements
 (ProgressListener) progress
 : new ProgressableListener(progress);
 downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
-// create that first block. This guarantees that an open + close sequence
-// writes a 0-byte entry.
-createBlockIfNeeded();
-LOG.debug("Initialized S3ABlockOutputStream for {}" +
-" output to {}", key, activeBlock);
+// look for multipart support.
+this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
+// block size is infinite if multipart is disabled, so ignore
+// what was passed in from the builder.
+this.blockSize = isMultipartUploadEnabled
+? builder.blockSize
+: -1;
+
 if (putTracker.initialize()) {
 LOG.debug("Put tracker requests multipart upload");
 initMultipartUpload();
 }
 this.isCSEEnabled = builder.isCSEEnabled;
 this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
+// create that first block. This guarantees that an open + close sequence
+// writes a 0-byte entry.
+createBlockIfNeeded();
+LOG.debug("Initialized S3ABlockOutputStream for {}" +
+" output to {}", key, activeBlock);
 }

 /**
@@ -318,7 +329,15 @@ public synchronized void write(byte[] source, int offset, int len)
 statistics.writeBytes(len);
 S3ADataBlocks.DataBlock block = createBlockIfNeeded();
 int written = block.write(source, offset, len);
-int remainingCapacity = block.remainingCapacity();
+if (!isMultipartUploadEnabled) {
+// no need to check for space as multipart uploads
+// are not available...everything is saved to a single
+// (disk) block.
+return;
+}
+// look to see if another block is needed to complete
+// the upload or exactly a block was written.
+int remainingCapacity = (int) block.remainingCapacity();
 if (written < len) {
 // not everything was written —the block has run out
 // of capacity
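The casts from `long` back to `int` in this path rely on bounds known at that point: on the multipart path the block limit fits an `int`, and in the block `write()` implementations below the value is additionally clamped by an `int` write length. A standalone demonstration of the clamped pattern (not Hadoop code):

```java
public class NarrowingMinDemo {
  public static void main(String[] args) {
    long remainingCapacity = 10L * 1024 * 1024 * 1024; // capacity beyond int range
    int len = 8192;                                    // write() lengths are ints
    // Math.min promotes len to long; the result is <= len,
    // so the narrowing cast back to int cannot overflow
    int written = (int) Math.min(remainingCapacity, len);
    System.out.println(written == len); // true
  }
}
```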
@@ -369,6 +388,8 @@ private synchronized void uploadCurrentBlock(boolean isLast)
 */
 @Retries.RetryTranslated
 private void initMultipartUpload() throws IOException {
+Preconditions.checkState(isMultipartUploadEnabled,
+"multipart upload is disabled");
 if (multiPartUpload == null) {
 LOG.debug("Initiating Multipart upload");
 multiPartUpload = new MultiPartUpload(key);
@@ -558,19 +579,20 @@ public String toString() {
 }

 /**
-* Upload the current block as a single PUT request; if the buffer
-* is empty a 0-byte PUT will be invoked, as it is needed to create an
-* entry at the far end.
-* @throws IOException any problem.
-* @return number of bytes uploaded. If thread was interrupted while
-* waiting for upload to complete, returns zero with interrupted flag set
-* on this thread.
+* Upload the current block as a single PUT request; if the buffer is empty a
+* 0-byte PUT will be invoked, as it is needed to create an entry at the far
+* end.
+* @return number of bytes uploaded. If thread was interrupted while waiting
+* for upload to complete, returns zero with interrupted flag set on this
+* thread.
+* @throws IOException
+* any problem.
 */
-private int putObject() throws IOException {
+private long putObject() throws IOException {
 LOG.debug("Executing regular upload for {}", writeOperationHelper);

 final S3ADataBlocks.DataBlock block = getActiveBlock();
-int size = block.dataSize();
+long size = block.dataSize();
 final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
 final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
 writeOperationHelper.createPutObjectRequest(
@@ -617,6 +639,7 @@ public String toString() {
 "S3ABlockOutputStream{");
 sb.append(writeOperationHelper.toString());
 sb.append(", blockSize=").append(blockSize);
+sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
 // unsynced access; risks consistency in exchange for no risk of deadlock.
 S3ADataBlocks.DataBlock block = activeBlock;
 if (block != null) {
@@ -835,7 +858,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
 Preconditions.checkNotNull(uploadId, "Null uploadId");
 maybeRethrowUploadFailure();
 partsSubmitted++;
-final int size = block.dataSize();
+final long size = block.dataSize();
 bytesSubmitted += size;
 final int currentPartNumber = partETagsFutures.size() + 1;
 final UploadPartRequest request;
@@ -1011,7 +1034,7 @@ public void progressChanged(ProgressEvent progressEvent) {
 ProgressEventType eventType = progressEvent.getEventType();
 long bytesTransferred = progressEvent.getBytesTransferred();

-int size = block.dataSize();
+long size = block.dataSize();
 switch (eventType) {

 case REQUEST_BYTE_TRANSFER_EVENT:
@@ -1126,6 +1149,11 @@ public static final class BlockOutputStreamBuilder {
 */
 private IOStatisticsAggregator ioStatisticsAggregator;

+/**
+ * Is Multipart Uploads enabled for the given upload.
+ */
+private boolean isMultipartUploadEnabled;
+
 private BlockOutputStreamBuilder() {
 }

@@ -1276,5 +1304,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator(
 ioStatisticsAggregator = value;
 return this;
 }
+
+public BlockOutputStreamBuilder withMultipartEnabled(
+final boolean value) {
+isMultipartUploadEnabled = value;
+return this;
+}
 }
 }
@@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) {
 * @param statistics stats to work with
 * @return a new block.
 */
-abstract DataBlock create(long index, int limit,
+abstract DataBlock create(long index, long limit,
 BlockOutputStreamStatistics statistics)
 throws IOException;

@@ -258,7 +258,7 @@ final DestState getState() {
 * Return the current data size.
 * @return the size of the data
 */
-abstract int dataSize();
+abstract long dataSize();

 /**
 * Predicate to verify that the block has the capacity to write
@@ -280,7 +280,7 @@ boolean hasData() {
 * The remaining capacity in the block before it is full.
 * @return the number of bytes remaining.
 */
-abstract int remainingCapacity();
+abstract long remainingCapacity();

 /**
 * Write a series of bytes from the buffer, from the offset.
@@ -391,9 +391,11 @@ static class ArrayBlockFactory extends BlockFactory {
 }

 @Override
-DataBlock create(long index, int limit,
+DataBlock create(long index, long limit,
 BlockOutputStreamStatistics statistics)
 throws IOException {
+Preconditions.checkArgument(limit > 0,
+"Invalid block size: %d", limit);
 return new ByteArrayBlock(0, limit, statistics);
 }

@@ -436,11 +438,11 @@ static class ByteArrayBlock extends DataBlock {
 private Integer dataSize;

 ByteArrayBlock(long index,
-int limit,
+long limit,
 BlockOutputStreamStatistics statistics) {
 super(index, statistics);
-this.limit = limit;
-buffer = new S3AByteArrayOutputStream(limit);
+this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit;
+buffer = new S3AByteArrayOutputStream(this.limit);
 blockAllocated();
 }

@@ -449,7 +451,7 @@ static class ByteArrayBlock extends DataBlock {
 * @return the amount of data available to upload.
 */
 @Override
-int dataSize() {
+long dataSize() {
 return dataSize != null ? dataSize : buffer.size();
 }

@@ -468,14 +470,14 @@ boolean hasCapacity(long bytes) {
 }

 @Override
-int remainingCapacity() {
+long remainingCapacity() {
 return limit - dataSize();
 }

 @Override
 int write(byte[] b, int offset, int len) throws IOException {
 super.write(b, offset, len);
-int written = Math.min(remainingCapacity(), len);
+int written = (int) Math.min(remainingCapacity(), len);
 buffer.write(b, offset, written);
 return written;
 }
@@ -514,9 +516,11 @@ static class ByteBufferBlockFactory extends BlockFactory {
 }

 @Override
-ByteBufferBlock create(long index, int limit,
+ByteBufferBlock create(long index, long limit,
 BlockOutputStreamStatistics statistics)
 throws IOException {
+Preconditions.checkArgument(limit > 0,
+"Invalid block size: %d", limit);
 return new ByteBufferBlock(index, limit, statistics);
 }

@@ -564,11 +568,12 @@ class ByteBufferBlock extends DataBlock {
 * @param statistics statistics to update
 */
 ByteBufferBlock(long index,
-int bufferSize,
+long bufferSize,
 BlockOutputStreamStatistics statistics) {
 super(index, statistics);
-this.bufferSize = bufferSize;
-blockBuffer = requestBuffer(bufferSize);
+this.bufferSize = bufferSize > Integer.MAX_VALUE ?
+Integer.MAX_VALUE : (int) bufferSize;
+blockBuffer = requestBuffer(this.bufferSize);
 blockAllocated();
 }

@@ -577,7 +582,7 @@ class ByteBufferBlock extends DataBlock {
 * @return the amount of data available to upload.
 */
 @Override
-int dataSize() {
+long dataSize() {
 return dataSize != null ? dataSize : bufferCapacityUsed();
 }

@@ -598,7 +603,7 @@ public boolean hasCapacity(long bytes) {
 }

 @Override
-public int remainingCapacity() {
+public long remainingCapacity() {
 return blockBuffer != null ? blockBuffer.remaining() : 0;
 }

@@ -609,7 +614,7 @@ private int bufferCapacityUsed() {
 @Override
 int write(byte[] b, int offset, int len) throws IOException {
 super.write(b, offset, len);
-int written = Math.min(remainingCapacity(), len);
+int written = (int) Math.min(remainingCapacity(), len);
 blockBuffer.put(b, offset, written);
 return written;
 }
@@ -802,16 +807,18 @@ static class DiskBlockFactory extends BlockFactory {
 * Create a temp file and a {@link DiskBlock} instance to manage it.
 *
 * @param index block index
-* @param limit limit of the block.
+* @param limit limit of the block. -1 means "no limit"
 * @param statistics statistics to update
 * @return the new block
 * @throws IOException IO problems
 */
 @Override
 DataBlock create(long index,
-int limit,
+long limit,
 BlockOutputStreamStatistics statistics)
 throws IOException {
+Preconditions.checkArgument(limit != 0,
+"Invalid block size: %d", limit);
 File destFile = getOwner()
 .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
 limit, getOwner().getConf());
@@ -825,14 +832,14 @@ DataBlock create(long index,
 */
 static class DiskBlock extends DataBlock {

-private int bytesWritten;
+private long bytesWritten;
 private final File bufferFile;
-private final int limit;
+private final long limit;
 private BufferedOutputStream out;
 private final AtomicBoolean closed = new AtomicBoolean(false);

 DiskBlock(File bufferFile,
-int limit,
+long limit,
 long index,
 BlockOutputStreamStatistics statistics)
 throws FileNotFoundException {
@@ -844,24 +851,39 @@ static class DiskBlock extends DataBlock {
 }

 @Override
-int dataSize() {
+long dataSize() {
 return bytesWritten;
 }

+/**
+ * Does this block have unlimited space?
+ * @return true if a block with no size limit was created.
+ */
+private boolean unlimited() {
+return limit < 0;
+}
+
 @Override
 boolean hasCapacity(long bytes) {
-return dataSize() + bytes <= limit;
+return unlimited() || dataSize() + bytes <= limit;
 }

+/**
+ * {@inheritDoc}.
+ * If there is no limit to capacity, return MAX_VALUE.
+ * @return capacity in the block.
+ */
 @Override
-int remainingCapacity() {
-return limit - bytesWritten;
+long remainingCapacity() {
+return unlimited()
+? Integer.MAX_VALUE
+: limit - bytesWritten;
 }

 @Override
 int write(byte[] b, int offset, int len) throws IOException {
 super.write(b, offset, len);
-int written = Math.min(remainingCapacity(), len);
+int written = (int) Math.min(remainingCapacity(), len);
 out.write(b, offset, written);
 bytesWritten += written;
 return written;
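The `DiskBlock` changes above adopt a "negative limit means unlimited" convention. A self-contained sketch of the same pattern outside Hadoop (`CappedSink` is a hypothetical name):

```java
import java.io.ByteArrayOutputStream;

public class CappedSink {
  private final long limit; // any negative value means "no limit"
  private long bytesWritten;
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();

  CappedSink(long limit) { this.limit = limit; }

  private boolean unlimited() { return limit < 0; }

  boolean hasCapacity(long bytes) {
    return unlimited() || bytesWritten + bytes <= limit;
  }

  long remainingCapacity() {
    // report MAX_VALUE when unbounded, mirroring DiskBlock.remainingCapacity()
    return unlimited() ? Integer.MAX_VALUE : limit - bytesWritten;
  }

  int write(byte[] b, int offset, int len) {
    int written = (int) Math.min(remainingCapacity(), len);
    out.write(b, offset, written);
    bytesWritten += written;
    return written;
  }

  public static void main(String[] args) {
    CappedSink sink = new CappedSink(-1); // single-PUT mode: unbounded
    System.out.println(sink.write(new byte[16], 0, 16)); // 16
  }
}
```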
@@ -413,6 +413,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 */
 private ArnResource accessPoint;

+/**
+ * Does this S3A FS instance have multipart upload enabled?
+ */
+private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
+
 /**
 * A cache of files that should be deleted when the FileSystem is closed
 * or the JVM is exited.
@@ -534,6 +539,8 @@ public void initialize(URI name, Configuration originalConf)
 longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
 enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);

+this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+DEFAULT_MULTIPART_UPLOAD_ENABLED);
 this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
 long prefetchBlockSizeLong =
 longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE,
@@ -606,7 +613,6 @@ public void initialize(URI name, Configuration originalConf)
 }
 blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
 DEFAULT_FAST_UPLOAD_BUFFER);
-partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
 blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
 blockOutputActiveBlocks = intOption(conf,
 FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
@@ -615,8 +621,8 @@ public void initialize(URI name, Configuration originalConf)
 blockOutputActiveBlocks = 1;
 }
 LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
-" queue limit={}",
-blockOutputBuffer, partSize, blockOutputActiveBlocks);
+" queue limit={}; multipart={}",
+blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled);
 // verify there's no S3Guard in the store config.
 checkNoS3Guard(this.getUri(), getConf());

@@ -1876,6 +1882,7 @@ private FSDataOutputStream innerCreateFile(
 DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
 .withCSEEnabled(isCSEEnabled)
 .withPutOptions(putOptions)
+.withMultipartEnabled(isMultipartUploadEnabled)
 .withIOStatisticsAggregator(
 IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
 return new FSDataOutputStream(
@@ -5431,4 +5438,8 @@ public RequestFactory getRequestFactory() {
 public boolean isCSEEnabled() {
 return isCSEEnabled;
 }
+
+public boolean isMultipartUploadEnabled() {
+return isMultipartUploadEnabled;
+}
 }
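With the accessor added above, callers can probe the filesystem before planning a large upload. A hedged sketch (the bucket URI is illustrative; the 5 GB figure is the S3 limit on a single PUT):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class ProbeMultipart {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // illustrative bucket
    try (S3AFileSystem fs =
        (S3AFileSystem) FileSystem.get(new URI("s3a://example-bucket/"), conf)) {
      if (!fs.isMultipartUploadEnabled()) {
        // plan for single-PUT uploads only (S3 caps a single PUT at 5 GB)
        System.out.println("multipart disabled: keep files below the single-PUT limit");
      }
    }
  }
}
```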
@@ -1547,7 +1547,7 @@ public void blockReleased() {
 * of block uploads pending (1) and the bytes pending (blockSize).
 */
 @Override
-public void blockUploadQueued(int blockSize) {
+public void blockUploadQueued(long blockSize) {
 incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS);
 incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
 incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize);
@@ -1560,7 +1560,7 @@ public void blockUploadQueued(int blockSize) {
 * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}.
 */
 @Override
-public void blockUploadStarted(Duration timeInQueue, int blockSize) {
+public void blockUploadStarted(Duration timeInQueue, long blockSize) {
 // the local counter is used in toString reporting.
 queueDuration.addAndGet(timeInQueue.toMillis());
 // update the duration fields in the IOStatistics.
@@ -1588,7 +1588,7 @@ private IOStatisticsStore localIOStatistics() {
 @Override
 public void blockUploadCompleted(
 Duration timeSinceUploadStarted,
-int blockSize) {
+long blockSize) {
 transferDuration.addAndGet(timeSinceUploadStarted.toMillis());
 incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
 blockUploadsCompleted.incrementAndGet();
@@ -1602,7 +1602,7 @@ public void blockUploadCompleted(
 @Override
 public void blockUploadFailed(
 Duration timeSinceUploadStarted,
-int blockSize) {
+long blockSize) {
 incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
 }

@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
@@ -1026,6 +1027,38 @@ public static long getMultipartSizeProperty(Configuration conf,
 return partSize;
 }

+/**
+ * Validates the output stream configuration.
+ * @param path path: for error messages
+ * @param conf : configuration object for the given context
+ * @throws PathIOException Unsupported configuration.
+ */
+public static void validateOutputStreamConfiguration(final Path path,
+Configuration conf) throws PathIOException {
+if(!checkDiskBuffer(conf)){
+throw new PathIOException(path.toString(),
+"Unable to create OutputStream with the given"
++ " multipart upload and buffer configuration.");
+}
+}
+
+/**
+ * Check whether the configuration for S3ABlockOutputStream is
+ * consistent or not. Multipart uploads allow all kinds of fast buffers to
+ * be supported. When the option is disabled only disk buffers are allowed to
+ * be used as the file size might be bigger than the buffer size that can be
+ * allocated.
+ * @param conf : configuration object for the given context
+ * @return true if the disk buffer and the multipart settings are supported
+ */
+public static boolean checkDiskBuffer(Configuration conf) {
+boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+DEFAULT_MULTIPART_UPLOAD_ENABLED);
+return isMultipartUploadEnabled
+|| FAST_UPLOAD_BUFFER_DISK.equals(
+conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER));
+}
+
 /**
 * Ensure that the long value is in the range of an integer.
 * @param name property name for error messages
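`checkDiskBuffer` reduces to a one-line rule: multipart enabled permits any buffer; multipart disabled requires `disk`. A standalone illustration, with the constant values written out literally so the snippet compiles on its own:

```java
import org.apache.hadoop.conf.Configuration;

public class BufferComboCheck {
  // literal copies of the S3A constant values, for a self-contained snippet
  static final String MULTIPART = "fs.s3a.multipart.uploads.enabled";
  static final String BUFFER = "fs.s3a.fast.upload.buffer";

  static boolean valid(Configuration conf) {
    // same rule as S3AUtils.checkDiskBuffer(): multipart on => any buffer;
    // multipart off => only "disk" buffering is acceptable
    return conf.getBoolean(MULTIPART, true)
        || "disk".equals(conf.get(BUFFER, "disk"));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.setBoolean(MULTIPART, false);
    conf.set(BUFFER, "array");        // heap buffering cannot hold a whole file
    System.out.println(valid(conf));  // false -> PathIOException at create()
    conf.set(BUFFER, "disk");
    System.out.println(valid(conf));  // true
  }
}
```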
@@ -269,8 +269,6 @@ public PutObjectRequest createPutObjectRequest(
 String dest,
 File sourceFile,
 final PutObjectOptions options) {
-Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE,
-"File length is too big for a single PUT upload");
 activateAuditSpan();
 final ObjectMetadata objectMetadata =
 newObjectMetadata((int) sourceFile.length());
@@ -532,7 +530,7 @@ public UploadPartRequest newUploadPartRequest(
 String destKey,
 String uploadId,
 int partNumber,
-int size,
+long size,
 InputStream uploadStream,
 File sourceFile,
 Long offset) throws IOException {
@@ -233,7 +233,7 @@ UploadPartRequest newUploadPartRequest(
 String destKey,
 String uploadId,
 int partNumber,
-int size,
+long size,
 InputStream uploadStream,
 File sourceFile,
 Long offset) throws IOException;
@@ -196,10 +196,11 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest(
 * @param destKey destination object key
 * @param options options for the request
 * @return the request.
+* @throws PathIOException if multipart uploads are disabled
 */
 InitiateMultipartUploadRequest newMultipartUploadRequest(
 String destKey,
-@Nullable PutObjectOptions options);
+@Nullable PutObjectOptions options) throws PathIOException;

 /**
 * Complete a multipart upload.
@@ -248,7 +249,7 @@ UploadPartRequest newUploadPartRequest(
 String destKey,
 String uploadId,
 int partNumber,
-int size,
+long size,
 InputStream uploadStream,
 File sourceFile,
 long offset) throws PathIOException;
@@ -217,6 +217,10 @@ protected AbstractS3ACommitter(
 LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
 role, jobName(context), jobIdString(context), outputPath);
 S3AFileSystem fs = getDestS3AFS();
+if (!fs.isMultipartUploadEnabled()) {
+throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
++ " the committer can't proceed.");
+}
 // set this thread's context with the job ID.
 // audit spans created in this thread will pick
 // up this value., including the commit operations instance
@@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory {
 */
 private final StorageClass storageClass;

+/**
+ * Is multipart upload enabled.
+ */
+private final boolean isMultipartUploadEnabled;
+
 /**
 * Constructor.
 * @param builder builder with all the configuration.
@@ -137,6 +142,7 @@ protected RequestFactoryImpl(
 this.requestPreparer = builder.requestPreparer;
 this.contentEncoding = builder.contentEncoding;
 this.storageClass = builder.storageClass;
+this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
 }

 /**
@@ -460,7 +466,10 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest(
 @Override
 public InitiateMultipartUploadRequest newMultipartUploadRequest(
 final String destKey,
-@Nullable final PutObjectOptions options) {
+@Nullable final PutObjectOptions options) throws PathIOException {
+if (!isMultipartUploadEnabled) {
+throw new PathIOException(destKey, "Multipart uploads are disabled.");
+}
 final ObjectMetadata objectMetadata = newObjectMetadata(-1);
 maybeSetMetadata(options, objectMetadata);
 final InitiateMultipartUploadRequest initiateMPURequest =
@@ -509,7 +518,7 @@ public UploadPartRequest newUploadPartRequest(
 String destKey,
 String uploadId,
 int partNumber,
-int size,
+long size,
 InputStream uploadStream,
 File sourceFile,
 long offset) throws PathIOException {
@@ -682,6 +691,11 @@ public static final class RequestFactoryBuilder {
 */
 private PrepareRequest requestPreparer;

+/**
+ * Is Multipart Enabled on the path.
+ */
+private boolean isMultipartUploadEnabled = true;
+
 private RequestFactoryBuilder() {
 }

@@ -767,6 +781,18 @@ public RequestFactoryBuilder withRequestPreparer(
 this.requestPreparer = value;
 return this;
 }
+
+/**
+ * Multipart upload enabled.
+ *
+ * @param value new value
+ * @return the builder
+ */
+public RequestFactoryBuilder withMultipartUploadEnabled(
+final boolean value) {
+this.isMultipartUploadEnabled = value;
+return this;
+}
 }

 /**
@@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable,
 * Block is queued for upload.
 * @param blockSize block size.
 */
-void blockUploadQueued(int blockSize);
+void blockUploadQueued(long blockSize);

 /**
 * Queued block has been scheduled for upload.
 * @param timeInQueue time in the queue.
 * @param blockSize block size.
 */
-void blockUploadStarted(Duration timeInQueue, int blockSize);
+void blockUploadStarted(Duration timeInQueue, long blockSize);

 /**
 * A block upload has completed. Duration excludes time in the queue.
 * @param timeSinceUploadStarted time in since the transfer began.
 * @param blockSize block size
 */
-void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize);
+void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize);

 /**
 * A block upload has failed. Duration excludes time in the queue.
@@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable,
 * @param timeSinceUploadStarted time in since the transfer began.
 * @param blockSize block size
 */
-void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize);
+void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize);

 /**
 * Intermediate report of bytes uploaded.
@@ -442,22 +442,22 @@ private static final class EmptyBlockOutputStreamStatistics
 implements BlockOutputStreamStatistics {

 @Override
-public void blockUploadQueued(final int blockSize) {
+public void blockUploadQueued(final long blockSize) {
 }

 @Override
 public void blockUploadStarted(final Duration timeInQueue,
-final int blockSize) {
+final long blockSize) {
 }

 @Override
 public void blockUploadCompleted(final Duration timeSinceUploadStarted,
-final int blockSize) {
+final long blockSize) {
 }

 @Override
 public void blockUploadFailed(final Duration timeSinceUploadStarted,
-final int blockSize) {
+final long blockSize) {
 }

 @Override
@@ -1723,7 +1723,9 @@ The "fast" output stream

 1. Uploads large files as blocks with the size set by
 `fs.s3a.multipart.size`. That is: the threshold at which multipart uploads
-begin and the size of each upload are identical.
+begin and the size of each upload are identical. This behavior can be enabled
+or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by
+default is set to true.
 1. Buffers blocks to disk (default) or in on-heap or off-heap memory.
 1. Uploads blocks in parallel in background threads.
 1. Begins uploading blocks as soon as the buffered data exceeds this partition
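The patch also defines a store capability string, `fs.s3a.capability.multipart.uploads.enabled`. Whether it is published through `hasPathCapability()` is not shown in these hunks, but if it is wired up, a client probe would look like this sketch (bucket URI illustrative):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ProbeCapability {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("s3a://example-bucket/data"); // illustrative
    try (FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf)) {
      // capability string added by this patch; probing assumes the store
      // publishes it through hasPathCapability()
      boolean multipart = fs.hasPathCapability(path,
          "fs.s3a.capability.multipart.uploads.enabled");
      System.out.println("multipart uploads supported: " + multipart);
    }
  }
}
```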
@@ -200,6 +200,11 @@ public boolean isMagicCommitEnabled() {
 return true;
 }

+@Override
+public boolean isMultipartUploadEnabled() {
+return true;
+}
+
 /**
 * Make operation to set the s3 client public.
 * @param client client.
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Verify that the magic committer cannot be created if the FS doesn't support multipart
+ * uploads.
+ */
+public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase {
+
+@Override
+protected Configuration createConfiguration() {
+Configuration conf = super.createConfiguration();
+removeBucketOverrides(getTestBucketName(conf), conf,
+MAGIC_COMMITTER_ENABLED,
+S3A_COMMITTER_FACTORY_KEY,
+FS_S3A_COMMITTER_NAME,
+MULTIPART_UPLOADS_ENABLED);
+conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
+conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
+conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC);
+return conf;
+}
+
+@Test
+public void testCreateCommitter() throws Exception {
+TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
+new TaskAttemptID());
+Path commitPath = methodPath();
+LOG.debug("Trying to create a committer on the path: {}", commitPath);
+intercept(PathCommitException.class,
+() -> new MagicS3GuardCommitter(commitPath, tContext));
+}
+}
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Verify that a staging committer cannot be created if the FS doesn't support multipart
+ * uploads.
+ */
+public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
+
+@Override
+protected Configuration createConfiguration() {
+Configuration conf = super.createConfiguration();
+removeBucketOverrides(getTestBucketName(conf), conf,
+S3A_COMMITTER_FACTORY_KEY,
+FS_S3A_COMMITTER_NAME,
+MULTIPART_UPLOADS_ENABLED);
+conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
+conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
+conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING);
+return conf;
+}
+
+@Test
+public void testCreateCommitter() throws Exception {
+TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
+new TaskAttemptID());
+Path commitPath = methodPath();
+LOG.debug("Trying to create a committer on the path: {}", commitPath);
+intercept(PathCommitException.class,
+() -> new StagingCommitter(commitPath, tContext));
+}
+}
@@ -20,6 +20,7 @@

 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;

@@ -155,7 +156,7 @@ public <T extends AmazonWebServiceRequest> T prepareRequest(final T t) {
 * Create objects through the factory.
 * @param factory factory
 */
-private void createFactoryObjects(RequestFactory factory) {
+private void createFactoryObjects(RequestFactory factory) throws IOException {
 String path = "path";
 String path2 = "path2";
 String id = "1";
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+
+/**
+ * Test a file upload using a single PUT operation. Multipart uploads will
+ * be disabled in the test.
+ */
+public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase {
+
+public static final Logger LOG = LoggerFactory.getLogger(
+ITestS3AHugeFileUploadSinglePut.class);
+
+private long fileSize;
+
+@Override
+protected Configuration createScaleConfiguration() {
+Configuration conf = super.createScaleConfiguration();
+removeBucketOverrides(getTestBucketName(conf), conf,
+FAST_UPLOAD_BUFFER,
+IO_CHUNK_BUFFER_SIZE,
+KEY_HUGE_FILESIZE,
+MULTIPART_UPLOADS_ENABLED,
+MULTIPART_SIZE,
+REQUEST_TIMEOUT);
+conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false);
+fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE,
+DEFAULT_HUGE_FILESIZE);
+// set a small part size to verify it does not impact block allocation size
+conf.setLong(MULTIPART_SIZE, 10_000);
+conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
+conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
+conf.set(REQUEST_TIMEOUT, "1h");
+return conf;
+}
+
+@Test
+public void uploadFileSinglePut() throws IOException {
+LOG.info("Creating file with size : {}", fileSize);
+S3AFileSystem fs = getFileSystem();
+ContractTestUtils.createAndVerifyFile(fs,
+methodPath(), fileSize);
+// Exactly three put requests should be made during the upload of the file
+// First one being the creation of the directory marker
+// Second being the creation of the test file
+// Third being the creation of directory marker on the file delete
+assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
+.isEqualTo(3);
+}
+}