From 86ad35c94ccbc7d88f20c465812d3bf439e839bb Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Wed, 10 May 2023 14:19:21 -0500 Subject: [PATCH] Revert "HADOOP-18637. S3A to support upload of files greater than 2 GB using DiskBlocks (#5630)" This reverts commit df209dd2e338e9dd5afc3d33ebf5f0f8470f5fee. Caused test failures because of incorrect merge conflict resolution. --- hadoop-tools/hadoop-aws/pom.xml | 2 - .../org/apache/hadoop/fs/s3a/Constants.java | 19 ---- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 72 ++++----------- .../apache/hadoop/fs/s3a/S3ADataBlocks.java | 76 ++++++---------- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 20 ++--- .../hadoop/fs/s3a/S3AInstrumentation.java | 8 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 33 ------- .../hadoop/fs/s3a/WriteOperationHelper.java | 4 +- .../apache/hadoop/fs/s3a/WriteOperations.java | 2 +- .../hadoop/fs/s3a/api/RequestFactory.java | 5 +- .../fs/s3a/commit/AbstractS3ACommitter.java | 4 - .../fs/s3a/impl/RequestFactoryImpl.java | 30 +------ .../BlockOutputStreamStatistics.java | 8 +- .../impl/EmptyS3AStatisticsContext.java | 8 +- .../site/markdown/tools/hadoop-aws/index.md | 4 +- .../hadoop/fs/s3a/MockS3AFileSystem.java | 5 -- .../ITestMagicCommitProtocolFailure.java | 69 -------------- .../ITestStagingCommitProtocolFailure.java | 69 -------------- .../fs/s3a/impl/TestRequestFactory.java | 3 +- .../ITestS3AHugeFileUploadSinglePut.java | 89 ------------------- 20 files changed, 73 insertions(+), 457 deletions(-) delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 207b1c0f94..d8d717a3b4 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -107,7 +107,6 @@ ${testsThreadCount} false - false ${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true ${testsThreadCount} @@ -277,7 +276,6 @@ verify - false ${fs.s3a.scale.test.enabled} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 451b9b0ee2..8c765fa72d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1256,23 +1256,4 @@ private Constants() { */ public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4; - /** - * Option to enable or disable the multipart uploads. - * Value: {@value}. - *
- * Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}. - */ - public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled"; - - /** - * Default value for multipart uploads. - * {@value} - */ - public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true; - - /** - * Stream supports multipart uploads to the given path. - */ - public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = - "fs.s3a.capability.multipart.uploads.enabled"; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index df3c9315ba..19943ff2f7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements private final String key; /** Size of all blocks. */ - private final long blockSize; + private final int blockSize; /** IO Statistics. */ private final IOStatistics iostatistics; @@ -169,9 +169,6 @@ class S3ABlockOutputStream extends OutputStream implements /** Thread level IOStatistics Aggregator. */ private final IOStatisticsAggregator threadIOStatisticsAggregator; - /** Is multipart upload enabled? */ - private final boolean isMultipartUploadEnabled; - /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -184,6 +181,7 @@ class S3ABlockOutputStream extends OutputStream implements this.builder = builder; this.key = builder.key; this.blockFactory = builder.blockFactory; + this.blockSize = (int) builder.blockSize; this.statistics = builder.statistics; // test instantiations may not provide statistics; this.iostatistics = statistics.getIOStatistics(); @@ -197,26 +195,17 @@ class S3ABlockOutputStream extends OutputStream implements (ProgressListener) progress : new ProgressableListener(progress); downgradeSyncableExceptions = builder.downgradeSyncableExceptions; - - // look for multipart support. - this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; - // block size is infinite if multipart is disabled, so ignore - // what was passed in from the builder. - this.blockSize = isMultipartUploadEnabled - ? builder.blockSize - : -1; - + // create that first block. This guarantees that an open + close sequence + // writes a 0-byte entry. + createBlockIfNeeded(); + LOG.debug("Initialized S3ABlockOutputStream for {}" + + " output to {}", key, activeBlock); if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); } this.isCSEEnabled = builder.isCSEEnabled; this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. - createBlockIfNeeded(); - LOG.debug("Initialized S3ABlockOutputStream for {}" + - " output to {}", key, activeBlock); } /** @@ -329,15 +318,7 @@ public synchronized void write(byte[] source, int offset, int len) statistics.writeBytes(len); S3ADataBlocks.DataBlock block = createBlockIfNeeded(); int written = block.write(source, offset, len); - if (!isMultipartUploadEnabled) { - // no need to check for space as multipart uploads - // are not available...everything is saved to a single - // (disk) block. 
- return; - } - // look to see if another block is needed to complete - // the upload or exactly a block was written. - int remainingCapacity = (int) block.remainingCapacity(); + int remainingCapacity = block.remainingCapacity(); if (written < len) { // not everything was written —the block has run out // of capacity @@ -388,8 +369,6 @@ private synchronized void uploadCurrentBlock(boolean isLast) */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { - Preconditions.checkState(isMultipartUploadEnabled, - "multipart upload is disabled"); if (multiPartUpload == null) { LOG.debug("Initiating Multipart upload"); multiPartUpload = new MultiPartUpload(key); @@ -579,20 +558,19 @@ public String toString() { } /** - * Upload the current block as a single PUT request; if the buffer is empty a - * 0-byte PUT will be invoked, as it is needed to create an entry at the far - * end. - * @return number of bytes uploaded. If thread was interrupted while waiting - * for upload to complete, returns zero with interrupted flag set on this - * thread. - * @throws IOException - * any problem. + * Upload the current block as a single PUT request; if the buffer + * is empty a 0-byte PUT will be invoked, as it is needed to create an + * entry at the far end. + * @throws IOException any problem. + * @return number of bytes uploaded. If thread was interrupted while + * waiting for upload to complete, returns zero with interrupted flag set + * on this thread. */ - private long putObject() throws IOException { + private int putObject() throws IOException { LOG.debug("Executing regular upload for {}", writeOperationHelper); final S3ADataBlocks.DataBlock block = getActiveBlock(); - long size = block.dataSize(); + int size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final PutObjectRequest putObjectRequest = uploadData.hasFile() ? writeOperationHelper.createPutObjectRequest( @@ -639,7 +617,6 @@ public String toString() { "S3ABlockOutputStream{"); sb.append(writeOperationHelper.toString()); sb.append(", blockSize=").append(blockSize); - sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled); // unsynced access; risks consistency in exchange for no risk of deadlock. S3ADataBlocks.DataBlock block = activeBlock; if (block != null) { @@ -858,7 +835,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, Preconditions.checkNotNull(uploadId, "Null uploadId"); maybeRethrowUploadFailure(); partsSubmitted++; - final long size = block.dataSize(); + final int size = block.dataSize(); bytesSubmitted += size; final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request; @@ -1034,7 +1011,7 @@ public void progressChanged(ProgressEvent progressEvent) { ProgressEventType eventType = progressEvent.getEventType(); long bytesTransferred = progressEvent.getBytesTransferred(); - long size = block.dataSize(); + int size = block.dataSize(); switch (eventType) { case REQUEST_BYTE_TRANSFER_EVENT: @@ -1149,11 +1126,6 @@ public static final class BlockOutputStreamBuilder { */ private IOStatisticsAggregator ioStatisticsAggregator; - /** - * Is Multipart Uploads enabled for the given upload. 
- */ - private boolean isMultipartUploadEnabled; - private BlockOutputStreamBuilder() { } @@ -1304,11 +1276,5 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator( ioStatisticsAggregator = value; return this; } - - public BlockOutputStreamBuilder withMultipartEnabled( - final boolean value) { - isMultipartUploadEnabled = value; - return this; - } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index ddaf059b05..2503177069 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) { * @param statistics stats to work with * @return a new block. */ - abstract DataBlock create(long index, long limit, + abstract DataBlock create(long index, int limit, BlockOutputStreamStatistics statistics) throws IOException; @@ -258,7 +258,7 @@ final DestState getState() { * Return the current data size. * @return the size of the data */ - abstract long dataSize(); + abstract int dataSize(); /** * Predicate to verify that the block has the capacity to write @@ -280,7 +280,7 @@ boolean hasData() { * The remaining capacity in the block before it is full. * @return the number of bytes remaining. */ - abstract long remainingCapacity(); + abstract int remainingCapacity(); /** * Write a series of bytes from the buffer, from the offset. @@ -391,11 +391,9 @@ static class ArrayBlockFactory extends BlockFactory { } @Override - DataBlock create(long index, long limit, + DataBlock create(long index, int limit, BlockOutputStreamStatistics statistics) throws IOException { - Preconditions.checkArgument(limit > 0, - "Invalid block size: %d", limit); return new ByteArrayBlock(0, limit, statistics); } @@ -438,11 +436,11 @@ static class ByteArrayBlock extends DataBlock { private Integer dataSize; ByteArrayBlock(long index, - long limit, + int limit, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit; - buffer = new S3AByteArrayOutputStream(this.limit); + this.limit = limit; + buffer = new S3AByteArrayOutputStream(limit); blockAllocated(); } @@ -451,7 +449,7 @@ static class ByteArrayBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - long dataSize() { + int dataSize() { return dataSize != null ? 
dataSize : buffer.size(); } @@ -470,14 +468,14 @@ boolean hasCapacity(long bytes) { } @Override - long remainingCapacity() { + int remainingCapacity() { return limit - dataSize(); } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = (int) Math.min(remainingCapacity(), len); + int written = Math.min(remainingCapacity(), len); buffer.write(b, offset, written); return written; } @@ -516,11 +514,9 @@ static class ByteBufferBlockFactory extends BlockFactory { } @Override - ByteBufferBlock create(long index, long limit, + ByteBufferBlock create(long index, int limit, BlockOutputStreamStatistics statistics) throws IOException { - Preconditions.checkArgument(limit > 0, - "Invalid block size: %d", limit); return new ByteBufferBlock(index, limit, statistics); } @@ -568,12 +564,11 @@ class ByteBufferBlock extends DataBlock { * @param statistics statistics to update */ ByteBufferBlock(long index, - long bufferSize, + int bufferSize, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.bufferSize = bufferSize > Integer.MAX_VALUE ? - Integer.MAX_VALUE : (int) bufferSize; - blockBuffer = requestBuffer(this.bufferSize); + this.bufferSize = bufferSize; + blockBuffer = requestBuffer(bufferSize); blockAllocated(); } @@ -582,7 +577,7 @@ class ByteBufferBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - long dataSize() { + int dataSize() { return dataSize != null ? dataSize : bufferCapacityUsed(); } @@ -603,7 +598,7 @@ public boolean hasCapacity(long bytes) { } @Override - public long remainingCapacity() { + public int remainingCapacity() { return blockBuffer != null ? blockBuffer.remaining() : 0; } @@ -614,7 +609,7 @@ private int bufferCapacityUsed() { @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = (int) Math.min(remainingCapacity(), len); + int written = Math.min(remainingCapacity(), len); blockBuffer.put(b, offset, written); return written; } @@ -807,18 +802,16 @@ static class DiskBlockFactory extends BlockFactory { * Create a temp file and a {@link DiskBlock} instance to manage it. * * @param index block index - * @param limit limit of the block. -1 means "no limit" + * @param limit limit of the block. * @param statistics statistics to update * @return the new block * @throws IOException IO problems */ @Override DataBlock create(long index, - long limit, + int limit, BlockOutputStreamStatistics statistics) throws IOException { - Preconditions.checkArgument(limit != 0, - "Invalid block size: %d", limit); File destFile = getOwner() .createTmpFileForWrite(String.format("s3ablock-%04d-", index), limit, getOwner().getConf()); @@ -832,14 +825,14 @@ DataBlock create(long index, */ static class DiskBlock extends DataBlock { - private long bytesWritten; + private int bytesWritten; private final File bufferFile; - private final long limit; + private final int limit; private BufferedOutputStream out; private final AtomicBoolean closed = new AtomicBoolean(false); DiskBlock(File bufferFile, - long limit, + int limit, long index, BlockOutputStreamStatistics statistics) throws FileNotFoundException { @@ -851,39 +844,24 @@ static class DiskBlock extends DataBlock { } @Override - long dataSize() { + int dataSize() { return bytesWritten; } - /** - * Does this block have unlimited space? - * @return true if a block with no size limit was created. 
- */ - private boolean unlimited() { - return limit < 0; - } - @Override boolean hasCapacity(long bytes) { - return unlimited() || dataSize() + bytes <= limit; + return dataSize() + bytes <= limit; } - /** - * {@inheritDoc}. - * If there is no limit to capacity, return MAX_VALUE. - * @return capacity in the block. - */ @Override - long remainingCapacity() { - return unlimited() - ? Integer.MAX_VALUE - : limit - bytesWritten; + int remainingCapacity() { + return limit - bytesWritten; } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = (int) Math.min(remainingCapacity(), len); + int written = Math.min(remainingCapacity(), len); out.write(b, offset, written); bytesWritten += written; return written; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 124e0ee551..0a438d63d4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -413,11 +413,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private ArnResource accessPoint; - /** - * Does this S3A FS instance have multipart upload enabled? - */ - private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED; - /** * A cache of files that should be deleted when the FileSystem is closed * or the JVM is exited. @@ -539,8 +534,6 @@ public void initialize(URI name, Configuration originalConf) longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); - this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, - DEFAULT_MULTIPART_UPLOAD_ENABLED); this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); long prefetchBlockSizeLong = longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, @@ -613,6 +606,7 @@ public void initialize(URI name, Configuration originalConf) } blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER); + partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); blockOutputActiveBlocks = intOption(conf, FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); @@ -621,8 +615,8 @@ public void initialize(URI name, Configuration originalConf) blockOutputActiveBlocks = 1; } LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" + - " queue limit={}; multipart={}", - blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled); + " queue limit={}", + blockOutputBuffer, partSize, blockOutputActiveBlocks); // verify there's no S3Guard in the store config. 
checkNoS3Guard(this.getUri(), getConf()); @@ -789,8 +783,8 @@ private void initThreadPools(Configuration conf) { int activeTasksForBoundedThreadPool = maxThreads; int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads; boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( - maxThreads, - maxThreads + totalTasks, + activeTasksForBoundedThreadPool, + waitingTasksForBoundedThreadPool, keepAliveTime, TimeUnit.SECONDS, name + "-bounded"); unboundedThreadPool = new ThreadPoolExecutor( @@ -5437,8 +5431,4 @@ public RequestFactory getRequestFactory() { public boolean isCSEEnabled() { return isCSEEnabled; } - - public boolean isMultipartUploadEnabled() { - return isMultipartUploadEnabled; - } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 5f753757d8..02409f7449 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1547,7 +1547,7 @@ public void blockReleased() { * of block uploads pending (1) and the bytes pending (blockSize). */ @Override - public void blockUploadQueued(long blockSize) { + public void blockUploadQueued(int blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize); @@ -1560,7 +1560,7 @@ public void blockUploadQueued(long blockSize) { * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}. */ @Override - public void blockUploadStarted(Duration timeInQueue, long blockSize) { + public void blockUploadStarted(Duration timeInQueue, int blockSize) { // the local counter is used in toString reporting. queueDuration.addAndGet(timeInQueue.toMillis()); // update the duration fields in the IOStatistics. @@ -1588,7 +1588,7 @@ private IOStatisticsStore localIOStatistics() { @Override public void blockUploadCompleted( Duration timeSinceUploadStarted, - long blockSize) { + int blockSize) { transferDuration.addAndGet(timeSinceUploadStarted.toMillis()); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1); blockUploadsCompleted.incrementAndGet(); @@ -1602,7 +1602,7 @@ public void blockUploadCompleted( @Override public void blockUploadFailed( Duration timeSinceUploadStarted, - long blockSize) { + int blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 8159244dc3..1401c181e7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; @@ -1027,38 +1026,6 @@ public static long getMultipartSizeProperty(Configuration conf, return partSize; } - /** - * Validates the output stream configuration. 
- * @param path path: for error messages - * @param conf : configuration object for the given context - * @throws PathIOException Unsupported configuration. - */ - public static void validateOutputStreamConfiguration(final Path path, - Configuration conf) throws PathIOException { - if(!checkDiskBuffer(conf)){ - throw new PathIOException(path.toString(), - "Unable to create OutputStream with the given" - + " multipart upload and buffer configuration."); - } - } - - /** - * Check whether the configuration for S3ABlockOutputStream is - * consistent or not. Multipart uploads allow all kinds of fast buffers to - * be supported. When the option is disabled only disk buffers are allowed to - * be used as the file size might be bigger than the buffer size that can be - * allocated. - * @param conf : configuration object for the given context - * @return true if the disk buffer and the multipart settings are supported - */ - public static boolean checkDiskBuffer(Configuration conf) { - boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, - DEFAULT_MULTIPART_UPLOAD_ENABLED); - return isMultipartUploadEnabled - || FAST_UPLOAD_BUFFER_DISK.equals( - conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER)); - } - /** * Ensure that the long value is in the range of an integer. * @param name property name for error messages diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 7e0875d8bb..060a3c3b96 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -269,6 +269,8 @@ public PutObjectRequest createPutObjectRequest( String dest, File sourceFile, final PutObjectOptions options) { + Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, + "File length is too big for a single PUT upload"); activateAuditSpan(); final ObjectMetadata objectMetadata = newObjectMetadata((int) sourceFile.length()); @@ -530,7 +532,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - long size, + int size, InputStream uploadStream, File sourceFile, Long offset) throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 9063423122..91e0342cf7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -233,7 +233,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - long size, + int size, InputStream uploadStream, File sourceFile, Long offset) throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index 2a4771925f..cae4d3ef03 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -196,11 +196,10 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest( * @param destKey destination object key * @param options options for the request * @return the 
request. - * @throws PathIOException if multipart uploads are disabled */ InitiateMultipartUploadRequest newMultipartUploadRequest( String destKey, - @Nullable PutObjectOptions options) throws PathIOException; + @Nullable PutObjectOptions options); /** * Complete a multipart upload. @@ -249,7 +248,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - long size, + int size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index e53c690431..d6044edde2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -217,10 +217,6 @@ protected AbstractS3ACommitter( LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", role, jobName(context), jobIdString(context), outputPath); S3AFileSystem fs = getDestS3AFS(); - if (!fs.isMultipartUploadEnabled()) { - throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem," - + " the committer can't proceed."); - } // set this thread's context with the job ID. // audit spans created in this thread will pick // up this value., including the commit operations instance diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 614ed32cc7..5d9c7bb5d1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -124,11 +124,6 @@ public class RequestFactoryImpl implements RequestFactory { */ private final StorageClass storageClass; - /** - * Is multipart upload enabled. - */ - private final boolean isMultipartUploadEnabled; - /** * Constructor. * @param builder builder with all the configuration. @@ -142,7 +137,6 @@ protected RequestFactoryImpl( this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; this.storageClass = builder.storageClass; - this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; } /** @@ -466,10 +460,7 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest( @Override public InitiateMultipartUploadRequest newMultipartUploadRequest( final String destKey, - @Nullable final PutObjectOptions options) throws PathIOException { - if (!isMultipartUploadEnabled) { - throw new PathIOException(destKey, "Multipart uploads are disabled."); - } + @Nullable final PutObjectOptions options) { final ObjectMetadata objectMetadata = newObjectMetadata(-1); maybeSetMetadata(options, objectMetadata); final InitiateMultipartUploadRequest initiateMPURequest = @@ -518,7 +509,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - long size, + int size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException { @@ -691,11 +682,6 @@ public static final class RequestFactoryBuilder { */ private PrepareRequest requestPreparer; - /** - * Is Multipart Enabled on the path. 
- */ - private boolean isMultipartUploadEnabled = true; - private RequestFactoryBuilder() { } @@ -781,18 +767,6 @@ public RequestFactoryBuilder withRequestPreparer( this.requestPreparer = value; return this; } - - /** - * Multipart upload enabled. - * - * @param value new value - * @return the builder - */ - public RequestFactoryBuilder withMultipartUploadEnabled( - final boolean value) { - this.isMultipartUploadEnabled = value; - return this; - } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java index 554b628d00..bd1466b2a4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java @@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable, * Block is queued for upload. * @param blockSize block size. */ - void blockUploadQueued(long blockSize); + void blockUploadQueued(int blockSize); /** * Queued block has been scheduled for upload. * @param timeInQueue time in the queue. * @param blockSize block size. */ - void blockUploadStarted(Duration timeInQueue, long blockSize); + void blockUploadStarted(Duration timeInQueue, int blockSize); /** * A block upload has completed. Duration excludes time in the queue. * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize); + void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize); /** * A block upload has failed. Duration excludes time in the queue. @@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable, * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize); + void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize); /** * Intermediate report of bytes uploaded. 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index 6454065b24..d10b648417 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -442,22 +442,22 @@ private static final class EmptyBlockOutputStreamStatistics implements BlockOutputStreamStatistics { @Override - public void blockUploadQueued(final long blockSize) { + public void blockUploadQueued(final int blockSize) { } @Override public void blockUploadStarted(final Duration timeInQueue, - final long blockSize) { + final int blockSize) { } @Override public void blockUploadCompleted(final Duration timeSinceUploadStarted, - final long blockSize) { + final int blockSize) { } @Override public void blockUploadFailed(final Duration timeSinceUploadStarted, - final long blockSize) { + final int blockSize) { } @Override diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 549e69115d..dcc459c25d 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1723,9 +1723,7 @@ The "fast" output stream 1. Uploads large files as blocks with the size set by `fs.s3a.multipart.size`. That is: the threshold at which multipart uploads - begin and the size of each upload are identical. This behavior can be enabled - or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by - default is set to true. + begin and the size of each upload are identical. 1. Buffers blocks to disk (default) or in on-heap or off-heap memory. 1. Uploads blocks in parallel in background threads. 1. Begins uploading blocks as soon as the buffered data exceeds this partition diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java index 658ceb4917..42fc630924 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java @@ -200,11 +200,6 @@ public boolean isMagicCommitEnabled() { return true; } - @Override - public boolean isMultipartUploadEnabled() { - return true; - } - /** * Make operation to set the s3 client public. * @param client client. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java deleted file mode 100644 index 41593c2b26..0000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.magic; - -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; -import org.apache.hadoop.fs.s3a.commit.CommitConstants; -import org.apache.hadoop.fs.s3a.commit.PathCommitException; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; -import static org.apache.hadoop.test.LambdaTestUtils.intercept; - -/** - * Verify that the magic committer cannot be created if the FS doesn't support multipart - * uploads. - */ -public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase { - - @Override - protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - removeBucketOverrides(getTestBucketName(conf), conf, - MAGIC_COMMITTER_ENABLED, - S3A_COMMITTER_FACTORY_KEY, - FS_S3A_COMMITTER_NAME, - MULTIPART_UPLOADS_ENABLED); - conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); - conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); - conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC); - return conf; - } - - @Test - public void testCreateCommitter() throws Exception { - TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), - new TaskAttemptID()); - Path commitPath = methodPath(); - LOG.debug("Trying to create a committer on the path: {}", commitPath); - intercept(PathCommitException.class, - () -> new MagicS3GuardCommitter(commitPath, tContext)); - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java deleted file mode 100644 index a6d2c57d1d..0000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; -import org.apache.hadoop.fs.s3a.commit.CommitConstants; -import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; -import org.apache.hadoop.fs.s3a.commit.PathCommitException; -import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; -import static org.apache.hadoop.test.LambdaTestUtils.intercept; - -/** - * Verify that a staging committer cannot be created if the FS doesn't support multipart - * uploads. - */ -public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase { - - @Override - protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - removeBucketOverrides(getTestBucketName(conf), conf, - S3A_COMMITTER_FACTORY_KEY, - FS_S3A_COMMITTER_NAME, - MULTIPART_UPLOADS_ENABLED); - conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); - conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); - conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING); - return conf; - } - - @Test - public void testCreateCommitter() throws Exception { - TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), - new TaskAttemptID()); - Path commitPath = methodPath(); - LOG.debug("Trying to create a committer on the path: {}", commitPath); - intercept(PathCommitException.class, - () -> new StagingCommitter(commitPath, tContext)); - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 7c85142d43..5c243bb820 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -20,7 +20,6 @@ import java.io.ByteArrayInputStream; import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; @@ -156,7 +155,7 @@ public T prepareRequest(final T t) { * Create objects through the factory. 
* @param factory factory */ - private void createFactoryObjects(RequestFactory factory) throws IOException { + private void createFactoryObjects(RequestFactory factory) { String path = "path"; String path2 = "path2"; String id = "1"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java deleted file mode 100644 index 08192969e2..0000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.Test; - -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.fs.s3a.Constants; - -import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; -import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; -import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK; -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; -import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; -import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; -import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; - -/** - * Test a file upload using a single PUT operation. Multipart uploads will - * be disabled in the test. 
- */ -public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase { - - public static final Logger LOG = LoggerFactory.getLogger( - ITestS3AHugeFileUploadSinglePut.class); - - private long fileSize; - - @Override - protected Configuration createScaleConfiguration() { - Configuration conf = super.createScaleConfiguration(); - removeBucketOverrides(getTestBucketName(conf), conf, - FAST_UPLOAD_BUFFER, - IO_CHUNK_BUFFER_SIZE, - KEY_HUGE_FILESIZE, - MULTIPART_UPLOADS_ENABLED, - MULTIPART_SIZE, - REQUEST_TIMEOUT); - conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false); - fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE, - DEFAULT_HUGE_FILESIZE); - // set a small part size to verify it does not impact block allocation size - conf.setLong(MULTIPART_SIZE, 10_000); - conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK); - conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360); - conf.set(REQUEST_TIMEOUT, "1h"); - return conf; - } - - @Test - public void uploadFileSinglePut() throws IOException { - LOG.info("Creating file with size : {}", fileSize); - S3AFileSystem fs = getFileSystem(); - ContractTestUtils.createAndVerifyFile(fs, - methodPath(), fileSize); - // Exactly three put requests should be made during the upload of the file - // First one being the creation of the directory marker - // Second being the creation of the test file - // Third being the creation of directory marker on the file delete - assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol()) - .isEqualTo(3); - } -}