> tasks, long timeout,
- TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- throw new RuntimeException("Not implemented");
- }
-
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 64fd8e5302..65df0bf888 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -35,6 +35,9 @@ public final class Constants {
private Constants() {
}
+ /** The minimum multipart size which S3 supports. */
+ public static final int MULTIPART_MIN_SIZE = 5 * 1024 * 1024;
+
// s3 access key
public static final String ACCESS_KEY = "fs.s3a.access.key";
@@ -124,14 +127,72 @@ private Constants() {
// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
- // should we upload directly from memory rather than using a file buffer
+ // switch to the fast block-by-block upload mechanism
public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
public static final boolean DEFAULT_FAST_UPLOAD = false;
//initial size of memory buffer for a fast upload
+ @Deprecated
public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB
+ /**
+ * Which buffering mechanism to use for the fast upload.
+ * Default is {@link #FAST_UPLOAD_BUFFER_DISK}.
+ * Value: {@value}
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BUFFER =
+ "fs.s3a.fast.upload.buffer";
+
+ /**
+ * Buffer blocks to disk: {@value}.
+ * Capacity is limited to available disk space.
+ */
+
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BUFFER_DISK = "disk";
+
+ /**
+ * Use an in-memory array. Fast, but can run out of heap rapidly: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BUFFER_ARRAY = "array";
+
+ /**
+ * Use a byte buffer. May be more memory efficient than the
+ * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer";
+
+ /**
+ * Default buffer option: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String DEFAULT_FAST_UPLOAD_BUFFER =
+ FAST_UPLOAD_BUFFER_DISK;
+
+ /**
+ * Maximum number of blocks a single output stream can have
+ * active (uploading, or queued to the central FileSystem
+ * instance's pool of queued operations).
+ * This stops a single stream overloading the shared thread pool.
+ * {@value}
+ *
+ * Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS}.
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_ACTIVE_BLOCKS =
+ "fs.s3a.fast.upload.active.blocks";
+
+ /**
+ * Limit of queued block upload operations before writes
+ * block. Value: {@value}
+ */
+ @InterfaceStability.Unstable
+ public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;
+
// Private | PublicRead | PublicReadWrite | AuthenticatedRead |
// LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl
public static final String CANNED_ACL = "fs.s3a.acl.default";
@@ -145,7 +206,7 @@ private Constants() {
// purge any multipart uploads older than this number of seconds
public static final String PURGE_EXISTING_MULTIPART_AGE =
"fs.s3a.multipart.purge.age";
- public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
+ public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
// s3 server-side encryption
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
@@ -215,4 +276,10 @@ private Constants() {
public static final Class<? extends S3ClientFactory>
DEFAULT_S3_CLIENT_FACTORY_IMPL =
S3ClientFactory.DefaultS3ClientFactory.class;
+
+ /**
+ * Maximum number of partitions in a multipart upload: {@value}.
+ */
+ @InterfaceAudience.Private
+ public static final int MAX_MULTIPART_COUNT = 10000;
}
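
As an aside for reviewers: a minimal sketch (not part of this patch) of how a client might enable the new block upload path through the options added above. The property names mirror FAST_UPLOAD, FAST_UPLOAD_BUFFER and FAST_UPLOAD_ACTIVE_BLOCKS; the values shown are assumptions for illustration only.

    import org.apache.hadoop.conf.Configuration;

    // Illustrative only: select the fast block upload path and the disk buffer.
    // Property names correspond to the constants added in Constants.java above.
    public class FastUploadConfigSketch {
      public static Configuration configure() {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.fast.upload", true);         // FAST_UPLOAD
        conf.set("fs.s3a.fast.upload.buffer", "disk");       // FAST_UPLOAD_BUFFER / FAST_UPLOAD_BUFFER_DISK
        conf.setInt("fs.s3a.fast.upload.active.blocks", 4);  // FAST_UPLOAD_ACTIVE_BLOCKS
        return conf;
      }
    }
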
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
new file mode 100644
index 0000000000..b66a23ff6f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
+/**
+ * Upload files/parts directly via different buffering mechanisms:
+ * including memory and disk.
+ *
+ * If the stream is closed and no multipart upload has started, then the upload
+ * is instead done as a single PUT operation.
+ *
+ * Unstable: statistics and error handling might evolve.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3ABlockOutputStream extends OutputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3ABlockOutputStream.class);
+
+ /** Owner FileSystem. */
+ private final S3AFileSystem fs;
+
+ /** Object being uploaded. */
+ private final String key;
+
+ /** Size of all blocks. */
+ private final int blockSize;
+
+ /** Callback for progress. */
+ private final ProgressListener progressListener;
+ private final ListeningExecutorService executorService;
+
+ /**
+ * Retry policy for multipart commits; not all AWS SDK versions retry that.
+ */
+ private final RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
+ 5,
+ 2000,
+ TimeUnit.MILLISECONDS);
+ /**
+ * Factory for blocks.
+ */
+ private final S3ADataBlocks.BlockFactory blockFactory;
+
+ /** Preallocated byte buffer for writing single characters. */
+ private final byte[] singleCharWrite = new byte[1];
+
+ /** Multipart upload details; null means none started. */
+ private MultiPartUpload multiPartUpload;
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /** Current data block. Null means none currently active. */
+ private S3ADataBlocks.DataBlock activeBlock;
+
+ /** Count of blocks uploaded. */
+ private long blockCount = 0;
+
+ /** Statistics to build up. */
+ private final S3AInstrumentation.OutputStreamStatistics statistics;
+
+ /**
+ * Write operation helper; encapsulation of the filesystem operations.
+ */
+ private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
+
+ /**
+ * An S3A output stream which uploads partitions in a separate pool of
+ * threads; different {@link S3ADataBlocks.BlockFactory}
+ * instances can control where data is buffered.
+ *
+ * @param fs S3AFilesystem
+ * @param key S3 object to work on.
+ * @param executorService the executor service to use to schedule work
+ * @param progress report progress in order to prevent timeouts. If
+ * this object implements {@code ProgressListener} then it will be
+ * directly wired up to the AWS client and will receive detailed progress
+ * information.
+ * @param blockSize size of a single block.
+ * @param blockFactory factory for creating stream destinations
+ * @param statistics stats for this stream
+ * @param writeOperationHelper state of the write operation.
+ * @throws IOException on any problem
+ */
+ S3ABlockOutputStream(S3AFileSystem fs,
+ String key,
+ ExecutorService executorService,
+ Progressable progress,
+ long blockSize,
+ S3ADataBlocks.BlockFactory blockFactory,
+ S3AInstrumentation.OutputStreamStatistics statistics,
+ S3AFileSystem.WriteOperationHelper writeOperationHelper)
+ throws IOException {
+ this.fs = fs;
+ this.key = key;
+ this.blockFactory = blockFactory;
+ this.blockSize = (int) blockSize;
+ this.statistics = statistics;
+ this.writeOperationHelper = writeOperationHelper;
+ Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
+ "Block size is too small: %d", blockSize);
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ this.multiPartUpload = null;
+ this.progressListener = (progress instanceof ProgressListener) ?
+ (ProgressListener) progress
+ : new ProgressableListener(progress);
+ // create that first block. This guarantees that an open + close sequence
+ // writes a 0-byte entry.
+ createBlockIfNeeded();
+ LOG.debug("Initialized S3ABlockOutputStream for {}" +
+ " output to {}", writeOperationHelper, activeBlock);
+ }
+
+ /**
+ * Demand create a destination block.
+ * @return the active block; null if there isn't one.
+ * @throws IOException on any failure to create
+ */
+ private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
+ throws IOException {
+ if (activeBlock == null) {
+ blockCount++;
+ if (blockCount >= Constants.MAX_MULTIPART_COUNT) {
+ LOG.error("Number of partitions in stream exceeds limit for S3: "
+ + Constants.MAX_MULTIPART_COUNT + "; write may fail.");
+ }
+ activeBlock = blockFactory.create(this.blockSize);
+ }
+ return activeBlock;
+ }
+
+ /**
+ * Synchronized accessor to the active block.
+ * @return the active block; null if there isn't one.
+ */
+ private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
+ return activeBlock;
+ }
+
+ /**
+ * Predicate to query whether or not there is an active block.
+ * @return true if there is an active block.
+ */
+ private synchronized boolean hasActiveBlock() {
+ return activeBlock != null;
+ }
+
+ /**
+ * Clear the active block.
+ */
+ private void clearActiveBlock() {
+ LOG.debug("Clearing active block");
+ synchronized (this) {
+ activeBlock = null;
+ }
+ }
+
+ /**
+ * Check for the filesystem being open.
+ * @throws IOException if the filesystem is closed.
+ */
+ void checkOpen() throws IOException {
+ if (closed.get()) {
+ throw new IOException("Filesystem " + writeOperationHelper + " closed");
+ }
+ }
+
+ /**
+ * The flush operation does not trigger an upload; that awaits
+ * the next block being full. What it does is call {@code flush()}
+ * on the current block, leaving it to choose how to react.
+ * @throws IOException Any IO problem.
+ */
+ @Override
+ public synchronized void flush() throws IOException {
+ checkOpen();
+ S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
+ if (dataBlock != null) {
+ dataBlock.flush();
+ }
+ }
+
+ /**
+ * Writes a byte to the destination. If this causes the buffer to reach
+ * its limit, the actual upload is submitted to the threadpool.
+ * @param b the int of which the lowest byte is written
+ * @throws IOException on any problem
+ */
+ @Override
+ public synchronized void write(int b) throws IOException {
+ singleCharWrite[0] = (byte)b;
+ write(singleCharWrite, 0, 1);
+ }
+
+ /**
+ * Writes a range of bytes to the current block. If this causes the
+ * block to reach its limit, the actual upload is submitted to the
+ * threadpool and the remainder of the array is written to a new block
+ * (recursively).
+ * @param source byte array containing data to write
+ * @param offset offset in array where to start
+ * @param len number of bytes to be written
+ * @throws IOException on any problem
+ */
+ @Override
+ public synchronized void write(byte[] source, int offset, int len)
+ throws IOException {
+
+ S3ADataBlocks.validateWriteArgs(source, offset, len);
+ checkOpen();
+ if (len == 0) {
+ return;
+ }
+ S3ADataBlocks.DataBlock block = createBlockIfNeeded();
+ int written = block.write(source, offset, len);
+ int remainingCapacity = block.remainingCapacity();
+ if (written < len) {
+ // not everything was written: the block has run out
+ // of capacity.
+ // Trigger an upload then process the remainder.
+ LOG.debug("writing more data than block has capacity - triggering upload");
+ uploadCurrentBlock();
+ // tail recursion is mildly expensive, but since buffer sizes must be
+ // multiple MB, it's unlikely to recurse very deeply.
+ this.write(source, offset + written, len - written);
+ } else {
+ if (remainingCapacity == 0) {
+ // the whole buffer is done, trigger an upload
+ uploadCurrentBlock();
+ }
+ }
+ }
+
+ /**
+ * Start an asynchronous upload of the current block.
+ * @throws IOException Problems opening the destination for upload
+ * or initializing the upload.
+ */
+ private synchronized void uploadCurrentBlock() throws IOException {
+ Preconditions.checkState(hasActiveBlock(), "No active block");
+ LOG.debug("Writing block # {}", blockCount);
+ if (multiPartUpload == null) {
+ LOG.debug("Initiating Multipart upload");
+ multiPartUpload = new MultiPartUpload();
+ }
+ try {
+ multiPartUpload.uploadBlockAsync(getActiveBlock());
+ } finally {
+ // set the block to null, so the next write will create a new block.
+ clearActiveBlock();
+ }
+ }
+
+ /**
+ * Close the stream.
+ *
+ * This will not return until the upload is complete
+ * or the attempt to perform the upload has failed.
+ * Exceptions raised in this method are indicative that the write has
+ * failed and data is at risk of being lost.
+ * @throws IOException on any failure.
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed.getAndSet(true)) {
+ // already closed
+ LOG.debug("Ignoring close() as stream is already closed");
+ return;
+ }
+ S3ADataBlocks.DataBlock block = getActiveBlock();
+ boolean hasBlock = hasActiveBlock();
+ LOG.debug("{}: Closing block #{}: current block= {}",
+ this,
+ blockCount,
+ hasBlock ? block : "(none)");
+ try {
+ if (multiPartUpload == null) {
+ if (hasBlock) {
+ // no uploads of data have taken place, put the single block up.
+ // This must happen even if there is no data, so that 0 byte files
+ // are created.
+ putObject();
+ }
+ } else {
+ // there has already been at least one block scheduled for upload;
+ // put up the current then wait
+ if (hasBlock && block.hasData()) {
+ //send last part
+ uploadCurrentBlock();
+ }
+ // wait for the partial uploads to finish
+ final List<PartETag> partETags =
+ multiPartUpload.waitForAllPartUploads();
+ // then complete the operation
+ multiPartUpload.complete(partETags);
+ }
+ LOG.debug("Upload complete for {}", writeOperationHelper);
+ } catch (IOException ioe) {
+ writeOperationHelper.writeFailed(ioe);
+ throw ioe;
+ } finally {
+ LOG.debug("Closing block and factory");
+ IOUtils.closeStream(block);
+ IOUtils.closeStream(blockFactory);
+ LOG.debug("Statistics: {}", statistics);
+ IOUtils.closeStream(statistics);
+ clearActiveBlock();
+ }
+ // All end of write operations, including deleting fake parent directories
+ writeOperationHelper.writeSuccessful();
+ }
+
+ /**
+ * Upload the current block as a single PUT request; if the buffer
+ * is empty a 0-byte PUT will be invoked, as it is needed to create an
+ * entry at the far end.
+ * @throws IOException any problem.
+ */
+ private void putObject() throws IOException {
+ LOG.debug("Executing regular upload for {}", writeOperationHelper);
+
+ final S3ADataBlocks.DataBlock block = getActiveBlock();
+ int size = block.dataSize();
+ final PutObjectRequest putObjectRequest =
+ writeOperationHelper.newPutRequest(
+ block.startUpload(),
+ size);
+ long transferQueueTime = now();
+ BlockUploadProgress callback =
+ new BlockUploadProgress(
+ block, progressListener, transferQueueTime);
+ putObjectRequest.setGeneralProgressListener(callback);
+ statistics.blockUploadQueued(size);
+ ListenableFuture<PutObjectResult> putObjectResult =
+ executorService.submit(new Callable<PutObjectResult>() {
+ @Override
+ public PutObjectResult call() throws Exception {
+ PutObjectResult result = fs.putObjectDirect(putObjectRequest);
+ block.close();
+ return result;
+ }
+ });
+ clearActiveBlock();
+ //wait for completion
+ try {
+ putObjectResult.get();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted object upload", ie);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ee) {
+ throw extractException("regular upload", key, ee);
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "S3ABlockOutputStream{");
+ sb.append(writeOperationHelper.toString());
+ sb.append(", blockSize=").append(blockSize);
+ // unsynced access; risks consistency in exchange for no risk of deadlock.
+ S3ADataBlocks.DataBlock block = activeBlock;
+ if (block != null) {
+ sb.append(", activeBlock=").append(block);
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ private void incrementWriteOperations() {
+ fs.incrementWriteOperations();
+ }
+
+ /**
+ * Current time in milliseconds.
+ * @return time
+ */
+ private long now() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * Multiple partition upload.
+ */
+ private class MultiPartUpload {
+ private final String uploadId;
+ private final List<ListenableFuture<PartETag>> partETagsFutures;
+
+ public MultiPartUpload() throws IOException {
+ this.uploadId = writeOperationHelper.initiateMultiPartUpload();
+ this.partETagsFutures = new ArrayList<>(2);
+ LOG.debug("Initiated multi-part upload for {} with " +
+ "id '{}'", writeOperationHelper, uploadId);
+ }
+
+ /**
+ * Upload a block of data.
+ * This will take the block and queue it for asynchronous upload.
+ * @param block block to upload
+ * @throws IOException upload failure
+ */
+ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
+ throws IOException {
+ LOG.debug("Queueing upload of {}", block);
+ final int size = block.dataSize();
+ final InputStream uploadStream = block.startUpload();
+ final int currentPartNumber = partETagsFutures.size() + 1;
+ final UploadPartRequest request =
+ writeOperationHelper.newUploadPartRequest(
+ uploadId,
+ uploadStream,
+ currentPartNumber,
+ size);
+ long transferQueueTime = now();
+ BlockUploadProgress callback =
+ new BlockUploadProgress(
+ block, progressListener, transferQueueTime);
+ request.setGeneralProgressListener(callback);
+ statistics.blockUploadQueued(block.dataSize());
+ ListenableFuture<PartETag> partETagFuture =
+ executorService.submit(new Callable<PartETag>() {
+ @Override
+ public PartETag call() throws Exception {
+ // this is the queued upload operation
+ LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
+ uploadId);
+ // do the upload
+ PartETag partETag = fs.uploadPart(request).getPartETag();
+ LOG.debug("Completed upload of {}", block);
+ LOG.debug("Stream statistics of {}", statistics);
+
+ // close the block
+ block.close();
+ return partETag;
+ }
+ });
+ partETagsFutures.add(partETagFuture);
+ }
+
+ /**
+ * Block awaiting all outstanding uploads to complete.
+ * @return list of results
+ * @throws IOException IO Problems
+ */
+ private List<PartETag> waitForAllPartUploads() throws IOException {
+ LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
+ try {
+ return Futures.allAsList(partETagsFutures).get();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted partUpload", ie);
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (ExecutionException ee) {
+ //there is no way of recovering so abort
+ //cancel all partUploads
+ LOG.debug("While waiting for upload completion", ee);
+ LOG.debug("Cancelling futures");
+ for (ListenableFuture<PartETag> future : partETagsFutures) {
+ future.cancel(true);
+ }
+ //abort multipartupload
+ this.abort();
+ throw extractException("Multi-part upload with id '" + uploadId
+ + "' to " + key, key, ee);
+ }
+ }
+
+ /**
+ * This completes a multipart upload.
+ * Sometimes it fails; here retries are handled to avoid losing all data
+ * on a transient failure.
+ * @param partETags list of partial uploads
+ * @throws IOException on any problem
+ */
+ private CompleteMultipartUploadResult complete(List<PartETag> partETags)
+ throws IOException {
+ int retryCount = 0;
+ AmazonClientException lastException;
+ String operation =
+ String.format("Completing multi-part upload for key '%s'," +
+ " id '%s' with %s partitions ",
+ key, uploadId, partETags.size());
+ do {
+ try {
+ LOG.debug(operation);
+ return writeOperationHelper.completeMultipartUpload(
+ uploadId,
+ partETags);
+ } catch (AmazonClientException e) {
+ lastException = e;
+ statistics.exceptionInMultipartComplete();
+ }
+ } while (shouldRetry(operation, lastException, retryCount++));
+ // this point is only reached if the operation failed more than
+ // the allowed retry count
+ throw translateException(operation, key, lastException);
+ }
+
+ /**
+ * Abort a multi-part upload. Retries are attempted on failures.
+ * IOExceptions are caught; this is expected to be run as a cleanup process.
+ */
+ public void abort() {
+ int retryCount = 0;
+ AmazonClientException lastException;
+ fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
+ String operation =
+ String.format("Aborting multi-part upload for '%s', id '%s",
+ writeOperationHelper, uploadId);
+ do {
+ try {
+ LOG.debug(operation);
+ writeOperationHelper.abortMultipartUpload(uploadId);
+ return;
+ } catch (AmazonClientException e) {
+ lastException = e;
+ statistics.exceptionInMultipartAbort();
+ }
+ } while (shouldRetry(operation, lastException, retryCount++));
+ // this point is only reached if the operation failed more than
+ // the allowed retry count
+ LOG.warn("Unable to abort multipart upload, you may need to purge " +
+ "uploaded parts", lastException);
+ }
+
+ /**
+ * Predicate to determine whether a failed operation should
+ * be attempted again.
+ * If a retry is advised, the exception is automatically logged and
+ * the filesystem statistic {@link Statistic#IGNORED_ERRORS} incremented.
+ * The method then sleeps for the time suggested by the retry policy;
+ * if the sleep is interrupted then the thread's interrupt flag is set
+ * (via {@code Thread.currentThread().interrupt()}) and false is returned.
+ *
+ * @param operation operation for log message
+ * @param e exception raised.
+ * @param retryCount number of retries already attempted
+ * @return true if another attempt should be made
+ */
+ private boolean shouldRetry(String operation,
+ AmazonClientException e,
+ int retryCount) {
+ try {
+ RetryPolicy.RetryAction retryAction =
+ retryPolicy.shouldRetry(e, retryCount, 0, true);
+ boolean retry = retryAction == RetryPolicy.RetryAction.RETRY;
+ if (retry) {
+ fs.incrementStatistic(IGNORED_ERRORS);
+ LOG.info("Retrying {} after exception ", operation, e);
+ Thread.sleep(retryAction.delayMillis);
+ }
+ return retry;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (Exception ignored) {
+ return false;
+ }
+ }
+
+ }
+
+ /**
+ * The upload progress listener registered for events returned
+ * during the upload of a single block.
+ * It updates statistics and handles the end of the upload.
+ * Transfer failures are logged at WARN.
+ */
+ private final class BlockUploadProgress implements ProgressListener {
+ private final S3ADataBlocks.DataBlock block;
+ private final ProgressListener nextListener;
+ private final long transferQueueTime;
+ private long transferStartTime;
+
+ /**
+ * Track the progress of a single block upload.
+ * @param block block to monitor
+ * @param nextListener optional next progress listener
+ * @param transferQueueTime time the block was transferred
+ * into the queue
+ */
+ private BlockUploadProgress(S3ADataBlocks.DataBlock block,
+ ProgressListener nextListener,
+ long transferQueueTime) {
+ this.block = block;
+ this.transferQueueTime = transferQueueTime;
+ this.nextListener = nextListener;
+ }
+
+ @Override
+ public void progressChanged(ProgressEvent progressEvent) {
+ ProgressEventType eventType = progressEvent.getEventType();
+ long bytesTransferred = progressEvent.getBytesTransferred();
+
+ int size = block.dataSize();
+ switch (eventType) {
+
+ case REQUEST_BYTE_TRANSFER_EVENT:
+ // bytes uploaded
+ statistics.bytesTransferred(bytesTransferred);
+ break;
+
+ case TRANSFER_PART_STARTED_EVENT:
+ transferStartTime = now();
+ statistics.blockUploadStarted(transferStartTime - transferQueueTime,
+ size);
+ incrementWriteOperations();
+ break;
+
+ case TRANSFER_PART_COMPLETED_EVENT:
+ statistics.blockUploadCompleted(now() - transferStartTime, size);
+ break;
+
+ case TRANSFER_PART_FAILED_EVENT:
+ statistics.blockUploadFailed(now() - transferStartTime, size);
+ LOG.warn("Transfer failure of block {}", block);
+ break;
+
+ default:
+ // nothing
+ }
+
+ if (nextListener != null) {
+ nextListener.progressChanged(progressEvent);
+ }
+ }
+ }
+
+ /**
+ * Bridge from AWS {@code ProgressListener} to Hadoop {@link Progressable}.
+ */
+ private static class ProgressableListener implements ProgressListener {
+ private final Progressable progress;
+
+ public ProgressableListener(Progressable progress) {
+ this.progress = progress;
+ }
+
+ public void progressChanged(ProgressEvent progressEvent) {
+ if (progress != null) {
+ progress.progress();
+ }
+ }
+ }
+
+}
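
For reviewers, a standalone sketch (not part of this patch) of the fill/upload/recurse pattern that write(byte[], int, int) above uses: write as much as the active block accepts, hand a full block off for upload, then recurse on the remainder. The BlockWriteSketch class and its tiny block limit are hypothetical, chosen only to make the rollover visible.

    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of the rollover logic in S3ABlockOutputStream.write().
    public class BlockWriteSketch {
      private static final int BLOCK_LIMIT = 8;   // tiny limit, for demonstration only
      private ByteArrayOutputStream active = new ByteArrayOutputStream();
      private final List<byte[]> uploaded = new ArrayList<>();

      public void write(byte[] source, int offset, int len) {
        if (len == 0) {
          return;
        }
        int remaining = BLOCK_LIMIT - active.size();
        int written = Math.min(remaining, len);
        active.write(source, offset, written);
        if (active.size() == BLOCK_LIMIT) {
          uploadCurrentBlock();                   // block is full: hand it off
        }
        if (written < len) {
          // recurse on the remainder, as the real stream does
          write(source, offset + written, len - written);
        }
      }

      private void uploadCurrentBlock() {
        uploaded.add(active.toByteArray());       // stands in for the async part upload
        active = new ByteArrayOutputStream();     // start a fresh block
      }
    }
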
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
new file mode 100644
index 0000000000..0fe2af7943
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded as partitions.
+ */
+final class S3ADataBlocks {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3ADataBlocks.class);
+
+ private S3ADataBlocks() {
+ }
+
+ /**
+ * Validate args to a write command. These are the same validation checks
+ * expected for any implementation of {@code OutputStream.write()}.
+ * @param b byte array containing data
+ * @param off offset in array where to start
+ * @param len number of bytes to be written
+ * @throws NullPointerException for a null buffer
+ * @throws IndexOutOfBoundsException if indices are out of range
+ */
+ static void validateWriteArgs(byte[] b, int off, int len)
+ throws IOException {
+ Preconditions.checkNotNull(b);
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException(
+ "write (b[" + b.length + "], " + off + ", " + len + ')');
+ }
+ }
+
+ /**
+ * Create a factory.
+ * @param owner factory owner
+ * @param name factory name - the option from {@link Constants}.
+ * @return the factory, ready to be initialized.
+ * @throws IllegalArgumentException if the name is unknown.
+ */
+ static BlockFactory createFactory(S3AFileSystem owner,
+ String name) {
+ switch (name) {
+ case Constants.FAST_UPLOAD_BUFFER_ARRAY:
+ return new ArrayBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BUFFER_DISK:
+ return new DiskBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BYTEBUFFER:
+ return new ByteBufferBlockFactory(owner);
+ default:
+ throw new IllegalArgumentException("Unsupported block buffer" +
+ " \"" + name + '"');
+ }
+ }
+
+ /**
+ * Base class for block factories.
+ */
+ static abstract class BlockFactory implements Closeable {
+
+ private final S3AFileSystem owner;
+
+ protected BlockFactory(S3AFileSystem owner) {
+ this.owner = owner;
+ }
+
+
+ /**
+ * Create a block.
+ * @param limit limit of the block.
+ * @return a new block.
+ */
+ abstract DataBlock create(int limit) throws IOException;
+
+ /**
+ * Implement any close/cleanup operation.
+ * Base class is a no-op.
+ * @throws IOException - ideally, it shouldn't.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Owner.
+ */
+ protected S3AFileSystem getOwner() {
+ return owner;
+ }
+ }
+
+ /**
+ * This represents a block being uploaded.
+ */
+ static abstract class DataBlock implements Closeable {
+
+ enum DestState {Writing, Upload, Closed}
+
+ private volatile DestState state = Writing;
+
+ /**
+ * Atomically enter a state, verifying current state.
+ * @param current current state. null means "no check"
+ * @param next next state
+ * @throws IllegalStateException if the current state is not as expected
+ */
+ protected synchronized final void enterState(DestState current,
+ DestState next)
+ throws IllegalStateException {
+ verifyState(current);
+ LOG.debug("{}: entering state {}", this, next);
+ state = next;
+ }
+
+ /**
+ * Verify that the block is in the declared state.
+ * @param expected expected state.
+ * @throws IllegalStateException if the DataBlock is in the wrong state
+ */
+ protected final void verifyState(DestState expected)
+ throws IllegalStateException {
+ if (expected != null && state != expected) {
+ throw new IllegalStateException("Expected stream state " + expected
+ + " -but actual state is " + state + " in " + this);
+ }
+ }
+
+ /**
+ * Current state.
+ * @return the current state.
+ */
+ final DestState getState() {
+ return state;
+ }
+
+ /**
+ * Return the current data size.
+ * @return the size of the data
+ */
+ abstract int dataSize();
+
+ /**
+ * Predicate to verify that the block has the capacity to write
+ * the given set of bytes.
+ * @param bytes number of bytes desired to be written.
+ * @return true if there is enough space.
+ */
+ abstract boolean hasCapacity(long bytes);
+
+ /**
+ * Predicate to check if there is data in the block.
+ * @return true if there is
+ */
+ boolean hasData() {
+ return dataSize() > 0;
+ }
+
+ /**
+ * The remaining capacity in the block before it is full.
+ * @return the number of bytes remaining.
+ */
+ abstract int remainingCapacity();
+
+ /**
+ * Write a series of bytes from the buffer, from the offset.
+ * Returns the number of bytes written.
+ * Only valid in the state {@code Writing}.
+ * Base class verifies the state but does no writing.
+ * @param buffer buffer
+ * @param offset offset
+ * @param length length of write
+ * @return number of bytes written
+ * @throws IOException trouble
+ */
+ int write(byte[] buffer, int offset, int length) throws IOException {
+ verifyState(Writing);
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(offset >= 0, "offset is negative");
+ Preconditions.checkArgument(
+ !(buffer.length - offset < length),
+ "buffer shorter than amount of data to write");
+ return 0;
+ }
+
+ /**
+ * Flush the output.
+ * Only valid in the state {@code Writing}.
+ * In the base class, this is a no-op
+ * @throws IOException any IO problem.
+ */
+ void flush() throws IOException {
+ verifyState(Writing);
+ }
+
+ /**
+ * Switch to the upload state and return a stream for uploading.
+ * Base class calls {@link #enterState(DestState, DestState)} to
+ * manage the state machine.
+ * @return the stream
+ * @throws IOException trouble
+ */
+ InputStream startUpload() throws IOException {
+ LOG.debug("Start datablock upload");
+ enterState(Writing, Upload);
+ return null;
+ }
+
+ /**
+ * Enter the closed state.
+ * @return true if the class was in any other state, implying that
+ * the subclass should do its close operations
+ */
+ protected synchronized boolean enterClosedState() {
+ if (!state.equals(Closed)) {
+ enterState(null, Closed);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (enterClosedState()) {
+ LOG.debug("Closed {}", this);
+ innerClose();
+ }
+ }
+
+ /**
+ * Inner close logic for subclasses to implement.
+ */
+ protected void innerClose() throws IOException {
+
+ }
+
+ }
+
+ // ====================================================================
+
+ /**
+ * Use byte arrays on the heap for storage.
+ */
+ static class ArrayBlockFactory extends BlockFactory {
+
+ ArrayBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ @Override
+ DataBlock create(int limit) throws IOException {
+ return new ByteArrayBlock(limit);
+ }
+
+ }
+
+ /**
+ * Stream to memory via a {@code ByteArrayOutputStream}.
+ *
+ * This was taken from {@code S3AFastOutputStream} and has the
+ * same problem which surfaced there: it can consume a lot of heap space
+ * proportional to the mismatch between writes to the stream and
+ * the JVM-wide upload bandwidth to the S3 endpoint.
+ * The memory consumption can be limited by tuning the filesystem settings
+ * to restrict the number of queued/active uploads.
+ */
+
+ static class ByteArrayBlock extends DataBlock {
+ private ByteArrayOutputStream buffer;
+ private final int limit;
+ // cache data size so that it is consistent after the buffer is reset.
+ private Integer dataSize;
+
+ ByteArrayBlock(int limit) {
+ this.limit = limit;
+ buffer = new ByteArrayOutputStream();
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ * @return the amount of data available to upload.
+ */
+ @Override
+ int dataSize() {
+ return dataSize != null ? dataSize : buffer.size();
+ }
+
+ @Override
+ InputStream startUpload() throws IOException {
+ super.startUpload();
+ dataSize = buffer.size();
+ ByteArrayInputStream bufferData = new ByteArrayInputStream(
+ buffer.toByteArray());
+ buffer = null;
+ return bufferData;
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ int remainingCapacity() {
+ return limit - dataSize();
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ buffer.write(b, offset, written);
+ return written;
+ }
+
+ @Override
+ protected void innerClose() {
+ buffer = null;
+ }
+
+ @Override
+ public String toString() {
+ return "ByteArrayBlock{" +
+ "state=" + getState() +
+ ", limit=" + limit +
+ ", dataSize=" + dataSize +
+ '}';
+ }
+ }
+
+ // ====================================================================
+
+ /**
+ * Stream via Direct ByteBuffers; these are allocated off heap
+ * via {@link DirectBufferPool}.
+ * This is actually the most complex of all the block factories,
+ * due to the need to explicitly recycle buffers; in comparison, the
+ * {@link DiskBlock} buffer delegates the work of deleting files to
+ * the {@link DiskBlock.FileDeletingInputStream}. Here the
+ * input stream {@link ByteBufferInputStream} has a similar task, along
+ * with the foundational work of streaming data from a byte array.
+ */
+
+ static class ByteBufferBlockFactory extends BlockFactory {
+
+ private final DirectBufferPool bufferPool = new DirectBufferPool();
+ private final AtomicInteger buffersOutstanding = new AtomicInteger(0);
+
+ ByteBufferBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ @Override
+ ByteBufferBlock create(int limit) throws IOException {
+ return new ByteBufferBlock(limit);
+ }
+
+ private ByteBuffer requestBuffer(int limit) {
+ LOG.debug("Requesting buffer of size {}", limit);
+ buffersOutstanding.incrementAndGet();
+ return bufferPool.getBuffer(limit);
+ }
+
+ private void releaseBuffer(ByteBuffer buffer) {
+ LOG.debug("Releasing buffer");
+ bufferPool.returnBuffer(buffer);
+ buffersOutstanding.decrementAndGet();
+ }
+
+ /**
+ * Get count of outstanding buffers.
+ * @return the current buffer count
+ */
+ public int getOutstandingBufferCount() {
+ return buffersOutstanding.get();
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlockFactory{"
+ + "buffersOutstanding=" + buffersOutstanding +
+ '}';
+ }
+
+ /**
+ * A DataBlock which requests a buffer from the pool on creation; returns
+ * it when the output stream is closed.
+ */
+ class ByteBufferBlock extends DataBlock {
+ private ByteBuffer buffer;
+ private final int bufferSize;
+ // cache data size so that it is consistent after the buffer is reset.
+ private Integer dataSize;
+
+ /**
+ * Instantiate. This will request a ByteBuffer of the desired size.
+ * @param bufferSize buffer size
+ */
+ ByteBufferBlock(int bufferSize) {
+ this.bufferSize = bufferSize;
+ buffer = requestBuffer(bufferSize);
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ * @return the amount of data available to upload.
+ */
+ @Override
+ int dataSize() {
+ return dataSize != null ? dataSize : bufferCapacityUsed();
+ }
+
+ @Override
+ ByteBufferInputStream startUpload() throws IOException {
+ super.startUpload();
+ dataSize = bufferCapacityUsed();
+ // set the buffer up for reading from the beginning
+ buffer.limit(buffer.position());
+ buffer.position(0);
+ return new ByteBufferInputStream(dataSize, buffer);
+ }
+
+ @Override
+ public boolean hasCapacity(long bytes) {
+ return bytes <= remainingCapacity();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return buffer != null ? buffer.remaining() : 0;
+ }
+
+ private int bufferCapacityUsed() {
+ return buffer.capacity() - buffer.remaining();
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ buffer.put(b, offset, written);
+ return written;
+ }
+
+ @Override
+ protected void innerClose() {
+ buffer = null;
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlock{"
+ + "state=" + getState() +
+ ", dataSize=" + dataSize() +
+ ", limit=" + bufferSize +
+ ", remainingCapacity=" + remainingCapacity() +
+ '}';
+ }
+
+ }
+
+ /**
+ * Provide an input stream from a byte buffer; supporting
+ * {@link #mark(int)}, which is required to enable replay of failed
+ * PUT attempts.
+ * This input stream returns the buffer to the pool afterwards.
+ */
+ class ByteBufferInputStream extends InputStream {
+
+ private final int size;
+ private ByteBuffer byteBuffer;
+
+ ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
+ LOG.debug("Creating ByteBufferInputStream of size {}", size);
+ this.size = size;
+ this.byteBuffer = byteBuffer;
+ }
+
+ /**
+ * Return the buffer to the pool after the stream is closed.
+ */
+ @Override
+ public synchronized void close() {
+ if (byteBuffer != null) {
+ LOG.debug("releasing buffer");
+ releaseBuffer(byteBuffer);
+ byteBuffer = null;
+ }
+ }
+
+ /**
+ * Verify that the stream is open.
+ * @throws IOException if the stream is closed
+ */
+ private void verifyOpen() throws IOException {
+ if (byteBuffer == null) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+
+ public synchronized int read() throws IOException {
+ if (available() > 0) {
+ return byteBuffer.get() & 0xFF;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public synchronized long skip(long offset) throws IOException {
+ verifyOpen();
+ long newPos = position() + offset;
+ if (newPos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (newPos > size) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+ byteBuffer.position((int) newPos);
+ return newPos;
+ }
+
+ @Override
+ public synchronized int available() {
+ Preconditions.checkState(byteBuffer != null,
+ FSExceptionMessages.STREAM_IS_CLOSED);
+ return byteBuffer.remaining();
+ }
+
+ /**
+ * Get the current buffer position.
+ * @return the buffer position
+ */
+ public synchronized int position() {
+ return byteBuffer.position();
+ }
+
+ /**
+ * Check if there is data left.
+ * @return true if there is data remaining in the buffer.
+ */
+ public synchronized boolean hasRemaining() {
+ return byteBuffer.hasRemaining();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ LOG.debug("mark at {}", position());
+ byteBuffer.mark();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ LOG.debug("reset");
+ byteBuffer.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ /**
+ * Read in data.
+ * @param buffer destination buffer
+ * @param offset offset within the buffer
+ * @param length length of bytes to read
+ * @throws EOFException if the position is negative
+ * @throws IndexOutOfBoundsException if there isn't space for the
+ * amount of data requested.
+ * @throws IllegalArgumentException other arguments are invalid.
+ */
+ @SuppressWarnings("NullableProblems")
+ public synchronized int read(byte[] buffer, int offset, int length)
+ throws IOException {
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ if (buffer.length - offset < length) {
+ throw new IndexOutOfBoundsException(
+ FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ + ": request length =" + length
+ + ", with offset =" + offset
+ + "; buffer capacity =" + (buffer.length - offset));
+ }
+ verifyOpen();
+ if (!hasRemaining()) {
+ return -1;
+ }
+
+ int toRead = Math.min(length, available());
+ byteBuffer.get(buffer, offset, toRead);
+ return toRead;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "ByteBufferInputStream{");
+ sb.append("size=").append(size);
+ ByteBuffer buffer = this.byteBuffer;
+ if (buffer != null) {
+ sb.append(", available=").append(buffer.remaining());
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+ }
+
+ // ====================================================================
+
+ /**
+ * Buffer blocks to disk.
+ */
+ static class DiskBlockFactory extends BlockFactory {
+
+ DiskBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ /**
+ * Create a temp file and a block which writes to it.
+ * @param limit limit of the block.
+ * @return the new block
+ * @throws IOException IO problems
+ */
+ @Override
+ DataBlock create(int limit) throws IOException {
+ File destFile = getOwner()
+ .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
+ return new DiskBlock(destFile, limit);
+ }
+ }
+
+ /**
+ * Stream to a file.
+ * This will stop at the limit; the caller is expected to create a new block.
+ */
+ static class DiskBlock extends DataBlock {
+
+ private int bytesWritten;
+ private final File bufferFile;
+ private final int limit;
+ private BufferedOutputStream out;
+ private InputStream uploadStream;
+
+ DiskBlock(File bufferFile, int limit)
+ throws FileNotFoundException {
+ this.limit = limit;
+ this.bufferFile = bufferFile;
+ out = new BufferedOutputStream(new FileOutputStream(bufferFile));
+ }
+
+ @Override
+ int dataSize() {
+ return bytesWritten;
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ int remainingCapacity() {
+ return limit - bytesWritten;
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ out.write(b, offset, written);
+ bytesWritten += written;
+ return written;
+ }
+
+ @Override
+ InputStream startUpload() throws IOException {
+ super.startUpload();
+ try {
+ out.flush();
+ } finally {
+ out.close();
+ out = null;
+ }
+ uploadStream = new FileInputStream(bufferFile);
+ return new FileDeletingInputStream(uploadStream);
+ }
+
+ /**
+ * The close operation will delete the destination file if it still
+ * exists.
+ * @throws IOException IO problems
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ final DestState state = getState();
+ LOG.debug("Closing {}", this);
+ switch (state) {
+ case Writing:
+ if (bufferFile.exists()) {
+ // file was not uploaded
+ LOG.debug("Deleting buffer file as upload did not start");
+ boolean deleted = bufferFile.delete();
+ if (!deleted && bufferFile.exists()) {
+ LOG.warn("Failed to delete buffer file {}", bufferFile);
+ }
+ }
+ break;
+
+ case Upload:
+ LOG.debug("Buffer file {} exists —close upload stream", bufferFile);
+ break;
+
+ case Closed:
+ // no-op
+ break;
+
+ default:
+ // this state can never be reached, but checkstyle complains, so
+ // it is here.
+ }
+ }
+
+ /**
+ * Flush operation will flush to disk.
+ * @throws IOException IOE raised on FileOutputStream
+ */
+ @Override
+ void flush() throws IOException {
+ super.flush();
+ out.flush();
+ }
+
+ @Override
+ public String toString() {
+ String sb = "FileBlock{"
+ + "destFile=" + bufferFile +
+ ", state=" + getState() +
+ ", dataSize=" + dataSize() +
+ ", limit=" + limit +
+ '}';
+ return sb;
+ }
+
+ /**
+ * An input stream which deletes the buffer file when closed.
+ */
+ private final class FileDeletingInputStream extends FilterInputStream {
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ FileDeletingInputStream(InputStream source) {
+ super(source);
+ }
+
+ /**
+ * Delete the input file when closed.
+ * @throws IOException IO problem
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ if (!closed.getAndSet(true)) {
+ if (!bufferFile.delete()) {
+ LOG.warn("delete({}) returned false",
+ bufferFile.getAbsoluteFile());
+ }
+ }
+ }
+ }
+ }
+ }
+
+}
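
One detail of ByteBufferBlock.startUpload() above that is easy to miss: it flips the buffer from write mode to read mode by setting the limit to the current write position and rewinding the position to zero. A self-contained illustration (not part of this patch) of that flip:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Demonstrates the write-then-read flip used by ByteBufferBlock.startUpload().
    public class ByteBufferFlipSketch {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(16);
        buffer.put("hello".getBytes(StandardCharsets.US_ASCII));  // write phase

        buffer.limit(buffer.position());   // readable region ends where writing stopped
        buffer.position(0);                // rewind for reading

        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);                   // read phase
        System.out.println(new String(out, StandardCharsets.US_ASCII));  // prints "hello"
      }
    }
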
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
deleted file mode 100644
index c25d0fbf90..0000000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.event.ProgressEvent;
-import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.Progressable;
-import org.slf4j.Logger;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-import static org.apache.hadoop.fs.s3a.Statistic.*;
-
-/**
- * Upload files/parts asap directly from a memory buffer (instead of buffering
- * to a file).
- *
- * Uploads are managed low-level rather than through the AWS TransferManager.
- * This allows for uploading each part of a multi-part upload as soon as
- * the bytes are in memory, rather than waiting until the file is closed.
- *
- * Unstable: statistics and error handling might evolve
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class S3AFastOutputStream extends OutputStream {
-
- private static final Logger LOG = S3AFileSystem.LOG;
- private final String key;
- private final String bucket;
- private final AmazonS3 client;
- private final int partSize;
- private final int multiPartThreshold;
- private final S3AFileSystem fs;
- private final CannedAccessControlList cannedACL;
- private final ProgressListener progressListener;
- private final ListeningExecutorService executorService;
- private MultiPartUpload multiPartUpload;
- private boolean closed;
- private ByteArrayOutputStream buffer;
- private int bufferLimit;
-
-
- /**
- * Creates a fast OutputStream that uploads to S3 from memory.
- * For MultiPartUploads, as soon as sufficient bytes have been written to
- * the stream a part is uploaded immediately (by using the low-level
- * multi-part upload API on the AmazonS3Client).
- *
- * @param client AmazonS3Client used for S3 calls
- * @param fs S3AFilesystem
- * @param bucket S3 bucket name
- * @param key S3 key name
- * @param progress report progress in order to prevent timeouts
- * @param cannedACL used CannedAccessControlList
- * @param partSize size of a single part in a multi-part upload (except
- * last part)
- * @param multiPartThreshold files at least this size use multi-part upload
- * @param threadPoolExecutor thread factory
- * @throws IOException on any problem
- */
- public S3AFastOutputStream(AmazonS3 client,
- S3AFileSystem fs,
- String bucket,
- String key,
- Progressable progress,
- CannedAccessControlList cannedACL,
- long partSize,
- long multiPartThreshold,
- ExecutorService threadPoolExecutor)
- throws IOException {
- this.bucket = bucket;
- this.key = key;
- this.client = client;
- this.fs = fs;
- this.cannedACL = cannedACL;
- //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
- if (partSize > Integer.MAX_VALUE) {
- this.partSize = Integer.MAX_VALUE;
- LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " +
- "when using 'FAST_UPLOAD = true')");
- } else {
- this.partSize = (int) partSize;
- }
- if (multiPartThreshold > Integer.MAX_VALUE) {
- this.multiPartThreshold = Integer.MAX_VALUE;
- LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " +
- "allowed size when using 'FAST_UPLOAD = true')");
- } else {
- this.multiPartThreshold = (int) multiPartThreshold;
- }
- this.bufferLimit = this.multiPartThreshold;
- this.closed = false;
- int initialBufferSize = this.fs.getConf()
- .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
- if (initialBufferSize < 0) {
- LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " +
- "default value");
- initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE;
- } else if (initialBufferSize > this.bufferLimit) {
- LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " +
- "exceed MIN_MULTIPART_THRESHOLD");
- initialBufferSize = this.bufferLimit;
- }
- this.buffer = new ByteArrayOutputStream(initialBufferSize);
- this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
- this.multiPartUpload = null;
- this.progressListener = new ProgressableListener(progress);
- LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
- bucket, key);
- }
-
- /**
- * Writes a byte to the memory buffer. If this causes the buffer to reach
- * its limit, the actual upload is submitted to the threadpool.
- * @param b the int of which the lowest byte is written
- * @throws IOException on any problem
- */
- @Override
- public synchronized void write(int b) throws IOException {
- buffer.write(b);
- if (buffer.size() == bufferLimit) {
- uploadBuffer();
- }
- }
-
- /**
- * Writes a range of bytes from to the memory buffer. If this causes the
- * buffer to reach its limit, the actual upload is submitted to the
- * threadpool and the remainder of the array is written to memory
- * (recursively).
- * @param b byte array containing
- * @param off offset in array where to start
- * @param len number of bytes to be written
- * @throws IOException on any problem
- */
- @Override
- public synchronized void write(byte[] b, int off, int len)
- throws IOException {
- if (b == null) {
- throw new NullPointerException();
- } else if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return;
- }
- if (buffer.size() + len < bufferLimit) {
- buffer.write(b, off, len);
- } else {
- int firstPart = bufferLimit - buffer.size();
- buffer.write(b, off, firstPart);
- uploadBuffer();
- this.write(b, off + firstPart, len - firstPart);
- }
- }
-
- private synchronized void uploadBuffer() throws IOException {
- if (multiPartUpload == null) {
- multiPartUpload = initiateMultiPartUpload();
- /* Upload the existing buffer if it exceeds partSize. This possibly
- requires multiple parts! */
- final byte[] allBytes = buffer.toByteArray();
- buffer = null; //earlier gc?
- LOG.debug("Total length of initial buffer: {}", allBytes.length);
- int processedPos = 0;
- while ((multiPartThreshold - processedPos) >= partSize) {
- LOG.debug("Initial buffer: processing from byte {} to byte {}",
- processedPos, (processedPos + partSize - 1));
- multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
- processedPos, partSize), partSize);
- processedPos += partSize;
- }
- //resize and reset stream
- bufferLimit = partSize;
- buffer = new ByteArrayOutputStream(bufferLimit);
- buffer.write(allBytes, processedPos, multiPartThreshold - processedPos);
- } else {
- //upload next part
- multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
- .toByteArray()), partSize);
- buffer.reset();
- }
- }
-
- /**
- * Close the stream. This will not return until the upload is complete
- * or the attempt to perform the upload has failed.
- * Exceptions raised in this method are indicative that the write has
- * failed and data is at risk of being lost.
- * @throws IOException on any failure.
- */
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
- closed = true;
- try {
- if (multiPartUpload == null) {
- putObject();
- } else {
- int size = buffer.size();
- if (size > 0) {
- fs.incrementPutStartStatistics(size);
- //send last part
- multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
- .toByteArray()), size);
- }
- final List<PartETag> partETags = multiPartUpload
- .waitForAllPartUploads();
- multiPartUpload.complete(partETags);
- }
- // This will delete unnecessary fake parent directories
- fs.finishedWrite(key);
- LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
- } finally {
- buffer = null;
- super.close();
- }
- }
-
- /**
- * Create the default metadata for a multipart upload operation.
- * @return the metadata to use/extend.
- */
- private ObjectMetadata createDefaultMetadata() {
- return fs.newObjectMetadata();
- }
-
- private MultiPartUpload initiateMultiPartUpload() throws IOException {
- final InitiateMultipartUploadRequest initiateMPURequest =
- new InitiateMultipartUploadRequest(bucket,
- key,
- createDefaultMetadata());
- initiateMPURequest.setCannedACL(cannedACL);
- try {
- return new MultiPartUpload(
- client.initiateMultipartUpload(initiateMPURequest).getUploadId());
- } catch (AmazonClientException ace) {
- throw translateException("initiate MultiPartUpload", key, ace);
- }
- }
-
- private void putObject() throws IOException {
- LOG.debug("Executing regular upload for bucket '{}' key '{}'",
- bucket, key);
- final ObjectMetadata om = createDefaultMetadata();
- final int size = buffer.size();
- om.setContentLength(size);
- final PutObjectRequest putObjectRequest =
- fs.newPutObjectRequest(key,
- om,
- new ByteArrayInputStream(buffer.toByteArray()));
- putObjectRequest.setGeneralProgressListener(progressListener);
- ListenableFuture<PutObjectResult> putObjectResult =
- executorService.submit(new Callable<PutObjectResult>() {
- @Override
- public PutObjectResult call() throws Exception {
- fs.incrementPutStartStatistics(size);
- return client.putObject(putObjectRequest);
- }
- });
- //wait for completion
- try {
- putObjectResult.get();
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted object upload: {}", ie, ie);
- Thread.currentThread().interrupt();
- } catch (ExecutionException ee) {
- throw extractException("regular upload", key, ee);
- }
- }
-
-
- private class MultiPartUpload {
- private final String uploadId;
- private final List<ListenableFuture<PartETag>> partETagsFutures;
-
- public MultiPartUpload(String uploadId) {
- this.uploadId = uploadId;
- this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
- LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
- "id '{}'", bucket, key, uploadId);
- }
-
- private void uploadPartAsync(ByteArrayInputStream inputStream,
- int partSize) {
- final int currentPartNumber = partETagsFutures.size() + 1;
- final UploadPartRequest request =
- new UploadPartRequest().withBucketName(bucket).withKey(key)
- .withUploadId(uploadId).withInputStream(inputStream)
- .withPartNumber(currentPartNumber).withPartSize(partSize);
- request.setGeneralProgressListener(progressListener);
- ListenableFuture<PartETag> partETagFuture =
- executorService.submit(new Callable<PartETag>() {
- @Override
- public PartETag call() throws Exception {
- LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
- uploadId);
- return fs.uploadPart(request).getPartETag();
- }
- });
- partETagsFutures.add(partETagFuture);
- }
-
- private List<PartETag> waitForAllPartUploads() throws IOException {
- try {
- return Futures.allAsList(partETagsFutures).get();
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted partUpload: {}", ie, ie);
- Thread.currentThread().interrupt();
- return null;
- } catch (ExecutionException ee) {
- //there is no way of recovering so abort
- //cancel all partUploads
- for (ListenableFuture<PartETag> future : partETagsFutures) {
- future.cancel(true);
- }
- //abort multipartupload
- this.abort();
- throw extractException("Multi-part upload with id '" + uploadId + "'",
- key, ee);
- }
- }
-
- private void complete(List<PartETag> partETags) throws IOException {
- try {
- LOG.debug("Completing multi-part upload for key '{}', id '{}'",
- key, uploadId);
- client.completeMultipartUpload(
- new CompleteMultipartUploadRequest(bucket,
- key,
- uploadId,
- partETags));
- } catch (AmazonClientException e) {
- throw translateException("Completing multi-part upload", key, e);
- }
- }
-
- public void abort() {
- LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
- try {
- fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
- client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
- key, uploadId));
- } catch (Exception e2) {
- LOG.warn("Unable to abort multipart upload, you may need to purge " +
- "uploaded parts: {}", e2, e2);
- }
- }
- }
-
- private static class ProgressableListener implements ProgressListener {
- private final Progressable progress;
-
- public ProgressableListener(Progressable progress) {
- this.progress = progress;
- }
-
- public void progressChanged(ProgressEvent progressEvent) {
- if (progress != null) {
- progress.progress();
- }
- }
- }
-}
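For context on the class removed above: its write(byte[], int, int) path fills the in-memory buffer up to its limit, triggers an upload, and then recurses on the remaining bytes. The following is a minimal, illustrative sketch of that split-and-recurse behaviour only; the BufferSplitSketch class name and the flushBuffer() stand-in are inventions for this example, whereas the real stream hands the filled buffer to a multipart part upload.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * Illustrative only: mirrors the split-and-recurse write logic of the
 * removed S3AFastOutputStream. flushBuffer() stands in for the real
 * "upload the buffer as a part" step.
 */
public class BufferSplitSketch {
  private final int bufferLimit;
  private final ByteArrayOutputStream buffer;

  public BufferSplitSketch(int bufferLimit) {
    this.bufferLimit = bufferLimit;
    this.buffer = new ByteArrayOutputStream(bufferLimit);
  }

  public void write(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
      return;
    }
    if (buffer.size() + len < bufferLimit) {
      buffer.write(b, off, len);
    } else {
      // fill the buffer exactly to its limit, flush, then recurse on the rest
      int firstPart = bufferLimit - buffer.size();
      buffer.write(b, off, firstPart);
      flushBuffer();
      write(b, off + firstPart, len - firstPart);
    }
  }

  /** Stand-in for uploading the buffered block. */
  private void flushBuffer() {
    System.out.println("flushing " + buffer.size() + " bytes");
    buffer.reset();
  }
}
```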
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 85d1fc7e18..2354819b76 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -37,14 +37,20 @@
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
@@ -55,6 +61,8 @@
import com.amazonaws.event.ProgressListener;
import com.amazonaws.event.ProgressEvent;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,6 +76,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -118,9 +127,12 @@ public class S3AFileSystem extends FileSystem {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
- private ExecutorService threadPoolExecutor;
+ private ListeningExecutorService threadPoolExecutor;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
+ private static final Logger PROGRESS =
+ LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
+ private LocalDirAllocator directoryAllocator;
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private S3AInstrumentation instrumentation;
@@ -131,6 +143,10 @@ public class S3AFileSystem extends FileSystem {
// The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000;
+ private boolean blockUploadEnabled;
+ private String blockOutputBuffer;
+ private S3ADataBlocks.BlockFactory blockFactory;
+ private int blockOutputActiveBlocks;
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
@@ -157,18 +173,11 @@ public void initialize(URI name, Configuration conf) throws IOException {
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
listing = new Listing(this);
- partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
- if (partSize < 5 * 1024 * 1024) {
- LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
- partSize = 5 * 1024 * 1024;
- }
+ partSize = getMultipartSizeProperty(conf,
+ MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+ multiPartThreshold = getMultipartSizeProperty(conf,
+ MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
- multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
- DEFAULT_MIN_MULTIPART_THRESHOLD);
- if (multiPartThreshold < 5 * 1024 * 1024) {
- LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
- multiPartThreshold = 5 * 1024 * 1024;
- }
//check but do not store the block size
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
@@ -189,14 +198,14 @@ public StorageStatistics provide() {
LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
maxThreads = 2;
}
- int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
- if (totalTasks < 1) {
- LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
- totalTasks = 1;
- }
- long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
- threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
- maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+ int totalTasks = intOption(conf,
+ MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
+ long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
+ DEFAULT_KEEPALIVE_TIME, 0);
+ threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+ maxThreads,
+ maxThreads + totalTasks,
+ keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
initTransferManager();
@@ -209,8 +218,25 @@ public StorageStatistics provide() {
serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+ LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
+
+ blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD);
+
+ if (blockUploadEnabled) {
+ blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
+ DEFAULT_FAST_UPLOAD_BUFFER);
+ partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
+ blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
+ blockOutputActiveBlocks = intOption(conf,
+ FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
+ LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
+ " queue limit={}",
+ blockOutputBuffer, partSize, blockOutputActiveBlocks);
+ } else {
+ LOG.debug("Using S3AOutputStream");
+ }
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
@@ -336,6 +362,33 @@ public S3AInputPolicy getInputPolicy() {
return inputPolicy;
}
+ /**
+ * Demand create the directory allocator, then create a temporary file.
+ * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
+ * @param pathStr prefix for the temporary file
+ * @param size the size of the file that is going to be written
+ * @param conf the Configuration object
+ * @return a unique temporary file
+ * @throws IOException IO problems
+ */
+ synchronized File createTmpFileForWrite(String pathStr, long size,
+ Configuration conf) throws IOException {
+ if (directoryAllocator == null) {
+ String bufferDir = conf.get(BUFFER_DIR) != null
+ ? BUFFER_DIR : "hadoop.tmp.dir";
+ directoryAllocator = new LocalDirAllocator(bufferDir);
+ }
+ return directoryAllocator.createTmpFileForWrite(pathStr, size, conf);
+ }
+
+ /**
+ * Get the bucket of this filesystem.
+ * @return the bucket
+ */
+ public String getBucket() {
+ return bucket;
+ }
+
/**
* Change the input policy for this FS.
* @param inputPolicy new policy
@@ -460,6 +513,7 @@ public FSDataInputStream open(Path f, int bufferSize)
* @see #setPermission(Path, FsPermission)
*/
@Override
+ @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
@@ -484,28 +538,33 @@ public FSDataOutputStream create(Path f, FsPermission permission,
}
instrumentation.fileCreated();
- if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
- return new FSDataOutputStream(
- new S3AFastOutputStream(s3,
- this,
- bucket,
+ FSDataOutputStream output;
+ if (blockUploadEnabled) {
+ output = new FSDataOutputStream(
+ new S3ABlockOutputStream(this,
key,
+ new SemaphoredDelegatingExecutor(threadPoolExecutor,
+ blockOutputActiveBlocks, true),
progress,
- cannedACL,
partSize,
- multiPartThreshold,
- threadPoolExecutor),
- statistics);
+ blockFactory,
+ instrumentation.newOutputStreamStatistics(),
+ new WriteOperationHelper(key)
+ ),
+ null);
+ } else {
+
+ // We pass null to FSDataOutputStream so it won't count writes that
+ // are being buffered to a file
+ output = new FSDataOutputStream(
+ new S3AOutputStream(getConf(),
+ this,
+ key,
+ progress
+ ),
+ null);
}
- // We pass null to FSDataOutputStream so it won't count writes that
- // are being buffered to a file
- return new FSDataOutputStream(
- new S3AOutputStream(getConf(),
- this,
- key,
- progress
- ),
- null);
+ return output;
}
/**
@@ -749,6 +808,33 @@ protected void incrementStatistic(Statistic statistic, long count) {
storageStatistics.incrementCounter(statistic, count);
}
+ /**
+ * Decrement a gauge by a specific value.
+ * @param statistic The operation to decrement
+ * @param count the count to decrement
+ */
+ protected void decrementGauge(Statistic statistic, long count) {
+ instrumentation.decrementGauge(statistic, count);
+ }
+
+ /**
+ * Increment a gauge by a specific value.
+ * @param statistic The operation to increment
+ * @param count the count to increment
+ */
+ protected void incrementGauge(Statistic statistic, long count) {
+ instrumentation.incrementGauge(statistic, count);
+ }
+
+ /**
+ * Get the storage statistics of this filesystem.
+ * @return the storage statistics
+ */
+ @Override
+ public S3AStorageStatistics getStorageStatistics() {
+ return storageStatistics;
+ }
+
/**
* Request object metadata; increments counters in the process.
* @param key key
@@ -896,7 +982,9 @@ public ObjectMetadata newObjectMetadata() {
*/
public ObjectMetadata newObjectMetadata(long length) {
final ObjectMetadata om = newObjectMetadata();
- om.setContentLength(length);
+ if (length >= 0) {
+ om.setContentLength(length);
+ }
return om;
}
@@ -918,7 +1006,41 @@ public Upload putObject(PutObjectRequest putObjectRequest) {
len = putObjectRequest.getMetadata().getContentLength();
}
incrementPutStartStatistics(len);
- return transfers.upload(putObjectRequest);
+ try {
+ Upload upload = transfers.upload(putObjectRequest);
+ incrementPutCompletedStatistics(true, len);
+ return upload;
+ } catch (AmazonClientException e) {
+ incrementPutCompletedStatistics(false, len);
+ throw e;
+ }
+ }
+
+ /**
+ * PUT an object directly (i.e. not via the transfer manager).
+ * Byte length is calculated from the file length, or, if there is no
+ * file, from the content length of the header.
+ * @param putObjectRequest the request
+ * @return the result of the PUT operation
+ * @throws AmazonClientException on problems
+ */
+ public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+ throws AmazonClientException {
+ long len;
+ if (putObjectRequest.getFile() != null) {
+ len = putObjectRequest.getFile().length();
+ } else {
+ len = putObjectRequest.getMetadata().getContentLength();
+ }
+ incrementPutStartStatistics(len);
+ try {
+ PutObjectResult result = s3.putObject(putObjectRequest);
+ incrementPutCompletedStatistics(true, len);
+ return result;
+ } catch (AmazonClientException e) {
+ incrementPutCompletedStatistics(false, len);
+ throw e;
+ }
}
/**
@@ -926,10 +1048,20 @@ public Upload putObject(PutObjectRequest putObjectRequest) {
* Increments the write and put counters
* @param request request
* @return the result of the operation.
+ * @throws AmazonClientException on problems
*/
- public UploadPartResult uploadPart(UploadPartRequest request) {
- incrementPutStartStatistics(request.getPartSize());
- return s3.uploadPart(request);
+ public UploadPartResult uploadPart(UploadPartRequest request)
+ throws AmazonClientException {
+ long len = request.getPartSize();
+ incrementPutStartStatistics(len);
+ try {
+ UploadPartResult uploadPartResult = s3.uploadPart(request);
+ incrementPutCompletedStatistics(true, len);
+ return uploadPartResult;
+ } catch (AmazonClientException e) {
+ incrementPutCompletedStatistics(false, len);
+ throw e;
+ }
}
/**
@@ -942,9 +1074,28 @@ public void incrementPutStartStatistics(long bytes) {
LOG.debug("PUT start {} bytes", bytes);
incrementWriteOperations();
incrementStatistic(OBJECT_PUT_REQUESTS);
+ incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
+ if (bytes > 0) {
+ incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
+ }
+ }
+
+ /**
+ * At the end of a put/multipart upload operation, update the
+ * relevant counters and gauges.
+ *
+ * @param success did the operation succeed?
+ * @param bytes bytes in the request.
+ */
+ public void incrementPutCompletedStatistics(boolean success, long bytes) {
+ LOG.debug("PUT completed success={}; {} bytes", success, bytes);
+ incrementWriteOperations();
if (bytes > 0) {
incrementStatistic(OBJECT_PUT_BYTES, bytes);
+ decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
}
+ incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED);
+ decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
}
/**
@@ -955,7 +1106,7 @@ public void incrementPutStartStatistics(long bytes) {
* @param bytes bytes successfully uploaded.
*/
public void incrementPutProgressStatistics(String key, long bytes) {
- LOG.debug("PUT {}: {} bytes", key, bytes);
+ PROGRESS.debug("PUT {}: {} bytes", key, bytes);
incrementWriteOperations();
if (bytes > 0) {
statistics.incrementBytesWritten(bytes);
@@ -1475,7 +1626,7 @@ private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src);
- final ObjectMetadata om = newObjectMetadata();
+ final ObjectMetadata om = newObjectMetadata(srcfile.length());
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
Upload up = putObject(putObjectRequest);
ProgressableProgressListener listener = new ProgressableProgressListener(
@@ -1743,6 +1894,10 @@ public String toString() {
.append(serverSideEncryptionAlgorithm)
.append('\'');
}
+ if (blockFactory != null) {
+ sb.append(", blockFactory=").append(blockFactory);
+ }
+ sb.append(", executor=").append(threadPoolExecutor);
sb.append(", statistics {")
.append(statistics)
.append("}");
@@ -1950,4 +2105,163 @@ LocatedFileStatus toLocatedFileStatus(FileStatus status)
getFileBlockLocations(status, 0, status.getLen())
: null);
}
+
+ /**
+ * Helper for an ongoing write operation.
+ *
+ * It hides direct access to the S3 API from the output stream,
+ * and is a location where the object upload process can be evolved/enhanced.
+ *
+ * Features
+ *
+ * - Methods to create and submit requests to S3, so avoiding
+ * all direct interaction with the AWS APIs.
+ * - Some extra preflight checks of arguments, so failing fast on
+ * errors.
+ * - Callbacks to let the FS know of events in the output stream
+ * upload process.
+ *
+ *
+ * Each instance of this state is unique to a single output stream.
+ */
+ final class WriteOperationHelper {
+ private final String key;
+
+ private WriteOperationHelper(String key) {
+ this.key = key;
+ }
+
+ /**
+ * Create a {@link PutObjectRequest} request.
+ * The metadata is assumed to have been configured with the size of the
+ * operation.
+ * @param inputStream source data.
+ * @param length size, if known. Use -1 for not known
+ * @return the request
+ */
+ PutObjectRequest newPutRequest(InputStream inputStream, long length) {
+ return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
+ }
+
+ /**
+ * Callback on a successful write.
+ */
+ void writeSuccessful() {
+ finishedWrite(key);
+ }
+
+ /**
+ * Callback on a write failure.
+ * @param e Any exception raised which triggered the failure.
+ */
+ void writeFailed(Exception e) {
+ LOG.debug("Write to {} failed", this, e);
+ }
+
+ /**
+ * Create a new object metadata instance.
+ * Any standard metadata headers are added here, for example:
+ * encryption.
+ * @param length size, if known. Use -1 for not known
+ * @return a new metadata instance
+ */
+ public ObjectMetadata newObjectMetadata(long length) {
+ return S3AFileSystem.this.newObjectMetadata(length);
+ }
+
+ /**
+ * Start the multipart upload process.
+ * @return the upload result containing the ID
+ * @throws IOException IO problem
+ */
+ String initiateMultiPartUpload() throws IOException {
+ LOG.debug("Initiating Multipart upload");
+ final InitiateMultipartUploadRequest initiateMPURequest =
+ new InitiateMultipartUploadRequest(bucket,
+ key,
+ newObjectMetadata(-1));
+ initiateMPURequest.setCannedACL(cannedACL);
+ try {
+ return s3.initiateMultipartUpload(initiateMPURequest)
+ .getUploadId();
+ } catch (AmazonClientException ace) {
+ throw translateException("initiate MultiPartUpload", key, ace);
+ }
+ }
+
+ /**
+ * Complete a multipart upload operation.
+ * @param uploadId multipart operation Id
+ * @param partETags list of partial uploads
+ * @return the result
+ * @throws AmazonClientException on problems.
+ */
+ CompleteMultipartUploadResult completeMultipartUpload(String uploadId,
+ List<PartETag> partETags) throws AmazonClientException {
+ Preconditions.checkNotNull(uploadId);
+ Preconditions.checkNotNull(partETags);
+ Preconditions.checkArgument(!partETags.isEmpty(),
+ "No partitions have been uploaded");
+ return s3.completeMultipartUpload(
+ new CompleteMultipartUploadRequest(bucket,
+ key,
+ uploadId,
+ partETags));
+ }
+
+ /**
+ * Abort a multipart upload operation.
+ * @param uploadId multipart operation Id
+ * @throws AmazonClientException on problems.
+ */
+ void abortMultipartUpload(String uploadId) throws AmazonClientException {
+ s3.abortMultipartUpload(
+ new AbortMultipartUploadRequest(bucket, key, uploadId));
+ }
+
+ /**
+ * Create and initialize a part request of a multipart upload.
+ * @param uploadId ID of ongoing upload
+ * @param uploadStream source of data to upload
+ * @param partNumber current part number of the upload
+ * @param size amount of data
+ * @return the request.
+ */
+ UploadPartRequest newUploadPartRequest(String uploadId,
+ InputStream uploadStream,
+ int partNumber,
+ int size) {
+ Preconditions.checkNotNull(uploadId);
+ Preconditions.checkNotNull(uploadStream);
+ Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
+ Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000,
+ "partNumber must be between 1 and 10000 inclusive, but is %s",
+ partNumber);
+
+ LOG.debug("Creating part upload request for {} #{} size {}",
+ uploadId, partNumber, size);
+ return new UploadPartRequest()
+ .withBucketName(bucket)
+ .withKey(key)
+ .withUploadId(uploadId)
+ .withInputStream(uploadStream)
+ .withPartNumber(partNumber)
+ .withPartSize(size);
+ }
+
+ /**
+ * The toString method is intended to be used in logging/toString calls.
+ * @return a string description.
+ */
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "{bucket=").append(bucket);
+ sb.append(", key='").append(key).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
}
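The initialize() hunk above selects between the new S3ABlockOutputStream and the file-backed S3AOutputStream from the fast-upload flag, then reads the buffer type and the per-stream active-block limit. A hedged client-side sketch of that configuration, using the constants referenced in the patch; the bucket URI, path and class name are placeholders, not part of the change.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_FAST_UPLOAD_BUFFER;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;

public class BlockOutputConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // enable the block-by-block upload path; otherwise the file-backed
    // S3AOutputStream is used
    conf.setBoolean(FAST_UPLOAD, true);
    // buffer mechanism for queued blocks; the default buffers to disk
    conf.set(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER);
    // cap the number of blocks a single stream may have queued or uploading
    conf.setInt(FAST_UPLOAD_ACTIVE_BLOCKS, 4);

    // "s3a://example-bucket" is a placeholder URI
    try (FileSystem fs =
             FileSystem.newInstance(new URI("s3a://example-bucket/"), conf)) {
      fs.create(new Path("/tmp/block-output-demo")).close();
    }
  }
}
```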
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 26b5b51b6c..963c53facd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.fs.s3a;
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricStringBuilder;
@@ -29,10 +31,12 @@
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
+import java.io.Closeable;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -50,6 +54,9 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ S3AInstrumentation.class);
+
public static final String CONTEXT = "S3AFileSystem";
private final MetricsRegistry registry =
new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
@@ -100,7 +107,23 @@ public class S3AInstrumentation {
OBJECT_METADATA_REQUESTS,
OBJECT_MULTIPART_UPLOAD_ABORTED,
OBJECT_PUT_BYTES,
- OBJECT_PUT_REQUESTS
+ OBJECT_PUT_REQUESTS,
+ OBJECT_PUT_REQUESTS_COMPLETED,
+ STREAM_WRITE_FAILURES,
+ STREAM_WRITE_BLOCK_UPLOADS,
+ STREAM_WRITE_BLOCK_UPLOADS_COMMITTED,
+ STREAM_WRITE_BLOCK_UPLOADS_ABORTED,
+ STREAM_WRITE_TOTAL_TIME,
+ STREAM_WRITE_TOTAL_DATA,
+ };
+
+
+ private static final Statistic[] GAUGES_TO_CREATE = {
+ OBJECT_PUT_REQUESTS_ACTIVE,
+ OBJECT_PUT_BYTES_PENDING,
+ STREAM_WRITE_BLOCK_UPLOADS_ACTIVE,
+ STREAM_WRITE_BLOCK_UPLOADS_PENDING,
+ STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING,
};
public S3AInstrumentation(URI name) {
@@ -143,6 +166,9 @@ public S3AInstrumentation(URI name) {
for (Statistic statistic : COUNTERS_TO_CREATE) {
counter(statistic);
}
+ for (Statistic statistic : GAUGES_TO_CREATE) {
+ gauge(statistic.getSymbol(), statistic.getDescription());
+ }
}
/**
@@ -254,13 +280,13 @@ public long getCounterValue(String name) {
* Lookup a counter by name. Return null if it is not known.
* @param name counter name
* @return the counter
+ * @throws IllegalStateException if the metric is not a counter
*/
private MutableCounterLong lookupCounter(String name) {
MutableMetric metric = lookupMetric(name);
if (metric == null) {
return null;
}
- Preconditions.checkNotNull(metric, "not found: " + name);
if (!(metric instanceof MutableCounterLong)) {
throw new IllegalStateException("Metric " + name
+ " is not a MutableCounterLong: " + metric);
@@ -268,6 +294,20 @@ private MutableCounterLong lookupCounter(String name) {
return (MutableCounterLong) metric;
}
+ /**
+ * Look up a gauge.
+ * @param name gauge name
+ * @return the gauge or null
+ * @throws ClassCastException if the metric is not a Gauge.
+ */
+ public MutableGaugeLong lookupGauge(String name) {
+ MutableMetric metric = lookupMetric(name);
+ if (metric == null) {
+ LOG.debug("No gauge {}", name);
+ }
+ return (MutableGaugeLong) metric;
+ }
+
/**
* Look up a metric from both the registered set and the lighter weight
* stream entries.
@@ -349,6 +389,47 @@ public void incrementCounter(Statistic op, long count) {
counter.incr(count);
}
}
+ /**
+ * Increment a specific counter.
+ * No-op if not defined.
+ * @param op operation
+ * @param count atomic long containing value
+ */
+ public void incrementCounter(Statistic op, AtomicLong count) {
+ incrementCounter(op, count.get());
+ }
+
+ /**
+ * Increment a specific gauge.
+ * No-op if not defined.
+ * @param op operation
+ * @param count increment value
+ * @throws ClassCastException if the metric is of the wrong type
+ */
+ public void incrementGauge(Statistic op, long count) {
+ MutableGaugeLong gauge = lookupGauge(op.getSymbol());
+ if (gauge != null) {
+ gauge.incr(count);
+ } else {
+ LOG.debug("No Gauge: "+ op);
+ }
+ }
+
+ /**
+ * Decrement a specific gauge.
+ * No-op if not defined.
+ * @param op operation
+ * @param count increment value
+ * @throws ClassCastException if the metric is of the wrong type
+ */
+ public void decrementGauge(Statistic op, long count) {
+ MutableGaugeLong gauge = lookupGauge(op.getSymbol());
+ if (gauge != null) {
+ gauge.decr(count);
+ } else {
+ LOG.debug("No Gauge: " + op);
+ }
+ }
/**
* Create a stream input statistics instance.
@@ -553,4 +634,165 @@ public String toString() {
return sb.toString();
}
}
+
+ /**
+ * Create a stream output statistics instance.
+ * @return the new instance
+ */
+
+ OutputStreamStatistics newOutputStreamStatistics() {
+ return new OutputStreamStatistics();
+ }
+
+ /**
+ * Merge in the statistics of a single output stream into
+ * the filesystem-wide statistics.
+ * @param statistics stream statistics
+ */
+ private void mergeOutputStreamStatistics(OutputStreamStatistics statistics) {
+ incrementCounter(STREAM_WRITE_TOTAL_TIME, statistics.totalUploadDuration());
+ incrementCounter(STREAM_WRITE_QUEUE_DURATION, statistics.queueDuration);
+ incrementCounter(STREAM_WRITE_TOTAL_DATA, statistics.bytesUploaded);
+ incrementCounter(STREAM_WRITE_BLOCK_UPLOADS,
+ statistics.blockUploadsCompleted);
+ }
+
+ /**
+ * Statistics updated by an output stream during its actual operation.
+ * Some of these stats are relayed immediately; others, because a block
+ * upload spans multiple operations, are only merged back into the
+ * filesystem statistics when the stream is closed.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public final class OutputStreamStatistics implements Closeable {
+ private final AtomicLong blocksSubmitted = new AtomicLong(0);
+ private final AtomicLong blocksInQueue = new AtomicLong(0);
+ private final AtomicLong blocksActive = new AtomicLong(0);
+ private final AtomicLong blockUploadsCompleted = new AtomicLong(0);
+ private final AtomicLong blockUploadsFailed = new AtomicLong(0);
+ private final AtomicLong bytesPendingUpload = new AtomicLong(0);
+
+ private final AtomicLong bytesUploaded = new AtomicLong(0);
+ private final AtomicLong transferDuration = new AtomicLong(0);
+ private final AtomicLong queueDuration = new AtomicLong(0);
+ private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
+
+ /**
+ * Block is queued for upload.
+ */
+ void blockUploadQueued(int blockSize) {
+ blocksSubmitted.incrementAndGet();
+ blocksInQueue.incrementAndGet();
+ bytesPendingUpload.addAndGet(blockSize);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, blockSize);
+ }
+
+ /** Queued block has been scheduled for upload. */
+ void blockUploadStarted(long duration, int blockSize) {
+ queueDuration.addAndGet(duration);
+ blocksInQueue.decrementAndGet();
+ blocksActive.incrementAndGet();
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, -1);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, 1);
+ }
+
+ /** A block upload has completed. */
+ void blockUploadCompleted(long duration, int blockSize) {
+ this.transferDuration.addAndGet(duration);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
+ blocksActive.decrementAndGet();
+ blockUploadsCompleted.incrementAndGet();
+ }
+
+ /**
+ * A block upload has failed.
+ * A final transfer completed event is still expected, so this
+ * does not decrement the active block counter.
+ */
+ void blockUploadFailed(long duration, int blockSize) {
+ blockUploadsFailed.incrementAndGet();
+ }
+
+ /** Intermediate report of bytes uploaded. */
+ void bytesTransferred(long byteCount) {
+ bytesUploaded.addAndGet(byteCount);
+ bytesPendingUpload.addAndGet(-byteCount);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
+ }
+
+ /**
+ * Note an exception in a multipart complete.
+ */
+ void exceptionInMultipartComplete() {
+ exceptionsInMultipartFinalize.incrementAndGet();
+ }
+
+ /**
+ * Note an exception in a multipart abort.
+ */
+ void exceptionInMultipartAbort() {
+ exceptionsInMultipartFinalize.incrementAndGet();
+ }
+
+ /**
+ * Get the number of bytes pending upload.
+ * @return the number of bytes in the pending upload state.
+ */
+ public long getBytesPendingUpload() {
+ return bytesPendingUpload.get();
+ }
+
+ /**
+ * Output stream has closed.
+ * Trigger merge in of all statistics not updated during operation.
+ */
+ @Override
+ public void close() {
+ if (bytesPendingUpload.get() > 0) {
+ LOG.warn("Closing output stream statistics while data is still marked" +
+ " as pending upload in {}", this);
+ }
+ mergeOutputStreamStatistics(this);
+ }
+
+ long averageQueueTime() {
+ return blocksSubmitted.get() > 0 ?
+ (queueDuration.get() / blocksSubmitted.get()) : 0;
+ }
+
+ double effectiveBandwidth() {
+ double duration = totalUploadDuration() / 1000.0;
+ return duration > 0 ?
+ (bytesUploaded.get() / duration) : 0;
+ }
+
+ long totalUploadDuration() {
+ return queueDuration.get() + transferDuration.get();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "OutputStreamStatistics{");
+ sb.append("blocksSubmitted=").append(blocksSubmitted);
+ sb.append(", blocksInQueue=").append(blocksInQueue);
+ sb.append(", blocksActive=").append(blocksActive);
+ sb.append(", blockUploadsCompleted=").append(blockUploadsCompleted);
+ sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
+ sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
+ sb.append(", bytesUploaded=").append(bytesUploaded);
+ sb.append(", exceptionsInMultipartFinalize=").append(
+ exceptionsInMultipartFinalize);
+ sb.append(", transferDuration=").append(transferDuration).append(" ms");
+ sb.append(", queueDuration=").append(queueDuration).append(" ms");
+ sb.append(", averageQueueTime=").append(averageQueueTime()).append(" ms");
+ sb.append(", totalUploadDuration=").append(totalUploadDuration())
+ .append(" ms");
+ sb.append(", effectiveBandwidth=").append(effectiveBandwidth())
+ .append(" bytes/s");
+ sb.append('}');
+ return sb.toString();
+ }
+ }
}
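The OutputStreamStatistics callbacks above are intended to be driven by the block output stream as each block moves from queued to active to completed, with close() merging the totals into the filesystem-wide metrics. A sketch of that lifecycle, assuming same-package access (the callbacks are package-private); the class name, durations and block size are made-up values for illustration.

```java
package org.apache.hadoop.fs.s3a;

import java.net.URI;

/**
 * Illustrative lifecycle of the per-stream statistics: queue a block,
 * start it, report transferred bytes, complete it, then close the
 * statistics so totals are merged into the filesystem-wide metrics.
 */
public class OutputStreamStatsSketch {
  public static void main(String[] args) throws Exception {
    S3AInstrumentation instrumentation =
        new S3AInstrumentation(new URI("s3a://example-bucket/"));
    S3AInstrumentation.OutputStreamStatistics stats =
        instrumentation.newOutputStreamStatistics();

    int blockSize = 5 * 1024 * 1024;
    stats.blockUploadQueued(blockSize);            // block handed to the executor
    stats.blockUploadStarted(12, blockSize);       // 12 ms spent queued
    stats.bytesTransferred(blockSize);             // progress callback
    stats.blockUploadCompleted(250, blockSize);    // 250 ms transfer time

    System.out.println(stats);                     // human-readable summary
    stats.close();                                 // merge into FS statistics
  }
}
```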
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 23ba6828e7..6ebc9e496a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,8 +35,8 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/**
@@ -45,37 +45,27 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream extends OutputStream {
- private OutputStream backupStream;
- private File backupFile;
- private boolean closed;
- private String key;
- private Progressable progress;
- private long partSize;
- private long partSizeThreshold;
- private S3AFileSystem fs;
- private LocalDirAllocator lDirAlloc;
+ private final OutputStream backupStream;
+ private final File backupFile;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final String key;
+ private final Progressable progress;
+ private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;
public S3AOutputStream(Configuration conf,
- S3AFileSystem fs, String key, Progressable progress)
+ S3AFileSystem fs,
+ String key,
+ Progressable progress)
throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
- partSize = fs.getPartitionSize();
- partSizeThreshold = fs.getMultiPartThreshold();
- if (conf.get(BUFFER_DIR, null) != null) {
- lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
- } else {
- lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
- }
-
- backupFile = lDirAlloc.createTmpFileForWrite("output-",
+ backupFile = fs.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
- closed = false;
LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
key, backupFile);
@@ -84,25 +74,33 @@ public S3AOutputStream(Configuration conf,
new FileOutputStream(backupFile));
}
+ /**
+ * Check that the stream is still open.
+ * @throws IOException if the stream has been closed.
+ */
+ void checkOpen() throws IOException {
+ if (closed.get()) {
+ throw new IOException("Output Stream closed");
+ }
+ }
+
@Override
public void flush() throws IOException {
+ checkOpen();
backupStream.flush();
}
@Override
- public synchronized void close() throws IOException {
- if (closed) {
+ public void close() throws IOException {
+ if (closed.getAndSet(true)) {
return;
}
backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
- LOG.debug("Minimum upload part size: {} threshold {}" , partSize,
- partSizeThreshold);
-
try {
- final ObjectMetadata om = fs.newObjectMetadata();
+ final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
Upload upload = fs.putObject(
fs.newPutObjectRequest(
key,
@@ -126,18 +124,19 @@ public synchronized void close() throws IOException {
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
}
super.close();
- closed = true;
}
LOG.debug("OutputStream for key '{}' upload complete", key);
}
@Override
public void write(int b) throws IOException {
+ checkOpen();
backupStream.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
+ checkOpen();
backupStream.write(b, off, len);
}
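The rewritten stream above guards writes with checkOpen() and uses AtomicBoolean.getAndSet(true) so the upload in close() runs at most once, even if close() is called from several threads. A minimal sketch of just that guard pattern; the CloseOnceStream class is illustrative and not part of the patch.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative close-once guard, mirroring the pattern used above. */
public class CloseOnceStream extends OutputStream {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  private void checkOpen() throws IOException {
    if (closed.get()) {
      throw new IOException("Output Stream closed");
    }
  }

  @Override
  public void write(int b) throws IOException {
    checkOpen();
    // buffer the byte somewhere; omitted in this sketch
  }

  @Override
  public void close() throws IOException {
    // getAndSet returns the previous value: only the first caller proceeds
    if (closed.getAndSet(true)) {
      return;
    }
    // the upload/cleanup work would run exactly once here
  }
}
```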
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 93d819b72a..c89f6904cd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -49,6 +49,7 @@
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
/**
@@ -460,4 +461,42 @@ static long longOption(Configuration conf,
key, v, min));
return v;
}
+
+ /**
+ * Get a size property from the configuration: this property must
+ * be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
+ * If it is too small, it is rounded up to that minimum, and a warning
+ * printed.
+ * @param conf configuration
+ * @param property property name
+ * @param defVal default value
+ * @return the value, guaranteed to be above the minimum size
+ */
+ public static long getMultipartSizeProperty(Configuration conf,
+ String property, long defVal) {
+ long partSize = conf.getLong(property, defVal);
+ if (partSize < MULTIPART_MIN_SIZE) {
+ LOG.warn("{} must be at least 5 MB; configured value is {}",
+ property, partSize);
+ partSize = MULTIPART_MIN_SIZE;
+ }
+ return partSize;
+ }
+
+ /**
+ * Ensure that the long value is in the range of an integer.
+ * @param name property name for error messages
+ * @param size original size
+ * @return the size, guaranteed to be less than or equal to the max
+ * value of an integer.
+ */
+ public static int ensureOutputParameterInRange(String name, long size) {
+ if (size > Integer.MAX_VALUE) {
+ LOG.warn("s3a: {} capped to ~2.14GB" +
+ " (maximum allowed size with current output mechanism)", name);
+ return Integer.MAX_VALUE;
+ } else {
+ return (int)size;
+ }
+ }
}
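getMultipartSizeProperty() rounds undersized part/threshold settings up to the S3 minimum and warns, while ensureOutputParameterInRange() caps values that do not fit in an int for the block output path. A small usage sketch under those assumptions; the configured and oversized values below are arbitrary.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;

import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;

public class MultipartSizeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong(MULTIPART_SIZE, 1024L);      // deliberately below the 5 MB minimum

    // rounded up to the 5 MB minimum, with a warning logged
    long partSize = S3AUtils.getMultipartSizeProperty(
        conf, MULTIPART_SIZE, 64L * 1024 * 1024);
    System.out.println(partSize);             // the 5 MB minimum

    // sizes beyond Integer.MAX_VALUE are capped for the block output path
    int capped = S3AUtils.ensureOutputParameterInRange(
        MULTIPART_SIZE, 3L * 1024 * 1024 * 1024);
    System.out.println(capped);               // Integer.MAX_VALUE
  }
}
```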
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
new file mode 100644
index 0000000000..6b21912871
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ *
+ * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
+ * contains the thread pool logic, whereas this isolates the semaphore
+ * and submit logic for use with other thread pools and delegation models.
+ * In particular, it permits multiple per-stream executors to share a
+ * single per-FS-instance executor; the latter throttles the overall
+ * load on the FS, while the former limit the amount of load which
+ * a single output stream can generate.
+ *
+ * This is inspired by the blocking threadpool in the Apache S4 project.
+ */
+@SuppressWarnings("NullableProblems")
+@InterfaceAudience.Private
+class SemaphoredDelegatingExecutor extends
+ ForwardingListeningExecutorService {
+
+ private final Semaphore queueingPermits;
+ private final ListeningExecutorService executorDelegatee;
+ private final int permitCount;
+
+ /**
+ * Instantiate.
+ * @param executorDelegatee Executor to delegate to
+ * @param permitCount number of permits into the queue permitted
+ * @param fair should the semaphore be "fair"
+ */
+ SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
+ int permitCount,
+ boolean fair) {
+ this.permitCount = permitCount;
+ queueingPermits = new Semaphore(permitCount, fair);
+ this.executorDelegatee = executorDelegatee;
+ }
+
+ @Override
+ protected ListeningExecutorService delegate() {
+ return executorDelegatee;
+ }
+
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+ long timeout, TimeUnit unit) throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new CallableWithPermitRelease<>(task));
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task), result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task));
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ super.execute(new RunnableWithPermitRelease(command));
+ }
+
+ /**
+ * Get the number of permits available; guaranteed to be
+ * {@code 0 <= availablePermits <= size}.
+ * @return the number of permits available at the time of invocation.
+ */
+ public int getAvailablePermits() {
+ return queueingPermits.availablePermits();
+ }
+
+ /**
+ * Get the number of threads waiting to acquire a permit.
+ * @return snapshot of the length of the queue of blocked threads.
+ */
+ public int getWaitingCount() {
+ return queueingPermits.getQueueLength();
+ }
+
+ /**
+ * Total number of permits.
+ * @return the number of permits as set in the constructor
+ */
+ public int getPermitCount() {
+ return permitCount;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "SemaphoredDelegatingExecutor{");
+ sb.append("permitCount=").append(getPermitCount());
+ sb.append(", available=").append(getAvailablePermits());
+ sb.append(", waiting=").append(getWaitingCount());
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Releases a permit after the task is executed.
+ */
+ class RunnableWithPermitRelease implements Runnable {
+
+ private Runnable delegatee;
+
+ public RunnableWithPermitRelease(Runnable delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public void run() {
+ try {
+ delegatee.run();
+ } finally {
+ queueingPermits.release();
+ }
+
+ }
+ }
+
+ /**
+ * Releases a permit after the task is completed.
+ */
+ class CallableWithPermitRelease<T> implements Callable<T> {
+
+ private Callable<T> delegatee;
+
+ public CallableWithPermitRelease(Callable<T> delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public T call() throws Exception {
+ try {
+ return delegatee.call();
+ } finally {
+ queueingPermits.release();
+ }
+ }
+
+ }
+
+}
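As the class javadoc above describes, each output stream wraps the shared FS-wide pool in its own SemaphoredDelegatingExecutor so that a single stream cannot flood the pool. A hedged sketch of that wiring, with a plain fixed thread pool standing in for the S3A transfer executor; same-package access is assumed since the class is package-private, and the permit count and task bodies are illustrative.

```java
package org.apache.hadoop.fs.s3a;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative wiring: a shared pool is wrapped per "stream" so each
 * wrapper can have only a few tasks queued or running at a time.
 */
public class SemaphoredExecutorSketch {
  public static void main(String[] args) throws Exception {
    ListeningExecutorService shared =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

    // at most 2 tasks from this "stream" may be queued or running at once;
    // further submit() calls block until a permit is released
    SemaphoredDelegatingExecutor perStream =
        new SemaphoredDelegatingExecutor(shared, 2, true);

    for (int i = 0; i < 8; i++) {
      final int block = i;
      perStream.submit(() -> {
        Thread.sleep(100);               // stand-in for a block upload
        return block;
      });
    }
    System.out.println(perStream);       // permits available / threads waiting

    shared.shutdown();
    shared.awaitTermination(30, TimeUnit.SECONDS);
  }
}
```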
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index d84a355991..36ec50b723 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -81,10 +81,16 @@ public enum Statistic {
"Object multipart upload aborted"),
OBJECT_PUT_REQUESTS("object_put_requests",
"Object put/multipart upload count"),
+ OBJECT_PUT_REQUESTS_COMPLETED("object_put_requests_completed",
+ "Object put/multipart upload completed count"),
+ OBJECT_PUT_REQUESTS_ACTIVE("object_put_requests_active",
+ "Current number of active put requests"),
OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"),
+ OBJECT_PUT_BYTES_PENDING("object_put_bytes_pending",
+ "number of bytes queued for upload/being actively uploaded"),
STREAM_ABORTED("stream_aborted",
"Count of times the TCP stream was aborted"),
- STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_pperations",
+ STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_operations",
"Number of executed seek operations which went backwards in a stream"),
STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"),
STREAM_CLOSE_OPERATIONS("stream_close_operations",
@@ -112,7 +118,29 @@ public enum Statistic {
STREAM_CLOSE_BYTES_READ("stream_bytes_read_in_close",
"Count of bytes read when closing streams during seek operations."),
STREAM_ABORT_BYTES_DISCARDED("stream_bytes_discarded_in_abort",
- "Count of bytes discarded by aborting the stream");
+ "Count of bytes discarded by aborting the stream"),
+ STREAM_WRITE_FAILURES("stream_write_failures",
+ "Count of stream write failures reported"),
+ STREAM_WRITE_BLOCK_UPLOADS("stream_write_block_uploads",
+ "Count of block/partition uploads completed"),
+ STREAM_WRITE_BLOCK_UPLOADS_ACTIVE("stream_write_block_uploads_active",
+ "Count of block/partition uploads completed"),
+ STREAM_WRITE_BLOCK_UPLOADS_COMMITTED("stream_write_block_uploads_committed",
+ "Count of number of block uploads committed"),
+ STREAM_WRITE_BLOCK_UPLOADS_ABORTED("stream_write_block_uploads_aborted",
+ "Count of number of block uploads aborted"),
+
+ STREAM_WRITE_BLOCK_UPLOADS_PENDING("stream_write_block_uploads_pending",
+ "Gauge of block/partitions uploads queued to be written"),
+ STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING(
+ "stream_write_block_uploads_data_pending",
+ "Gauge of block/partitions data uploads queued to be written"),
+ STREAM_WRITE_TOTAL_TIME("stream_write_total_time",
+ "Count of total time taken for uploads to complete"),
+ STREAM_WRITE_TOTAL_DATA("stream_write_total_data",
+ "Count of total data uploaded in block output"),
+ STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration",
+ "Total queue duration of all block uploads");
private static final Map<String, Statistic> SYMBOL_MAP =
new HashMap<>(Statistic.values().length);
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index cf785d5cf6..c23e782d38 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1,3 +1,4 @@
+