From 266b1bd1bb93cc5100629e88a1276bba3c795c01 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Sat, 31 Jul 2021 03:04:11 +0800
Subject: [PATCH] HADOOP-17812. NPE in S3AInputStream read() after failure to
 reconnect to store (#3222)

This improves error handling after multiple failures reading data:
when the read fails and attempts to reconnect() also fail.

Contributed by Bobby Wang.
---
 .../apache/hadoop/fs/s3a/S3AInputStream.java | 29 ++++++++++++-------
 .../fs/s3a/TestS3AInputStreamRetry.java      | 19 ++++++++++++
 2 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index b65dcc9529..86317ab21c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -418,15 +418,21 @@ public synchronized int read() throws IOException {
     int byteRead = invoker.retry("read", pathStr, true,
         () -> {
           int b;
+          // If an exception was raised before wrappedStream was re-set in
+          // "reopen" (called on a read failure), wrappedStream will be null.
+          // The retry re-executes this block, so check for null to avoid an NPE.
+          if (wrappedStream == null) {
+            reopen("failure recovery", getPos(), 1, false);
+          }
           try {
             b = wrappedStream.read();
           } catch (EOFException e) {
             return -1;
           } catch (SocketTimeoutException e) {
-            onReadFailure(e, 1, true);
+            onReadFailure(e, true);
             throw e;
           } catch (IOException e) {
-            onReadFailure(e, 1, false);
+            onReadFailure(e, false);
             throw e;
           }
           return b;
@@ -444,15 +450,12 @@ public synchronized int read() throws IOException {
   }
 
   /**
-   * Handle an IOE on a read by attempting to re-open the stream.
+   * Close the stream on read failure.
    * The filesystem's readException count will be incremented.
    * @param ioe exception caught.
-   * @param length length of data being attempted to read
-   * @throws IOException any exception thrown on the re-open attempt.
    */
   @Retries.OnceTranslated
-  private void onReadFailure(IOException ioe, int length, boolean forceAbort)
-      throws IOException {
+  private void onReadFailure(IOException ioe, boolean forceAbort) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got exception while trying to read from stream {}, " +
           "client: {} object: {}, trying to recover: ",
@@ -463,7 +466,7 @@ private void onReadFailure(IOException ioe, int length, boolean forceAbort)
           uri, client, object);
     }
     streamStatistics.readException();
-    reopen("failure recovery", pos, length, forceAbort);
+    closeStream("failure recovery", contentRangeFinish, forceAbort);
   }
 
   /**
@@ -506,16 +509,22 @@ public synchronized int read(byte[] buf, int off, int len)
     int bytesRead = invoker.retry("read", pathStr, true,
         () -> {
           int bytes;
+          // If an exception was raised before wrappedStream was re-set in
+          // "reopen" (called on a read failure), wrappedStream will be null.
+          // The retry re-executes this block, so check for null to avoid an NPE.
+          if (wrappedStream == null) {
+            reopen("failure recovery", getPos(), 1, false);
+          }
           try {
             bytes = wrappedStream.read(buf, off, len);
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
           } catch (SocketTimeoutException e) {
-            onReadFailure(e, len, true);
+            onReadFailure(e, true);
             throw e;
           } catch (IOException e) {
-            onReadFailure(e, len, false);
+            onReadFailure(e, false);
             throw e;
           }
           return bytes;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
index b1c46e0eed..a1e56c3ce4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
@@ -23,6 +23,7 @@
 import java.net.SocketException;
 import java.nio.charset.Charset;
 
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
@@ -120,10 +121,28 @@ private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
     return new S3AInputStream.InputStreamCallbacks() {
 
       private final S3Object mockedS3Object = getMockedS3Object();
+      private Integer mockedS3ObjectIndex = 0;
 
       @Override
       public S3Object getObject(GetObjectRequest request) {
         // Set s3 client to return mocked s3object with defined read behavior.
+        mockedS3ObjectIndex++;
+        // open() -> lazySeek() -> reopen()
+        //        -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1)
+        // read() -> objectInputStreamBad1 throws exception
+        //        -> onReadFailure -> close wrappedStream
+        //        -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2)
+        //        -> getObjectContent(objectInputStreamBad2) -> objectInputStreamBad2
+        //        -> wrappedStream.read -> objectInputStreamBad2 throws exception
+        //        -> onReadFailure -> close wrappedStream
+        //        -> retry(2) -> wrappedStream==null -> reopen
+        //        -> getObject (mockedS3ObjectIndex=3) throws exception
+        //        -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4)
+        //        -> getObjectContent(objectInputStreamGood) -> objectInputStreamGood
+        //        -> wrappedStream.read
+        if (mockedS3ObjectIndex == 3) {
+          throw new SdkClientException("Failed to get S3Object");
+        }
         return mockedS3Object;
       }
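
A note on the resulting pattern, as a minimal standalone sketch rather than the Hadoop
implementation: the names below (RetryingReader, StreamOpener, MAX_ATTEMPTS) are
illustrative, and the retry policy is reduced to a bounded loop in place of S3A's Invoker.
It shows the shape of the fix: the failure path only closes the broken stream, and the
retried read block lazily reopens whenever it finds the stream null, so a reopen that
itself fails is simply retried instead of leaving a null stream to NPE on the next attempt.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class RetryingReader {

  /** Supplies a fresh stream positioned at a given offset (illustrative). */
  public interface StreamOpener {
    InputStream open(long offset) throws IOException;
  }

  private static final int MAX_ATTEMPTS = 3;

  private final StreamOpener opener;
  private InputStream wrapped;
  private long pos;

  public RetryingReader(StreamOpener opener) {
    this.opener = opener;
  }

  public synchronized int read() throws IOException {
    IOException lastFailure = null;
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        // Lazy reopen: a previous failure (or a reopen that itself failed)
        // leaves the stream null; reopening here is what prevents the NPE.
        if (wrapped == null) {
          wrapped = opener.open(pos);
        }
        int b = wrapped.read();
        if (b >= 0) {
          pos++;
        }
        return b;
      } catch (EOFException e) {
        return -1;  // end of object: not a failure
      } catch (IOException e) {
        // The failure handler only records and closes; it does not reopen,
        // so a second failure cannot leave it half-done.
        closeQuietly();
        lastFailure = e;
      }
    }
    throw lastFailure;
  }

  private void closeQuietly() {
    try {
      if (wrapped != null) {
        wrapped.close();
      }
    } catch (IOException ignored) {
      // best-effort close during failure recovery
    } finally {
      wrapped = null;
    }
  }
}

A reopen failure on one attempt leaves the stream null, so the next attempt simply
opens again; this mirrors the retry steps in the mocked-call trace in the test above.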