diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index d8d717a3b4..207b1c0f94 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -107,6 +107,7 @@ ${testsThreadCount} false + false ${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true ${testsThreadCount} @@ -276,6 +277,7 @@ verify + false ${fs.s3a.scale.test.enabled} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 8c765fa72d..451b9b0ee2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1256,4 +1256,23 @@ private Constants() { */ public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4; + /** + * Option to enable or disable the multipart uploads. + * Value: {@value}. + *

+ * Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}. + */ + public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled"; + + /** + * Default value for multipart uploads. + * {@value} + */ + public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true; + + /** + * Stream supports multipart uploads to the given path. + */ + public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = + "fs.s3a.capability.multipart.uploads.enabled"; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 19943ff2f7..df3c9315ba 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements private final String key; /** Size of all blocks. */ - private final int blockSize; + private final long blockSize; /** IO Statistics. */ private final IOStatistics iostatistics; @@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements /** Thread level IOStatistics Aggregator. */ private final IOStatisticsAggregator threadIOStatisticsAggregator; + /** Is multipart upload enabled? */ + private final boolean isMultipartUploadEnabled; + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -181,7 +184,6 @@ class S3ABlockOutputStream extends OutputStream implements this.builder = builder; this.key = builder.key; this.blockFactory = builder.blockFactory; - this.blockSize = (int) builder.blockSize; this.statistics = builder.statistics; // test instantiations may not provide statistics; this.iostatistics = statistics.getIOStatistics(); @@ -195,17 +197,26 @@ class S3ABlockOutputStream extends OutputStream implements (ProgressListener) progress : new ProgressableListener(progress); downgradeSyncableExceptions = builder.downgradeSyncableExceptions; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. - createBlockIfNeeded(); - LOG.debug("Initialized S3ABlockOutputStream for {}" + - " output to {}", key, activeBlock); + + // look for multipart support. + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; + // block size is infinite if multipart is disabled, so ignore + // what was passed in from the builder. + this.blockSize = isMultipartUploadEnabled + ? builder.blockSize + : -1; + if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); } this.isCSEEnabled = builder.isCSEEnabled; this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator; + // create that first block. This guarantees that an open + close sequence + // writes a 0-byte entry. + createBlockIfNeeded(); + LOG.debug("Initialized S3ABlockOutputStream for {}" + + " output to {}", key, activeBlock); } /** @@ -318,7 +329,15 @@ public synchronized void write(byte[] source, int offset, int len) statistics.writeBytes(len); S3ADataBlocks.DataBlock block = createBlockIfNeeded(); int written = block.write(source, offset, len); - int remainingCapacity = block.remainingCapacity(); + if (!isMultipartUploadEnabled) { + // no need to check for space as multipart uploads + // are not available...everything is saved to a single + // (disk) block. + return; + } + // look to see if another block is needed to complete + // the upload or exactly a block was written. + int remainingCapacity = (int) block.remainingCapacity(); if (written < len) { // not everything was written —the block has run out // of capacity @@ -369,6 +388,8 @@ private synchronized void uploadCurrentBlock(boolean isLast) */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { + Preconditions.checkState(isMultipartUploadEnabled, + "multipart upload is disabled"); if (multiPartUpload == null) { LOG.debug("Initiating Multipart upload"); multiPartUpload = new MultiPartUpload(key); @@ -558,19 +579,20 @@ public String toString() { } /** - * Upload the current block as a single PUT request; if the buffer - * is empty a 0-byte PUT will be invoked, as it is needed to create an - * entry at the far end. - * @throws IOException any problem. - * @return number of bytes uploaded. If thread was interrupted while - * waiting for upload to complete, returns zero with interrupted flag set - * on this thread. + * Upload the current block as a single PUT request; if the buffer is empty a + * 0-byte PUT will be invoked, as it is needed to create an entry at the far + * end. + * @return number of bytes uploaded. If thread was interrupted while waiting + * for upload to complete, returns zero with interrupted flag set on this + * thread. + * @throws IOException + * any problem. */ - private int putObject() throws IOException { + private long putObject() throws IOException { LOG.debug("Executing regular upload for {}", writeOperationHelper); final S3ADataBlocks.DataBlock block = getActiveBlock(); - int size = block.dataSize(); + long size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final PutObjectRequest putObjectRequest = uploadData.hasFile() ? writeOperationHelper.createPutObjectRequest( @@ -617,6 +639,7 @@ public String toString() { "S3ABlockOutputStream{"); sb.append(writeOperationHelper.toString()); sb.append(", blockSize=").append(blockSize); + sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled); // unsynced access; risks consistency in exchange for no risk of deadlock. S3ADataBlocks.DataBlock block = activeBlock; if (block != null) { @@ -835,7 +858,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, Preconditions.checkNotNull(uploadId, "Null uploadId"); maybeRethrowUploadFailure(); partsSubmitted++; - final int size = block.dataSize(); + final long size = block.dataSize(); bytesSubmitted += size; final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request; @@ -1011,7 +1034,7 @@ public void progressChanged(ProgressEvent progressEvent) { ProgressEventType eventType = progressEvent.getEventType(); long bytesTransferred = progressEvent.getBytesTransferred(); - int size = block.dataSize(); + long size = block.dataSize(); switch (eventType) { case REQUEST_BYTE_TRANSFER_EVENT: @@ -1126,6 +1149,11 @@ public static final class BlockOutputStreamBuilder { */ private IOStatisticsAggregator ioStatisticsAggregator; + /** + * Is Multipart Uploads enabled for the given upload. + */ + private boolean isMultipartUploadEnabled; + private BlockOutputStreamBuilder() { } @@ -1276,5 +1304,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator( ioStatisticsAggregator = value; return this; } + + public BlockOutputStreamBuilder withMultipartEnabled( + final boolean value) { + isMultipartUploadEnabled = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 2503177069..ddaf059b05 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) { * @param statistics stats to work with * @return a new block. */ - abstract DataBlock create(long index, int limit, + abstract DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException; @@ -258,7 +258,7 @@ final DestState getState() { * Return the current data size. * @return the size of the data */ - abstract int dataSize(); + abstract long dataSize(); /** * Predicate to verify that the block has the capacity to write @@ -280,7 +280,7 @@ boolean hasData() { * The remaining capacity in the block before it is full. * @return the number of bytes remaining. */ - abstract int remainingCapacity(); + abstract long remainingCapacity(); /** * Write a series of bytes from the buffer, from the offset. @@ -391,9 +391,11 @@ static class ArrayBlockFactory extends BlockFactory { } @Override - DataBlock create(long index, int limit, + DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { + Preconditions.checkArgument(limit > 0, + "Invalid block size: %d", limit); return new ByteArrayBlock(0, limit, statistics); } @@ -436,11 +438,11 @@ static class ByteArrayBlock extends DataBlock { private Integer dataSize; ByteArrayBlock(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.limit = limit; - buffer = new S3AByteArrayOutputStream(limit); + this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit; + buffer = new S3AByteArrayOutputStream(this.limit); blockAllocated(); } @@ -449,7 +451,7 @@ static class ByteArrayBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? dataSize : buffer.size(); } @@ -468,14 +470,14 @@ boolean hasCapacity(long bytes) { } @Override - int remainingCapacity() { + long remainingCapacity() { return limit - dataSize(); } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); buffer.write(b, offset, written); return written; } @@ -514,9 +516,11 @@ static class ByteBufferBlockFactory extends BlockFactory { } @Override - ByteBufferBlock create(long index, int limit, + ByteBufferBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { + Preconditions.checkArgument(limit > 0, + "Invalid block size: %d", limit); return new ByteBufferBlock(index, limit, statistics); } @@ -564,11 +568,12 @@ class ByteBufferBlock extends DataBlock { * @param statistics statistics to update */ ByteBufferBlock(long index, - int bufferSize, + long bufferSize, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.bufferSize = bufferSize; - blockBuffer = requestBuffer(bufferSize); + this.bufferSize = bufferSize > Integer.MAX_VALUE ? + Integer.MAX_VALUE : (int) bufferSize; + blockBuffer = requestBuffer(this.bufferSize); blockAllocated(); } @@ -577,7 +582,7 @@ class ByteBufferBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? dataSize : bufferCapacityUsed(); } @@ -598,7 +603,7 @@ public boolean hasCapacity(long bytes) { } @Override - public int remainingCapacity() { + public long remainingCapacity() { return blockBuffer != null ? blockBuffer.remaining() : 0; } @@ -609,7 +614,7 @@ private int bufferCapacityUsed() { @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); blockBuffer.put(b, offset, written); return written; } @@ -802,16 +807,18 @@ static class DiskBlockFactory extends BlockFactory { * Create a temp file and a {@link DiskBlock} instance to manage it. * * @param index block index - * @param limit limit of the block. + * @param limit limit of the block. -1 means "no limit" * @param statistics statistics to update * @return the new block * @throws IOException IO problems */ @Override DataBlock create(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) throws IOException { + Preconditions.checkArgument(limit != 0, + "Invalid block size: %d", limit); File destFile = getOwner() .createTmpFileForWrite(String.format("s3ablock-%04d-", index), limit, getOwner().getConf()); @@ -825,14 +832,14 @@ DataBlock create(long index, */ static class DiskBlock extends DataBlock { - private int bytesWritten; + private long bytesWritten; private final File bufferFile; - private final int limit; + private final long limit; private BufferedOutputStream out; private final AtomicBoolean closed = new AtomicBoolean(false); DiskBlock(File bufferFile, - int limit, + long limit, long index, BlockOutputStreamStatistics statistics) throws FileNotFoundException { @@ -844,24 +851,39 @@ static class DiskBlock extends DataBlock { } @Override - int dataSize() { + long dataSize() { return bytesWritten; } + /** + * Does this block have unlimited space? + * @return true if a block with no size limit was created. + */ + private boolean unlimited() { + return limit < 0; + } + @Override boolean hasCapacity(long bytes) { - return dataSize() + bytes <= limit; + return unlimited() || dataSize() + bytes <= limit; } + /** + * {@inheritDoc}. + * If there is no limit to capacity, return MAX_VALUE. + * @return capacity in the block. + */ @Override - int remainingCapacity() { - return limit - bytesWritten; + long remainingCapacity() { + return unlimited() + ? Integer.MAX_VALUE + : limit - bytesWritten; } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); out.write(b, offset, written); bytesWritten += written; return written; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 0a438d63d4..124e0ee551 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -413,6 +413,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private ArnResource accessPoint; + /** + * Does this S3A FS instance have multipart upload enabled? + */ + private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED; + /** * A cache of files that should be deleted when the FileSystem is closed * or the JVM is exited. @@ -534,6 +539,8 @@ public void initialize(URI name, Configuration originalConf) longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); + this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + DEFAULT_MULTIPART_UPLOAD_ENABLED); this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); long prefetchBlockSizeLong = longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, @@ -606,7 +613,6 @@ public void initialize(URI name, Configuration originalConf) } blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER); - partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); blockOutputActiveBlocks = intOption(conf, FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); @@ -615,8 +621,8 @@ public void initialize(URI name, Configuration originalConf) blockOutputActiveBlocks = 1; } LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" + - " queue limit={}", - blockOutputBuffer, partSize, blockOutputActiveBlocks); + " queue limit={}; multipart={}", + blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled); // verify there's no S3Guard in the store config. checkNoS3Guard(this.getUri(), getConf()); @@ -783,8 +789,8 @@ private void initThreadPools(Configuration conf) { int activeTasksForBoundedThreadPool = maxThreads; int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads; boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( - activeTasksForBoundedThreadPool, - waitingTasksForBoundedThreadPool, + maxThreads, + maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS, name + "-bounded"); unboundedThreadPool = new ThreadPoolExecutor( @@ -5431,4 +5437,8 @@ public RequestFactory getRequestFactory() { public boolean isCSEEnabled() { return isCSEEnabled; } + + public boolean isMultipartUploadEnabled() { + return isMultipartUploadEnabled; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 02409f7449..5f753757d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1547,7 +1547,7 @@ public void blockReleased() { * of block uploads pending (1) and the bytes pending (blockSize). */ @Override - public void blockUploadQueued(int blockSize) { + public void blockUploadQueued(long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize); @@ -1560,7 +1560,7 @@ public void blockUploadQueued(int blockSize) { * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}. */ @Override - public void blockUploadStarted(Duration timeInQueue, int blockSize) { + public void blockUploadStarted(Duration timeInQueue, long blockSize) { // the local counter is used in toString reporting. queueDuration.addAndGet(timeInQueue.toMillis()); // update the duration fields in the IOStatistics. @@ -1588,7 +1588,7 @@ private IOStatisticsStore localIOStatistics() { @Override public void blockUploadCompleted( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { transferDuration.addAndGet(timeSinceUploadStarted.toMillis()); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1); blockUploadsCompleted.incrementAndGet(); @@ -1602,7 +1602,7 @@ public void blockUploadCompleted( @Override public void blockUploadFailed( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 1401c181e7..8159244dc3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; @@ -1026,6 +1027,38 @@ public static long getMultipartSizeProperty(Configuration conf, return partSize; } + /** + * Validates the output stream configuration. + * @param path path: for error messages + * @param conf : configuration object for the given context + * @throws PathIOException Unsupported configuration. + */ + public static void validateOutputStreamConfiguration(final Path path, + Configuration conf) throws PathIOException { + if(!checkDiskBuffer(conf)){ + throw new PathIOException(path.toString(), + "Unable to create OutputStream with the given" + + " multipart upload and buffer configuration."); + } + } + + /** + * Check whether the configuration for S3ABlockOutputStream is + * consistent or not. Multipart uploads allow all kinds of fast buffers to + * be supported. When the option is disabled only disk buffers are allowed to + * be used as the file size might be bigger than the buffer size that can be + * allocated. + * @param conf : configuration object for the given context + * @return true if the disk buffer and the multipart settings are supported + */ + public static boolean checkDiskBuffer(Configuration conf) { + boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + DEFAULT_MULTIPART_UPLOAD_ENABLED); + return isMultipartUploadEnabled + || FAST_UPLOAD_BUFFER_DISK.equals( + conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER)); + } + /** * Ensure that the long value is in the range of an integer. * @param name property name for error messages diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 060a3c3b96..7e0875d8bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -269,8 +269,6 @@ public PutObjectRequest createPutObjectRequest( String dest, File sourceFile, final PutObjectOptions options) { - Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, - "File length is too big for a single PUT upload"); activateAuditSpan(); final ObjectMetadata objectMetadata = newObjectMetadata((int) sourceFile.length()); @@ -532,7 +530,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 91e0342cf7..9063423122 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -233,7 +233,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index cae4d3ef03..2a4771925f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -196,10 +196,11 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest( * @param destKey destination object key * @param options options for the request * @return the request. + * @throws PathIOException if multipart uploads are disabled */ InitiateMultipartUploadRequest newMultipartUploadRequest( String destKey, - @Nullable PutObjectOptions options); + @Nullable PutObjectOptions options) throws PathIOException; /** * Complete a multipart upload. @@ -248,7 +249,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index d6044edde2..e53c690431 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -217,6 +217,10 @@ protected AbstractS3ACommitter( LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", role, jobName(context), jobIdString(context), outputPath); S3AFileSystem fs = getDestS3AFS(); + if (!fs.isMultipartUploadEnabled()) { + throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem," + + " the committer can't proceed."); + } // set this thread's context with the job ID. // audit spans created in this thread will pick // up this value., including the commit operations instance diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 5d9c7bb5d1..614ed32cc7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory { */ private final StorageClass storageClass; + /** + * Is multipart upload enabled. + */ + private final boolean isMultipartUploadEnabled; + /** * Constructor. * @param builder builder with all the configuration. @@ -137,6 +142,7 @@ protected RequestFactoryImpl( this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; this.storageClass = builder.storageClass; + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; } /** @@ -460,7 +466,10 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest( @Override public InitiateMultipartUploadRequest newMultipartUploadRequest( final String destKey, - @Nullable final PutObjectOptions options) { + @Nullable final PutObjectOptions options) throws PathIOException { + if (!isMultipartUploadEnabled) { + throw new PathIOException(destKey, "Multipart uploads are disabled."); + } final ObjectMetadata objectMetadata = newObjectMetadata(-1); maybeSetMetadata(options, objectMetadata); final InitiateMultipartUploadRequest initiateMPURequest = @@ -509,7 +518,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException { @@ -682,6 +691,11 @@ public static final class RequestFactoryBuilder { */ private PrepareRequest requestPreparer; + /** + * Is Multipart Enabled on the path. + */ + private boolean isMultipartUploadEnabled = true; + private RequestFactoryBuilder() { } @@ -767,6 +781,18 @@ public RequestFactoryBuilder withRequestPreparer( this.requestPreparer = value; return this; } + + /** + * Multipart upload enabled. + * + * @param value new value + * @return the builder + */ + public RequestFactoryBuilder withMultipartUploadEnabled( + final boolean value) { + this.isMultipartUploadEnabled = value; + return this; + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java index bd1466b2a4..554b628d00 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java @@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable, * Block is queued for upload. * @param blockSize block size. */ - void blockUploadQueued(int blockSize); + void blockUploadQueued(long blockSize); /** * Queued block has been scheduled for upload. * @param timeInQueue time in the queue. * @param blockSize block size. */ - void blockUploadStarted(Duration timeInQueue, int blockSize); + void blockUploadStarted(Duration timeInQueue, long blockSize); /** * A block upload has completed. Duration excludes time in the queue. * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize); + void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize); /** * A block upload has failed. Duration excludes time in the queue. @@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable, * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize); + void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize); /** * Intermediate report of bytes uploaded. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index d10b648417..6454065b24 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -442,22 +442,22 @@ private static final class EmptyBlockOutputStreamStatistics implements BlockOutputStreamStatistics { @Override - public void blockUploadQueued(final int blockSize) { + public void blockUploadQueued(final long blockSize) { } @Override public void blockUploadStarted(final Duration timeInQueue, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadCompleted(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadFailed(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index dcc459c25d..549e69115d 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1723,7 +1723,9 @@ The "fast" output stream 1. Uploads large files as blocks with the size set by `fs.s3a.multipart.size`. That is: the threshold at which multipart uploads - begin and the size of each upload are identical. + begin and the size of each upload are identical. This behavior can be enabled + or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by + default is set to true. 1. Buffers blocks to disk (default) or in on-heap or off-heap memory. 1. Uploads blocks in parallel in background threads. 1. Begins uploading blocks as soon as the buffered data exceeds this partition diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java index 42fc630924..658ceb4917 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java @@ -200,6 +200,11 @@ public boolean isMagicCommitEnabled() { return true; } + @Override + public boolean isMultipartUploadEnabled() { + return true; + } + /** * Make operation to set the s3 client public. * @param client client. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java new file mode 100644 index 0000000000..41593c2b26 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Verify that the magic committer cannot be created if the FS doesn't support multipart + * uploads. + */ +public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + removeBucketOverrides(getTestBucketName(conf), conf, + MAGIC_COMMITTER_ENABLED, + S3A_COMMITTER_FACTORY_KEY, + FS_S3A_COMMITTER_NAME, + MULTIPART_UPLOADS_ENABLED); + conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); + conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); + conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC); + return conf; + } + + @Test + public void testCreateCommitter() throws Exception { + TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID()); + Path commitPath = methodPath(); + LOG.debug("Trying to create a committer on the path: {}", commitPath); + intercept(PathCommitException.class, + () -> new MagicS3GuardCommitter(commitPath, tContext)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java new file mode 100644 index 0000000000..a6d2c57d1d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Verify that a staging committer cannot be created if the FS doesn't support multipart + * uploads. + */ +public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + removeBucketOverrides(getTestBucketName(conf), conf, + S3A_COMMITTER_FACTORY_KEY, + FS_S3A_COMMITTER_NAME, + MULTIPART_UPLOADS_ENABLED); + conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); + conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); + conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING); + return conf; + } + + @Test + public void testCreateCommitter() throws Exception { + TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID()); + Path commitPath = methodPath(); + LOG.debug("Trying to create a committer on the path: {}", commitPath); + intercept(PathCommitException.class, + () -> new StagingCommitter(commitPath, tContext)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 5c243bb820..7c85142d43 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; @@ -155,7 +156,7 @@ public T prepareRequest(final T t) { * Create objects through the factory. * @param factory factory */ - private void createFactoryObjects(RequestFactory factory) { + private void createFactoryObjects(RequestFactory factory) throws IOException { String path = "path"; String path2 = "path2"; String id = "1"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java new file mode 100644 index 0000000000..08192969e2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.Constants; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; + +/** + * Test a file upload using a single PUT operation. Multipart uploads will + * be disabled in the test. + */ +public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase { + + public static final Logger LOG = LoggerFactory.getLogger( + ITestS3AHugeFileUploadSinglePut.class); + + private long fileSize; + + @Override + protected Configuration createScaleConfiguration() { + Configuration conf = super.createScaleConfiguration(); + removeBucketOverrides(getTestBucketName(conf), conf, + FAST_UPLOAD_BUFFER, + IO_CHUNK_BUFFER_SIZE, + KEY_HUGE_FILESIZE, + MULTIPART_UPLOADS_ENABLED, + MULTIPART_SIZE, + REQUEST_TIMEOUT); + conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false); + fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE, + DEFAULT_HUGE_FILESIZE); + // set a small part size to verify it does not impact block allocation size + conf.setLong(MULTIPART_SIZE, 10_000); + conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK); + conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360); + conf.set(REQUEST_TIMEOUT, "1h"); + return conf; + } + + @Test + public void uploadFileSinglePut() throws IOException { + LOG.info("Creating file with size : {}", fileSize); + S3AFileSystem fs = getFileSystem(); + ContractTestUtils.createAndVerifyFile(fs, + methodPath(), fileSize); + // Exactly three put requests should be made during the upload of the file + // First one being the creation of the directory marker + // Second being the creation of the test file + // Third being the creation of directory marker on the file delete + assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol()) + .isEqualTo(3); + } +}