diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
index d749e9df02..3ab0022bb0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
@@ -27,6 +27,7 @@
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,12 +37,10 @@
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 
-import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
-import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
-
 /**
  * Encapsulates low level interactions with S3 object on AWS.
  */
@@ -79,7 +78,7 @@ public class S3ARemoteObject {
    * Maps a stream returned by openForRead() to the associated S3 object.
    * That allows us to close the object when closing the stream.
    */
-  private Map<InputStream, S3Object> s3Objects;
+  private final Map<InputStream, S3Object> s3Objects;
 
   /**
    * uri of the object being read.
    */
@@ -225,104 +224,27 @@ public InputStream openForRead(long offset, int size) throws IOException {
   void close(InputStream inputStream, int numRemainingBytes) {
     S3Object obj;
     synchronized (s3Objects) {
-      obj = s3Objects.get(inputStream);
+      obj = s3Objects.remove(inputStream);
       if (obj == null) {
         throw new IllegalArgumentException("inputStream not found");
       }
-      s3Objects.remove(inputStream);
     }
-
+    SDKStreamDrainer drainer = new SDKStreamDrainer(
+        uri,
+        obj,
+        (S3ObjectInputStream)inputStream,
+        false,
+        numRemainingBytes,
+        streamStatistics,
+        "close() operation");
     if (numRemainingBytes <= context.getAsyncDrainThreshold()) {
       // don't bother with async io.
-      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+      drainer.apply();
     } else {
       LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
-      // schedule an async drain/abort with references to the fields so they
-      // can be reused
-      client.submit(
-          () -> drain(false, "close() operation", numRemainingBytes, obj,
-              inputStream));
+      // schedule an async drain/abort
+      client.submit(drainer);
     }
   }
 
-  /**
-   * drain the stream. This method is intended to be
-   * used directly or asynchronously, and measures the
-   * duration of the operation in the stream statistics.
-   *
-   * @param shouldAbort force an abort; used if explicitly requested.
-   * @param reason reason for stream being closed; used in messages
-   * @param remaining remaining bytes
-   * @param requestObject http request object;
-   * @param inputStream stream to close.
-   * @return was the stream aborted?
-   */
-  private boolean drain(
-      final boolean shouldAbort,
-      final String reason,
-      final long remaining,
-      final S3Object requestObject,
-      final InputStream inputStream) {
-
-    try {
-      return invokeTrackingDuration(
-          streamStatistics.initiateInnerStreamClose(shouldAbort),
-          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining,
-              requestObject, inputStream));
-    } catch (IOException e) {
-      // this is only here because invokeTrackingDuration() has it in its
-      // signature
-      return shouldAbort;
-    }
-  }
-
-  /**
-   * Drain or abort the inner stream.
-   * Exceptions are swallowed.
-   * If a close() is attempted and fails, the operation escalates to
-   * an abort.
-   *
-   * @param shouldAbort force an abort; used if explicitly requested.
-   * @param reason reason for stream being closed; used in messages
-   * @param remaining remaining bytes
-   * @param requestObject http request object
-   * @param inputStream stream to close.
-   * @return was the stream aborted?
-   */
-  private boolean drainOrAbortHttpStream(
-      boolean shouldAbort,
-      final String reason,
-      final long remaining,
-      final S3Object requestObject,
-      final InputStream inputStream) {
-
-    if (!shouldAbort && remaining > 0) {
-      try {
-        long drained = 0;
-        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
-        while (true) {
-          final int count = inputStream.read(buffer);
-          if (count < 0) {
-            // no more data is left
-            break;
-          }
-          drained += count;
-        }
-        LOG.debug("Drained stream of {} bytes", drained);
-      } catch (Exception e) {
-        // exception escalates to an abort
-        LOG.debug("When closing {} stream for {}, will abort the stream", uri,
-            reason, e);
-        shouldAbort = true;
-      }
-    }
-    cleanupWithLogger(LOG, inputStream);
-    cleanupWithLogger(LOG, requestObject);
-    streamStatistics.streamClose(shouldAbort, remaining);
-
-    LOG.debug("Stream {} {}: {}; remaining={}", uri,
-        (shouldAbort ? "aborted" : "closed"), reason,
-        remaining);
-    return shouldAbort;
-  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index 0295c07e56..a03f181cb3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -45,6 +45,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
 import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
@@ -102,6 +103,7 @@ public Configuration createConfiguration() {
         INPUT_FADVISE,
         MAX_ERROR_RETRIES,
         MAXIMUM_CONNECTIONS,
+        PREFETCH_ENABLED_KEY,
         READAHEAD_RANGE,
         REQUEST_TIMEOUT,
         RETRY_LIMIT,
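For review context: below is a minimal, self-contained sketch of the drain-or-abort flow that the deleted drainOrAbortHttpStream() implemented inline and that the shared SDKStreamDrainer now owns. The class name and buffer size here are hypothetical, not the real SDKStreamDrainer code; only the control flow mirrors the removed method: read and discard the unread bytes so the pooled HTTP connection stays reusable, and escalate to an abort if draining fails.

```java
import java.io.IOException;
import java.io.InputStream;

/**
 * Hypothetical sketch of the drain-or-abort control flow removed above;
 * SDKStreamDrainer centralizes this logic so it can be shared across
 * the prefetching and classic input streams.
 */
final class DrainOrAbortSketch {

  /** Assumed value; the real code uses the DRAIN_BUFFER_SIZE constant. */
  private static final int DRAIN_BUFFER_SIZE = 16 * 1024;

  /**
   * Drain the stream so the HTTP connection can return to the pool,
   * or abort when explicitly requested or when draining fails.
   *
   * @return true if the stream was aborted rather than drained.
   */
  static boolean drainOrAbort(boolean shouldAbort,
      long remaining,
      InputStream inputStream) {
    if (!shouldAbort && remaining > 0) {
      try {
        long drained = 0;
        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
        int count;
        // read and discard everything left in the stream
        while ((count = inputStream.read(buffer)) >= 0) {
          drained += count;
        }
        // the original logs the drained total at debug level here
      } catch (IOException e) {
        // any failure while draining escalates to an abort
        shouldAbort = true;
      }
    }
    try {
      inputStream.close();
    } catch (IOException ignored) {
      // swallowed, matching the original cleanupWithLogger() behaviour
    }
    return shouldAbort;
  }
}
```

Note how close() in the patch builds one drainer object and reuses it for both paths: at or below context.getAsyncDrainThreshold() it runs inline via drainer.apply(), otherwise the same object is handed to client.submit(), so the synchronous and asynchronous cases share a single implementation.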