From 3d2193cd64843c8fa7f33240d723e991c92c3ea4 Mon Sep 17 00:00:00 2001 From: yzhangal Date: Fri, 18 Dec 2020 11:08:10 -0800 Subject: [PATCH] HADOOP-17338. Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2497) Yongjun Zhang --- .../apache/hadoop/fs/s3a/S3AInputStream.java | 57 ++++++++++++++----- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 542fe34e96..bd8adad035 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -87,6 +87,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, * set */ private volatile boolean closed; + /** + * wrappedStream is associated with an object (instance of S3Object). When + * the object is garbage collected, the associated wrappedStream will be + * closed. Keep a reference to this object to prevent the wrapperStream + * still in use from being closed unexpectedly due to garbage collection. + * See HADOOP-17338 for details. + */ + private S3Object object; private S3ObjectInputStream wrappedStream; private final S3AReadOpContext context; private final AmazonS3 client; @@ -202,7 +210,7 @@ private synchronized void reopen(String reason, long targetPos, long length, String text = String.format("%s %s at %d", operation, uri, targetPos); changeTracker.maybeApplyConstraint(request); - S3Object object = Invoker.once(text, uri, + object = Invoker.once(text, uri, () -> client.getObject(request)); changeTracker.processResponse(object, operation, @@ -430,9 +438,15 @@ public synchronized int read() throws IOException { @Retries.OnceTranslated private void onReadFailure(IOException ioe, int length, boolean forceAbort) throws IOException { - - LOG.info("Got exception while trying to read from stream {}" + - " trying to recover: " + ioe, uri); + if (LOG.isDebugEnabled()) { + LOG.debug("Got exception while trying to read from stream {}, " + + "client: {} object: {}, trying to recover: ", + uri, client, object, ioe); + } else { + LOG.info("Got exception while trying to read from stream {}, " + + "client: {} object: {}, trying to recover: " + ioe, + uri, client, object); + } streamStatistics.readException(); reopen("failure recovery", pos, length, forceAbort); } @@ -550,14 +564,19 @@ public synchronized void close() throws IOException { */ @Retries.OnceRaw private void closeStream(String reason, long length, boolean forceAbort) { - if (isObjectStreamOpen()) { + if (!isObjectStreamOpen()) { + // steam is already closed + return; + } - // if the amount of data remaining in the current request is greater - // than the readahead value: abort. - long remaining = remainingInCurrentRequest(); - LOG.debug("Closing stream {}: {}", reason, - forceAbort ? "abort" : "soft"); - boolean shouldAbort = forceAbort || remaining > readahead; + // if the amount of data remaining in the current request is greater + // than the readahead value: abort. + long remaining = remainingInCurrentRequest(); + LOG.debug("Closing stream {}: {}", reason, + forceAbort ? "abort" : "soft"); + boolean shouldAbort = forceAbort || remaining > readahead; + + try { if (!shouldAbort) { try { // clean close. This will read to the end of the stream, @@ -578,25 +597,33 @@ private void closeStream(String reason, long length, boolean forceAbort) { streamStatistics.streamClose(false, drained); } catch (Exception e) { // exception escalates to an abort - LOG.debug("When closing {} stream for {}", uri, reason, e); + LOG.debug("When closing {} stream for {}, will abort the stream", + uri, reason, e); shouldAbort = true; } } if (shouldAbort) { // Abort, rather than just close, the underlying stream. Otherwise, the // remaining object payload is read from S3 while closing the stream. - LOG.debug("Aborting stream"); - wrappedStream.abort(); + LOG.debug("Aborting stream {}", uri); + try { + wrappedStream.abort(); + } catch (Exception e) { + LOG.warn("When aborting {} stream after failing to close it for {}", + uri, reason, e); + } streamStatistics.streamClose(true, remaining); } LOG.debug("Stream {} {}: {}; remaining={} streamPos={}," + " nextReadPos={}," + - " request range {}-{} length={}", + " request range {}-{} length={}", uri, (shouldAbort ? "aborted" : "closed"), reason, remaining, pos, nextReadPos, contentRangeStart, contentRangeFinish, length); + } finally { wrappedStream = null; + object = null; } }