From 36198b5edf5b761b46fa5d1696ad6aa85b35b72a Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Tue, 16 Jan 2024 14:14:03 +0000
Subject: [PATCH] HADOOP-19027. S3A: S3AInputStream doesn't recover from
 HTTP/channel exceptions (#6425)

Differentiate "EOF out of range/end of GET" from "EOF channel problems"
through two different subclasses of EOFException, and have input streams
always retry on http channel errors; out-of-range GET requests are not
retried. (Currently an EOFException is always treated as a fail-fast
call in read().)

This allows all existing external code which catches EOFException to
handle both, while S3AInputStream cleanly differentiates range errors
(mapped to -1) from channel errors (retried).

- HttpChannelEOFException is a subclass of EOFException, so all code
  which catches EOFException is still happy.
  retry policy: connectivityFailure

- RangeNotSatisfiableEOFException is the subclass of EOFException
  raised on 416 GET range errors.
  retry policy: fail

- Method ErrorTranslation.maybeExtractChannelException() creates this
  from shaded/unshaded NoHttpResponseException, using string matching
  to avoid classpath problems.

- The same is done for SdkClientExceptions with OpenSSL error code
  WFOPENSSL0035; we believe this is the OpenSSL equivalent.

- ErrorTranslation.maybeExtractIOException() performs this translation
  as appropriate.

S3AInputStream.reopen() retries on EOF, except on
RangeNotSatisfiableEOFException, which is converted to a -1 response
to the caller, as has been done historically.

S3AInputStream knows to handle these with

read(): HttpChannelEOFException: stream-aborting close, then retry

lazySeek(): map RangeNotSatisfiableEOFException to -1, but do not map
any other EOFException class raised.

This means that
* out-of-range reads map to -1
* channel problems in reopen() are retried
* channel problems in read() abort the failed http connection so it
  isn't recycled

Tests for this use/abuse mocking. Testing is also done by actually
raising 416 exceptions and verifying that readFully(), char read() and
vector reads are all good.

There is no attempt to recover within a readFully(); there is a boolean
constant switch to turn this on, but if anyone does, a test will spin
forever, as the inner PositionedReadable.read(position, buffer, len)
downgrades all EOF exceptions to -1. A new method would need to be
added which controls whether to downgrade/rethrow exceptions.

What does that mean? Possibly reduced resilience to non-retried
failures on the inner stream, even though more channel exceptions are
retried.
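As an informal illustration, here is a condensed sketch of the new
read() path (condensed from the S3AInputStream changes below; all
names are those introduced or used by this patch):

    try {
      lazySeek(nextReadPos, 1);
    } catch (RangeNotSatisfiableEOFException e) {
      // 416: attempt to GET beyond the end of the object.
      // retry policy: fail, so downgrade to the historical -1 response.
      return -1;
    }
    try {
      b = wrappedStream.read();
    } catch (HttpChannelEOFException | SocketTimeoutException e) {
      // channel problem: abort the failed http connection so it is not
      // recycled, then rethrow for the invoker to retry
      // (retry policy: connectivityFailure).
      onReadFailure(e, true);
      throw e;
    }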
Contributed by Steve Loughran --- .../fs/s3a/HttpChannelEOFException.java | 42 +++ .../org/apache/hadoop/fs/s3a/Invoker.java | 2 +- .../s3a/RangeNotSatisfiableEOFException.java | 39 +++ .../apache/hadoop/fs/s3a/S3AInputStream.java | 83 ++++-- .../apache/hadoop/fs/s3a/S3ARetryPolicy.java | 13 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 22 +- .../fs/s3a/audit/AWSRequestAnalyzer.java | 7 +- .../auth/IAMInstanceCredentialsProvider.java | 3 +- .../hadoop/fs/s3a/impl/ErrorTranslation.java | 97 ++++++- .../s3a/ITestS3AContractVectoredRead.java | 57 +++- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 59 ++++ .../fs/s3a/TestS3AExceptionTranslation.java | 125 ++++++++- .../fs/s3a/TestS3AInputStreamRetry.java | 242 ++++++++++++---- .../fs/s3a/impl/TestErrorTranslation.java | 10 +- .../fs/s3a/performance/ITestS3AOpenCost.java | 262 +++++++++++++++--- 15 files changed, 914 insertions(+), 149 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java new file mode 100644 index 0000000000..665d485d7e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/HttpChannelEOFException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.EOFException; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Http channel exception; subclass of EOFException. + * In particular: + * - NoHttpResponseException + * - OpenSSL errors + * The http client library exceptions may be shaded/unshaded; this is the + * exception used in retry policies. 
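+ * Mapped to the connectivityFailure retry policy, so operations which
+ * raise it are retried.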
+ */ +@InterfaceAudience.Private +public class HttpChannelEOFException extends EOFException { + + public HttpChannelEOFException(final String path, + final String error, + final Throwable cause) { + super(error); + initCause(cause); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java index 9b2c95a90c..286e4e00a4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java @@ -478,7 +478,7 @@ public T retryUntranslated( if (caught instanceof IOException) { translated = (IOException) caught; } else { - translated = S3AUtils.translateException(text, "", + translated = S3AUtils.translateException(text, "/", (SdkException) caught); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java new file mode 100644 index 0000000000..4c6b9decb0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.EOFException; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Status code 416, range not satisfiable. + * Subclass of {@link EOFException} so that any code which expects that to + * be the outcome of a 416 failure will continue to work. + */ +@InterfaceAudience.Private +public class RangeNotSatisfiableEOFException extends EOFException { + + public RangeNotSatisfiableEOFException( + String operation, + Exception cause) { + super(operation); + initCause(cause); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 2ed9083efc..3d2ecc7737 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, public static final String OPERATION_OPEN = "open"; public static final String OPERATION_REOPEN = "re-open"; + /** + * Switch for behavior on when wrappedStream.read() + * returns -1 or raises an EOF; the original semantics + * are that the stream is kept open. + * Value {@value}. 
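+ * When true, a -1 read result closes the wrapped stream, so the next
+ * read performs a lazy re-open.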
+ */ + private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true; + /** * This is the maximum temporary buffer size we use while * populating the data in direct byte buffers during a vectored IO @@ -333,7 +341,7 @@ private void seekQuietly(long positiveTargetPos) { @Retries.OnceTranslated private void seekInStream(long targetPos, long length) throws IOException { checkNotClosed(); - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { return; } // compute how much more to skip @@ -406,22 +414,29 @@ public boolean seekToNewSource(long targetPos) throws IOException { /** * Perform lazy seek and adjust stream to correct position for reading. - * + * If an EOF Exception is raised there are two possibilities + *
<ol>
+ *   <li>the stream is at the end of the file</li>
+ *   <li>something went wrong with the network connection</li>
+ * </ol>
+ * This method does not attempt to distinguish; it assumes that an EOF + * exception is always "end of file". * @param targetPos position from where data should be read * @param len length of the content that needs to be read + * @throws RangeNotSatisfiableEOFException GET is out of range + * @throws IOException anything else. */ @Retries.RetryTranslated private void lazySeek(long targetPos, long len) throws IOException { Invoker invoker = context.getReadInvoker(); - invoker.maybeRetry(streamStatistics.getOpenOperations() == 0, - "lazySeek", pathStr, true, + invoker.retry("lazySeek to " + targetPos, pathStr, true, () -> { //For lazy seek seekInStream(targetPos, len); //re-open at specific location if needed - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { reopen("read from new offset", targetPos, len, false); } }); @@ -449,7 +464,9 @@ public synchronized int read() throws IOException { try { lazySeek(nextReadPos, 1); - } catch (EOFException e) { + } catch (RangeNotSatisfiableEOFException e) { + // attempt to GET beyond the end of the object + LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos); return -1; } @@ -460,14 +477,12 @@ public synchronized int read() throws IOException { // When exception happens before re-setting wrappedStream in "reopen" called // by onReadFailure, then wrappedStream will be null. But the **retry** may // re-execute this block and cause NPE if we don't check wrappedStream - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { reopen("failure recovery", getPos(), 1, false); } try { b = wrappedStream.read(); - } catch (EOFException e) { - return -1; - } catch (SocketTimeoutException e) { + } catch (HttpChannelEOFException | SocketTimeoutException e) { onReadFailure(e, true); throw e; } catch (IOException e) { @@ -480,10 +495,9 @@ public synchronized int read() throws IOException { if (byteRead >= 0) { pos++; nextReadPos++; - } - - if (byteRead >= 0) { incrementBytesRead(1); + } else { + streamReadResultNegative(); } return byteRead; } @@ -509,6 +523,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) { closeStream("failure recovery", forceAbort, false); } + /** + * the read() call returned -1. + * this means "the connection has gone past the end of the object" or + * the stream has broken for some reason. + * so close stream (without an abort). + */ + private void streamReadResultNegative() { + if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) { + closeStream("wrappedStream.read() returned -1", false, false); + } + } + /** * {@inheritDoc} * @@ -534,8 +560,8 @@ public synchronized int read(byte[] buf, int off, int len) try { lazySeek(nextReadPos, len); - } catch (EOFException e) { - // the end of the file has moved + } catch (RangeNotSatisfiableEOFException e) { + // attempt to GET beyond the end of the object return -1; } @@ -548,17 +574,19 @@ public synchronized int read(byte[] buf, int off, int len) // When exception happens before re-setting wrappedStream in "reopen" called // by onReadFailure, then wrappedStream will be null. But the **retry** may // re-execute this block and cause NPE if we don't check wrappedStream - if (wrappedStream == null) { + if (!isObjectStreamOpen()) { reopen("failure recovery", getPos(), 1, false); } try { + // read data; will block until there is data or the end of the stream is reached. + // returns 0 for "stream is open but no data yet" and -1 for "end of stream". 
bytes = wrappedStream.read(buf, off, len); - } catch (EOFException e) { - // the base implementation swallows EOFs. - return -1; - } catch (SocketTimeoutException e) { + } catch (HttpChannelEOFException | SocketTimeoutException e) { onReadFailure(e, true); throw e; + } catch (EOFException e) { + LOG.debug("EOFException raised by http stream read(); downgrading to a -1 response", e); + return -1; } catch (IOException e) { onReadFailure(e, false); throw e; @@ -569,8 +597,10 @@ public synchronized int read(byte[] buf, int off, int len) if (bytesRead > 0) { pos += bytesRead; nextReadPos += bytesRead; + incrementBytesRead(bytesRead); + } else { + streamReadResultNegative(); } - incrementBytesRead(bytesRead); streamStatistics.readOperationCompleted(len, bytesRead); return bytesRead; } @@ -818,6 +848,9 @@ public void readFully(long position, byte[] buffer, int offset, int length) while (nread < length) { int nbytes = read(buffer, offset + nread, length - nread); if (nbytes < 0) { + // no attempt is currently made to recover from stream read problems; + // a lazy seek to the offset is probably the solution. + // but it will need more qualification against failure handling throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); } nread += nbytes; @@ -987,7 +1020,7 @@ private void validateRangeRequest(FileRange range) throws EOFException { final String errMsg = String.format("Requested range [%d, %d) is beyond EOF for path %s", range.getOffset(), range.getLength(), pathStr); LOG.warn(errMsg); - throw new EOFException(errMsg); + throw new RangeNotSatisfiableEOFException(errMsg, null); } } @@ -1257,8 +1290,12 @@ public boolean hasCapability(String capability) { } } + /** + * Is the inner object stream open? + * @return true if there is an active HTTP request to S3. + */ @VisibleForTesting - boolean isObjectStreamOpen() { + public boolean isObjectStreamOpen() { return wrappedStream != null; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java index 9438ac22bd..faf105c8e2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -209,9 +209,15 @@ protected Map, RetryPolicy> createExceptionMap() { // in this map. policyMap.put(AWSClientIOException.class, retryAwsClientExceptions); + // Http Channel issues: treat as communication failure + policyMap.put(HttpChannelEOFException.class, connectivityFailure); + // server didn't respond. policyMap.put(AWSNoResponseException.class, retryIdempotentCalls); + // range header is out of scope of object; retrying won't help + policyMap.put(RangeNotSatisfiableEOFException.class, fail); + // should really be handled by resubmitting to new location; // that's beyond the scope of this retry policy policyMap.put(AWSRedirectException.class, fail); @@ -251,10 +257,7 @@ protected Map, RetryPolicy> createExceptionMap() { policyMap.put(ConnectException.class, connectivityFailure); // this can be a sign of an HTTP connection breaking early. - // which can be reacted to by another attempt if the request was idempotent. - // But: could also be a sign of trying to read past the EOF on a GET, - // which isn't going to be recovered from - policyMap.put(EOFException.class, retryIdempotentCalls); + policyMap.put(EOFException.class, connectivityFailure); // object not found. 
404 when not unknown bucket; 410 "gone" policyMap.put(FileNotFoundException.class, fail); @@ -300,7 +303,7 @@ public RetryAction shouldRetry(Exception exception, if (exception instanceof SdkException) { // update the sdk exception for the purpose of exception // processing. - ex = S3AUtils.translateException("", "", (SdkException) exception); + ex = S3AUtils.translateException("", "/", (SdkException) exception); } LOG.debug("Retry probe for {} with {} retries and {} failovers," + " idempotent={}, due to {}", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 6ef0cd8dc9..6a719739e7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -167,13 +167,20 @@ public static IOException translateException(String operation, */ @SuppressWarnings("ThrowableInstanceNeverThrown") public static IOException translateException(@Nullable String operation, - String path, + @Nullable String path, SdkException exception) { String message = String.format("%s%s: %s", operation, StringUtils.isNotEmpty(path)? (" on " + path) : "", exception); + if (path == null || path.isEmpty()) { + // handle null path by giving it a stub value. + // not ideal/informative, but ensures that the path is never null in + // exceptions constructed. + path = "/"; + } + if (!(exception instanceof AwsServiceException)) { // exceptions raised client-side: connectivity, auth, network problems... Exception innerCause = containsInterruptedException(exception); @@ -196,7 +203,7 @@ public static IOException translateException(@Nullable String operation, return ioe; } // network problems covered by an IOE inside the exception chain. - ioe = maybeExtractIOException(path, exception); + ioe = maybeExtractIOException(path, exception, message); if (ioe != null) { return ioe; } @@ -300,10 +307,13 @@ public static IOException translateException(@Nullable String operation, break; // out of range. This may happen if an object is overwritten with - // a shorter one while it is being read. + // a shorter one while it is being read or openFile() was invoked + // passing a FileStatus or file length less than that of the object. + // although the HTTP specification says that the response should + // include a range header specifying the actual range available, + // this isn't picked up here. case SC_416_RANGE_NOT_SATISFIABLE: - ioe = new EOFException(message); - ioe.initCause(ase); + ioe = new RangeNotSatisfiableEOFException(message, ase); break; // this has surfaced as a "no response from server" message. 
@@ -673,7 +683,7 @@ public static InstanceT getInstanceFromReflection(String className, if (targetException instanceof IOException) { throw (IOException) targetException; } else if (targetException instanceof SdkException) { - throw translateException("Instantiate " + className, "", (SdkException) targetException); + throw translateException("Instantiate " + className, "/", (SdkException) targetException); } else { // supported constructor or factory method found, but the call failed throw instantiationException(uri, className, configKey, targetException); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index 3cb8d97532..3df862055d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -294,6 +294,11 @@ private static long toSafeLong(final Number size) { private static final String BYTES_PREFIX = "bytes="; + /** + * Given a range header, determine the size of the request. + * @param rangeHeader header string + * @return parsed size or -1 for problems + */ private static Number sizeFromRangeHeader(String rangeHeader) { if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) { String[] values = rangeHeader @@ -302,7 +307,7 @@ private static Number sizeFromRangeHeader(String rangeHeader) { if (values.length == 2) { try { long start = Long.parseUnsignedLong(values[0]); - long end = Long.parseUnsignedLong(values[0]); + long end = Long.parseUnsignedLong(values[1]); return end - start; } catch(NumberFormatException e) { } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java index 080b79e7f2..b9a7c776b1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/IAMInstanceCredentialsProvider.java @@ -101,7 +101,8 @@ public AwsCredentials resolveCredentials() { // if the exception contains an IOE, extract it // so its type is the immediate cause of this new exception. Throwable t = e; - final IOException ioe = maybeExtractIOException("IAM endpoint", e); + final IOException ioe = maybeExtractIOException("IAM endpoint", e, + "resolveCredentials()"); if (ioe != null) { t = ioe; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java index f8a1f907bb..7934a5c7d4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java @@ -23,8 +23,11 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.s3a.HttpChannelEOFException; import org.apache.hadoop.fs.PathIOException; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND; /** @@ -42,6 +45,24 @@ */ public final class ErrorTranslation { + /** + * OpenSSL stream closed error: {@value}. 
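+ * Believed to be the OpenSSL equivalent of NoHttpResponseException;
+ * matched by string to avoid classpath problems.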
+ * See HADOOP-19027. + */ + public static final String OPENSSL_STREAM_CLOSED = "WFOPENSSL0035"; + + /** + * Classname of unshaded Http Client exception: {@value}. + */ + private static final String RAW_NO_HTTP_RESPONSE_EXCEPTION = + "org.apache.http.NoHttpResponseException"; + + /** + * Classname of shaded Http Client exception: {@value}. + */ + private static final String SHADED_NO_HTTP_RESPONSE_EXCEPTION = + "software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException"; + /** * Private constructor for utility class. */ @@ -71,25 +92,51 @@ public static boolean isObjectNotFound(AwsServiceException e) { return e.statusCode() == SC_404_NOT_FOUND && !isUnknownBucket(e); } + /** + * Tail recursive extraction of the innermost throwable. + * @param thrown next thrown in chain. + * @param outer outermost. + * @return the last non-null throwable in the chain. + */ + private static Throwable getInnermostThrowable(Throwable thrown, Throwable outer) { + if (thrown == null) { + return outer; + } + return getInnermostThrowable(thrown.getCause(), thrown); + } + /** * Translate an exception if it or its inner exception is an * IOException. - * If this condition is not met, null is returned. + * This also contains the logic to extract an AWS HTTP channel exception, + * which may or may not be an IOE, depending on the underlying SSL implementation + * in use. + * If an IOException cannot be extracted, null is returned. * @param path path of operation. * @param thrown exception + * @param message message generated by the caller. * @return a translated exception or null. */ - public static IOException maybeExtractIOException(String path, Throwable thrown) { + public static IOException maybeExtractIOException( + String path, + Throwable thrown, + String message) { if (thrown == null) { return null; } - // look inside - Throwable cause = thrown.getCause(); - while (cause != null && cause.getCause() != null) { - cause = cause.getCause(); + // walk down the chain of exceptions to find the innermost. + Throwable cause = getInnermostThrowable(thrown.getCause(), thrown); + + // see if this is an http channel exception + HttpChannelEOFException channelException = + maybeExtractChannelException(path, message, cause); + if (channelException != null) { + return channelException; } + + // not a channel exception, not an IOE. if (!(cause instanceof IOException)) { return null; } @@ -102,8 +149,7 @@ public static IOException maybeExtractIOException(String path, Throwable thrown) // unless no suitable constructor is available. final IOException ioe = (IOException) cause; - return wrapWithInnerIOE(path, thrown, ioe); - + return wrapWithInnerIOE(path, message, thrown, ioe); } /** @@ -116,6 +162,7 @@ public static IOException maybeExtractIOException(String path, Throwable thrown) * See {@code NetUtils}. * @param type of inner exception. * @param path path of the failure. + * @param message message generated by the caller. * @param outer outermost exception. * @param inner inner exception. * @return the new exception. @@ -123,9 +170,12 @@ public static IOException maybeExtractIOException(String path, Throwable thrown) @SuppressWarnings("unchecked") private static IOException wrapWithInnerIOE( String path, + String message, Throwable outer, T inner) { - String msg = outer.toString() + ": " + inner.getMessage(); + String msg = (isNotEmpty(message) ? 
(message + ":" + + " ") : "") + + outer.toString() + ": " + inner.getMessage(); Class clazz = inner.getClass(); try { Constructor ctor = clazz.getConstructor(String.class); @@ -136,6 +186,35 @@ private static IOException wrapWithInnerIOE( } } + /** + * Extract an AWS HTTP channel exception if the inner exception is considered + * an HttpClient {@code NoHttpResponseException} or an OpenSSL channel exception. + * This is based on string matching, which is inelegant and brittle. + * @param path path of the failure. + * @param message message generated by the caller. + * @param thrown inner exception. + * @return the new exception. + */ + @VisibleForTesting + public static HttpChannelEOFException maybeExtractChannelException( + String path, + String message, + Throwable thrown) { + final String classname = thrown.getClass().getName(); + if (thrown instanceof IOException + && (classname.equals(RAW_NO_HTTP_RESPONSE_EXCEPTION) + || classname.equals(SHADED_NO_HTTP_RESPONSE_EXCEPTION))) { + // shaded or unshaded http client exception class + return new HttpChannelEOFException(path, message, thrown); + } + // there's ambiguity about what exception class this is + // so rather than use its type, we look for an OpenSSL string in the message + if (thrown.getMessage().contains(OPENSSL_STREAM_CLOSED)) { + return new HttpChannelEOFException(path, message, thrown); + } + return null; + } + /** * AWS error codes explicitly recognized and processes specially; * kept in their own class for isolation. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 4c357e288c..9966393d41 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -21,9 +21,12 @@ import java.io.EOFException; import java.io.IOException; import java.io.InterruptedIOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.slf4j.Logger; @@ -36,7 +39,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -44,10 +49,13 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static 
org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { @@ -72,7 +80,54 @@ public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); - verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class); + verifyExceptionalVectoredRead(fs, fileRanges, RangeNotSatisfiableEOFException.class); + } + + /** + * Verify response to a vector read request which is beyond the + * real length of the file. + * Unlike the {@link #testEOFRanges()} test, the input stream in + * this test thinks the file is longer than it is, so the call + * fails in the GET request. + */ + @Test + public void testEOFRanges416Handling() throws Exception { + FileSystem fs = getFileSystem(); + + final int extendedLen = DATASET_LEN + 1024; + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen) + .build(); + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + + describe("Read starting from past EOF"); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, getAllocate()); + FileRange res = fileRanges.get(0); + CompletableFuture data = res.getData(); + interceptFuture(RangeNotSatisfiableEOFException.class, + "416", + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + data); + } + + describe("Read starting 0 continuing past EOF"); + try (FSDataInputStream in = fs.openFile(path(VECTORED_READ_FILE_NAME)) + .mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen) + .build().get()) { + final FileRange range = FileRange.createFileRange(0, extendedLen); + in.readVectored(Arrays.asList(range), getAllocate()); + CompletableFuture data = range.getData(); + interceptFuture(EOFException.class, + EOF_IN_READ_FULLY, + ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS, + data); + } + } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 0d4cf6a296..6dc3ca1102 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -72,6 +73,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.exception.SdkClientException; import java.io.Closeable; import java.io.File; @@ -456,6 +458,8 @@ public static E verifyExceptionClass(Class clazz, .describedAs("Exception expected of class %s", clazz) .isNotNull(); if (!(ex.getClass().equals(clazz))) { + LOG.warn("Rethrowing exception: {} as it is not an instance of {}", + ex, clazz, ex); throw ex; } return (E)ex; @@ -1711,4 +1715,59 @@ public static String etag(FileStatus status) { "Not an EtagSource: %s", status); return ((EtagSource) status).getEtag(); } + + /** + * Create an SDK 
client exception. + * @param message message + * @param cause nullable cause + * @return the exception + */ + public static SdkClientException sdkClientException( + String message, Throwable cause) { + return SdkClientException.builder() + .message(message) + .cause(cause) + .build(); + } + + /** + * Create an SDK client exception using the string value of the cause + * as the message. + * @param cause nullable cause + * @return the exception + */ + public static SdkClientException sdkClientException( + Throwable cause) { + return SdkClientException.builder() + .message(cause.toString()) + .cause(cause) + .build(); + } + + private static final String BYTES_PREFIX = "bytes="; + + /** + * Given a range header, split into start and end. + * Based on AWSRequestAnalyzer. + * @param rangeHeader header string + * @return parse range, or (-1, -1) for problems + */ + public static Pair requestRange(String rangeHeader) { + if (rangeHeader != null && rangeHeader.startsWith(BYTES_PREFIX)) { + String[] values = rangeHeader + .substring(BYTES_PREFIX.length()) + .split("-"); + if (values.length == 2) { + try { + long start = Long.parseUnsignedLong(values[0]); + long end = Long.parseUnsignedLong(values[1]); + return Pair.of(start, end); + } catch (NumberFormatException e) { + LOG.warn("Failed to parse range header {}", rangeHeader, e); + } + } + } + // error case + return Pair.of(-1L, -1L); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java index b26ca6889b..6b894a6813 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java @@ -20,9 +20,11 @@ import static org.apache.hadoop.fs.s3a.AWSCredentialProviderList.maybeTranslateCredentialException; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException; import static org.apache.hadoop.fs.s3a.S3ATestUtils.verifyExceptionClass; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.maybeTranslateAuditException; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractChannelException; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*; import static org.apache.hadoop.test.LambdaTestUtils.verifyCause; import static org.junit.Assert.*; @@ -36,11 +38,11 @@ import java.util.function.Consumer; import org.assertj.core.api.Assertions; +import org.junit.Before; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException; import software.amazon.awssdk.core.exception.ApiCallTimeoutException; -import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -53,15 +55,32 @@ import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException; import org.apache.hadoop.fs.s3a.impl.ErrorTranslation; import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.http.NoHttpResponseException; import static 
org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; /** - * Unit test suite covering translation of AWS SDK exceptions to S3A exceptions, + * Unit test suite covering translation of AWS/network exceptions to S3A exceptions, * and retry/recovery policies. */ @SuppressWarnings("ThrowableNotThrown") -public class TestS3AExceptionTranslation { +public class TestS3AExceptionTranslation extends AbstractHadoopTestBase { + + public static final String WFOPENSSL_0035_STREAM_IS_CLOSED = + "Unable to execute HTTP request: " + + ErrorTranslation.OPENSSL_STREAM_CLOSED + + " Stream is closed"; + + /** + * Retry policy to use in tests. + */ + private S3ARetryPolicy retryPolicy; + + @Before + public void setup() { + retryPolicy = new S3ARetryPolicy(new Configuration(false)); + } @Test public void test301ContainsRegion() throws Exception { @@ -91,10 +110,10 @@ protected void assertContained(String text, String contained) { text != null && text.contains(contained)); } - protected void verifyTranslated( + protected E verifyTranslated( int status, Class expected) throws Exception { - verifyTranslated(expected, createS3Exception(status)); + return verifyTranslated(expected, createS3Exception(status)); } @Test @@ -142,7 +161,12 @@ public void test410isNotFound() throws Exception { @Test public void test416isEOF() throws Exception { - verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE, EOFException.class); + + // 416 maps the the subclass of EOFException + final IOException ex = verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE, + RangeNotSatisfiableEOFException.class); + Assertions.assertThat(ex) + .isInstanceOf(EOFException.class); } @Test @@ -254,12 +278,6 @@ public void testExtractInterruptedIO() throws Throwable { .build())); } - private SdkClientException sdkClientException(String message, Throwable cause) { - return SdkClientException.builder() - .message(message) - .cause(cause) - .build(); - } @Test public void testTranslateCredentialException() throws Throwable { verifyExceptionClass(AccessDeniedException.class, @@ -375,10 +393,89 @@ public void testApiCallAttemptTimeoutExceptionToTimeout() throws Throwable { verifyCause(ApiCallAttemptTimeoutException.class, ex); // and confirm these timeouts are retried. - final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false)); + assertRetried(ex); + } + + @Test + public void testChannelExtraction() throws Throwable { + verifyExceptionClass(HttpChannelEOFException.class, + maybeExtractChannelException("", "/", + new NoHttpResponseException("no response"))); + } + + @Test + public void testShadedChannelExtraction() throws Throwable { + verifyExceptionClass(HttpChannelEOFException.class, + maybeExtractChannelException("", "/", + shadedNoHttpResponse())); + } + + @Test + public void testOpenSSLErrorChannelExtraction() throws Throwable { + verifyExceptionClass(HttpChannelEOFException.class, + maybeExtractChannelException("", "/", + sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null))); + } + + /** + * Test handling of the unshaded HTTP client exception. + */ + @Test + public void testRawNoHttpResponseExceptionRetry() throws Throwable { + assertRetried( + verifyExceptionClass(HttpChannelEOFException.class, + translateException("test", "/", + sdkClientException(new NoHttpResponseException("no response"))))); + } + + /** + * Test handling of the shaded HTTP client exception. 
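+ * The exception must be translated to HttpChannelEOFException and retried.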
+ */ + @Test + public void testShadedNoHttpResponseExceptionRetry() throws Throwable { + assertRetried( + verifyExceptionClass(HttpChannelEOFException.class, + translateException("test", "/", + sdkClientException(shadedNoHttpResponse())))); + } + + @Test + public void testOpenSSLErrorRetry() throws Throwable { + assertRetried( + verifyExceptionClass(HttpChannelEOFException.class, + translateException("test", "/", + sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null)))); + } + + /** + * Create a shaded NoHttpResponseException. + * @return an exception. + */ + private static Exception shadedNoHttpResponse() { + return new software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException("shaded"); + } + + /** + * Assert that an exception is retried. + * @param ex exception + * @throws Exception failure during retry policy evaluation. + */ + private void assertRetried(final Exception ex) throws Exception { + assertRetryOutcome(ex, RetryPolicy.RetryAction.RetryDecision.RETRY); + } + + /** + * Assert that the retry policy is as expected for a given exception. + * @param ex exception + * @param decision expected decision + * @throws Exception failure during retry policy evaluation. + */ + private void assertRetryOutcome( + final Exception ex, + final RetryPolicy.RetryAction.RetryDecision decision) throws Exception { Assertions.assertThat(retryPolicy.shouldRetry(ex, 0, 0, true).action) .describedAs("retry policy for exception %s", ex) - .isEqualTo(RetryPolicy.RetryAction.RetryDecision.RETRY); + .isEqualTo(decision); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java index da1284343d..6eccdc23dd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java @@ -24,7 +24,9 @@ import java.net.SocketException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.assertj.core.api.Assertions; import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.ResponseInputStream; @@ -34,41 +36,57 @@ import org.junit.Test; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import org.apache.hadoop.util.functional.CallableRaisingIOE; +import org.apache.http.NoHttpResponseException; - -import static java.lang.Math.min; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.requestRange; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.sdkClientException; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE; import static org.apache.hadoop.util.functional.FutureIO.eval; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; /** * Tests S3AInputStream retry behavior on read failure. + *
<p>
 * These tests are for validating expected behavior of retrying the
 * S3AInputStream read() and read(b, off, len), it tests that the read should
 * reopen the input stream and retry the read when IOException is thrown
 * during the read process.
+ * <p>
+ * This includes handling of out of range requests. */ public class TestS3AInputStreamRetry extends AbstractS3AMockTest { - private static final String INPUT = "ab"; + /** + * Test input stream content: charAt(x) == hex value of x. + */ + private static final String INPUT = "012345678ABCDEF"; + + /** + * Status code to raise by default. + */ + public static final int STATUS = 0; @Test public void testInputStreamReadRetryForException() throws IOException { - S3AInputStream s3AInputStream = getMockedS3AInputStream(); - assertEquals("'a' from the test input stream 'ab' should be the first " + + S3AInputStream s3AInputStream = getMockedS3AInputStream(failingInputStreamCallbacks( + awsServiceException(STATUS))); + assertEquals("'0' from the test input stream should be the first " + "character being read", INPUT.charAt(0), s3AInputStream.read()); - assertEquals("'b' from the test input stream 'ab' should be the second " + + assertEquals("'1' from the test input stream should be the second " + "character being read", INPUT.charAt(1), s3AInputStream.read()); } @Test public void testInputStreamReadLengthRetryForException() throws IOException { byte[] result = new byte[INPUT.length()]; - S3AInputStream s3AInputStream = getMockedS3AInputStream(); + S3AInputStream s3AInputStream = getMockedS3AInputStream( + failingInputStreamCallbacks(awsServiceException(STATUS))); s3AInputStream.read(result, 0, INPUT.length()); assertArrayEquals( @@ -79,7 +97,8 @@ public void testInputStreamReadLengthRetryForException() throws IOException { @Test public void testInputStreamReadFullyRetryForException() throws IOException { byte[] result = new byte[INPUT.length()]; - S3AInputStream s3AInputStream = getMockedS3AInputStream(); + S3AInputStream s3AInputStream = getMockedS3AInputStream(failingInputStreamCallbacks( + awsServiceException(STATUS))); s3AInputStream.readFully(0, result); assertArrayEquals( @@ -87,7 +106,65 @@ public void testInputStreamReadFullyRetryForException() throws IOException { INPUT.getBytes(), result); } - private S3AInputStream getMockedS3AInputStream() { + /** + * Seek and read repeatedly with every second GET failing with {@link NoHttpResponseException}. + * This should be effective in simulating {@code reopen()} failures caused by network problems. + */ + @Test + public void testReadMultipleSeeksNoHttpResponse() throws Throwable { + final RuntimeException ex = sdkClientException(new NoHttpResponseException("no response")); + // fail on even reads + S3AInputStream stream = getMockedS3AInputStream( + maybeFailInGetCallback(ex, (index) -> (index % 2 == 0))); + // 10 reads with repeated failures. + for (int i = 0; i < 10; i++) { + stream.seek(0); + final int r = stream.read(); + assertReadValueMatchesOffset(r, 0, "read attempt " + i + " of " + stream); + } + } + + /** + * Seek and read repeatedly with every second GET failing with {@link NoHttpResponseException}. + * This should be effective in simulating {@code reopen()} failures caused by network problems. + */ + @Test + public void testReadMultipleSeeksStreamClosed() throws Throwable { + final RuntimeException ex = sdkClientException(new NoHttpResponseException("no response")); + // fail on even reads + S3AInputStream stream = getMockedS3AInputStream( + maybeFailInGetCallback(ex, (index) -> (index % 2 == 0))); + // 10 reads with repeated failures. 
+ for (int i = 0; i < 10; i++) { + stream.seek(0); + final int r = stream.read(); + assertReadValueMatchesOffset(r, 0, "read attempt " + i + " of " + stream); + } + } + + /** + * Assert that the result of read() matches the char at the expected offset. + * @param r read result + * @param pos pos in stream + * @param text text for error string. + */ + private static void assertReadValueMatchesOffset( + final int r, final int pos, final String text) { + Assertions.assertThat(r) + .describedAs("read() at %d of %s", pos, text) + .isGreaterThan(-1); + Assertions.assertThat(Character.toString((char) r)) + .describedAs("read() at %d of %s", pos, text) + .isEqualTo(String.valueOf(INPUT.charAt(pos))); + } + + /** + * Create a mocked input stream for a given callback. + * @param streamCallback callback to use on GET calls + * @return a stream. + */ + private S3AInputStream getMockedS3AInputStream( + S3AInputStream.InputStreamCallbacks streamCallback) { Path path = new Path("test-path"); String eTag = "test-etag"; String versionId = "test-version-id"; @@ -113,55 +190,108 @@ private S3AInputStream getMockedS3AInputStream() { return new S3AInputStream( s3AReadOpContext, s3ObjectAttributes, - getMockedInputStreamCallback(), + streamCallback, s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(), null); } /** - * Get mocked InputStreamCallbacks where we return mocked S3Object. - * + * Create mocked InputStreamCallbacks which returns a mocked S3Object and fails on + * the third invocation. + * This is the original mock stream used in this test suite; the failure logic and stream + * selection has been factored out to support different failure modes. + * @param ex exception to raise on failure * @return mocked object. */ - private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() { + private S3AInputStream.InputStreamCallbacks failingInputStreamCallbacks( + final RuntimeException ex) { + GetObjectResponse objectResponse = GetObjectResponse.builder() .eTag("test-etag") .build(); - ResponseInputStream[] responseInputStreams = - new ResponseInputStream[] { - getMockedInputStream(objectResponse, true), - getMockedInputStream(objectResponse, true), - getMockedInputStream(objectResponse, false) - }; + final SSLException ioe = new SSLException(new SocketException("Connection reset")); + + // open() -> lazySeek() -> reopen() + // -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1) + // read() -> objectInputStreamBad1 throws exception + // -> onReadFailure -> close wrappedStream + // -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2) + // -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2 + // -> wrappedStream.read -> objectInputStreamBad2 throws exception + // -> onReadFailure -> close wrappedStream + // -> retry(2) -> wrappedStream==null -> reopen + // -> getObject (mockedS3ObjectIndex=3) throws exception + // -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4) + // -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood + // -> wrappedStream.read + + return mockInputStreamCallback(ex, + attempt -> 3 == attempt, + attempt -> mockedInputStream(objectResponse, attempt < 3, ioe)); + } + + /** + * Create mocked InputStreamCallbacks which returns a mocked S3Object and fails + * when the the predicate indicates that it should. + * The stream response itself does not fail. + * @param ex exception to raise on failure + * @return mocked object. 
+ */ + private S3AInputStream.InputStreamCallbacks maybeFailInGetCallback( + final RuntimeException ex, + final Function failurePredicate) { + GetObjectResponse objectResponse = GetObjectResponse.builder() + .eTag("test-etag") + .build(); + + return mockInputStreamCallback(ex, + failurePredicate, + attempt -> mockedInputStream(objectResponse, false, null)); + } + + /** + * Create mocked InputStreamCallbacks which returns a mocked S3Object. + * Raises the given runtime exception if the failure predicate returns true; + * the stream factory returns the input stream for the given attempt. + * @param ex exception to raise on failure + * @param failurePredicate predicate which, when true, triggers a failure on the given attempt. + * @param streamFactory factory for the stream to return on the given attempt. + * @return mocked object. + */ + private S3AInputStream.InputStreamCallbacks mockInputStreamCallback( + final RuntimeException ex, + final Function failurePredicate, + final Function> streamFactory) { + return new S3AInputStream.InputStreamCallbacks() { - private Integer mockedS3ObjectIndex = 0; + private int attempt = 0; @Override public ResponseInputStream getObject(GetObjectRequest request) { - // Set s3 client to return mocked s3object with defined read behavior. - mockedS3ObjectIndex++; - // open() -> lazySeek() -> reopen() - // -> getObject (mockedS3ObjectIndex=1) -> getObjectContent(objectInputStreamBad1) - // read() -> objectInputStreamBad1 throws exception - // -> onReadFailure -> close wrappedStream - // -> retry(1) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=2) - // -> getObjectContent(objectInputStreamBad2)-> objectInputStreamBad2 - // -> wrappedStream.read -> objectInputStreamBad2 throws exception - // -> onReadFailure -> close wrappedStream - // -> retry(2) -> wrappedStream==null -> reopen - // -> getObject (mockedS3ObjectIndex=3) throws exception - // -> retry(3) -> wrappedStream==null -> reopen -> getObject (mockedS3ObjectIndex=4) - // -> getObjectContent(objectInputStreamGood)-> objectInputStreamGood - // -> wrappedStream.read - if (mockedS3ObjectIndex == 3) { - throw AwsServiceException.builder() - .message("Failed to get S3Object") - .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build()) - .build(); + attempt++; + + if (failurePredicate.apply(attempt)) { + throw ex; } - return responseInputStreams[min(mockedS3ObjectIndex, responseInputStreams.length) - 1]; + final Pair r = requestRange(request.range()); + final int start = r.getLeft().intValue(); + final int end = r.getRight().intValue(); + if (start < 0 || end < 0 || start > end) { + // not satisfiable + throw awsServiceException(SC_416_RANGE_NOT_SATISFIABLE); + } + + final ResponseInputStream stream = streamFactory.apply(attempt); + + // skip the given number of bytes from the start of the array; no-op if 0. + try { + stream.skip(start); + } catch (IOException e) { + throw sdkClientException(e); + } + return stream; } @Override @@ -180,27 +310,41 @@ public void close() { }; } + /** + * Create an AwsServiceException with the given status code. + * + * @param status HTTP status code + * @return an exception. + */ + private static AwsServiceException awsServiceException(int status) { + return AwsServiceException.builder() + .message("Failed to get S3Object") + .statusCode(status) + .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build()) + .build(); + } + /** * Get mocked ResponseInputStream where we can trigger IOException to * simulate the read failure. 
* - * @param triggerFailure true when a failure injection is enabled. + * @param triggerFailure true when a failure injection is enabled in read() + * @param ioe exception to raise * @return mocked object. */ - private ResponseInputStream getMockedInputStream( - GetObjectResponse objectResponse, boolean triggerFailure) { + private ResponseInputStream mockedInputStream( + GetObjectResponse objectResponse, + boolean triggerFailure, + final IOException ioe) { FilterInputStream inputStream = new FilterInputStream(IOUtils.toInputStream(INPUT, StandardCharsets.UTF_8)) { - private final IOException exception = - new SSLException(new SocketException("Connection reset")); - @Override public int read() throws IOException { int result = super.read(); if (triggerFailure) { - throw exception; + throw ioe; } return result; } @@ -209,7 +353,7 @@ public int read() throws IOException { public int read(byte[] b, int off, int len) throws IOException { int result = super.read(b, off, len); if (triggerFailure) { - throw exception; + throw ioe; } return result; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java index 0f0b2c0c34..3a4994897a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java @@ -67,7 +67,7 @@ public void testUnknownHostExceptionExtraction() throws Throwable { new UnknownHostException("bottom"))); final IOException ioe = intercept(UnknownHostException.class, "top", () -> { - throw maybeExtractIOException("", thrown); + throw maybeExtractIOException("", thrown, ""); }); // the wrapped exception is the top level one: no stack traces have @@ -85,7 +85,7 @@ public void testNoRouteToHostExceptionExtraction() throws Throwable { throw maybeExtractIOException("p2", sdkException("top", sdkException("middle", - new NoRouteToHostException("bottom")))); + new NoRouteToHostException("bottom"))), null); }); } @@ -96,7 +96,7 @@ public void testConnectExceptionExtraction() throws Throwable { throw maybeExtractIOException("p1", sdkException("top", sdkException("middle", - new ConnectException("bottom")))); + new ConnectException("bottom"))), null); }); } @@ -113,7 +113,7 @@ public void testUncheckedIOExceptionExtraction() throws Throwable { new UncheckedIOException( new SocketTimeoutException("bottom")))); throw maybeExtractIOException("p1", - new NoAwsCredentialsException("IamProvider", thrown.toString(), thrown)); + new NoAwsCredentialsException("IamProvider", thrown.toString(), thrown), null); }); } @@ -124,7 +124,7 @@ public void testNoConstructorExtraction() throws Throwable { throw maybeExtractIOException("p1", sdkException("top", sdkException("middle", - new NoConstructorIOE()))); + new NoConstructorIOE())), null); }); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 4aae84dca8..361c376cff 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -20,19 +20,29 @@ import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
+import static org.apache.hadoop.fs.FSExceptionMessages.EOF_IN_READ_FULLY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
@@ -47,6 +57,7 @@
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 
 /**
  * Cost of openFile().
@@ -56,11 +67,13 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestS3AOpenCost.class);
 
+  public static final String TEXT = "0123456789ABCDEF";
+
   private Path testFile;
 
   private FileStatus testFileStatus;
 
-  private long fileLength;
+  private int fileLength;
 
   public ITestS3AOpenCost() {
     super(true);
@@ -76,9 +89,9 @@ public void setup() throws Exception {
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
 
-    writeTextFile(fs, testFile, "openfile", true);
+    writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
-    fileLength = testFileStatus.getLen();
+    fileLength = (int) testFileStatus.getLen();
   }
 
   /**
@@ -137,15 +150,8 @@ public void testOpenFileShorterLength() throws Throwable {
     int offset = 2;
     long shortLen = fileLength - offset;
     // open the file
-    FSDataInputStream in2 = verifyMetrics(() ->
-        fs.openFile(testFile)
-            .must(FS_OPTION_OPENFILE_READ_POLICY,
-                FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
-            .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
-            .build()
-            .get(),
-        always(NO_HEAD_OR_LIST),
-        with(STREAM_READ_OPENED, 0));
+    FSDataInputStream in2 = openFile(shortLen,
+        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
 
     // verify that the statistics are in range
     IOStatistics ioStatistics = extractStatistics(in2);
@@ -171,39 +177,227 @@
   }
 
   @Test
-  public void testOpenFileLongerLength() throws Throwable {
-    // do a second read with the length declared as longer
+  public void testOpenFileLongerLengthReadFully() throws Throwable {
+    // do a read with the length declared as longer
     // than it is.
     // An EOF will be raised on readFully(); a read() will return -1
-    S3AFileSystem fs = getFileSystem();
-    // set a length past the actual file length
-    long longLen = fileLength + 10;
-    FSDataInputStream in3 = verifyMetrics(() ->
-        fs.openFile(testFile)
-            .must(FS_OPTION_OPENFILE_READ_POLICY,
-                FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
-            .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
-            .build()
-            .get(),
-        always(NO_HEAD_OR_LIST));
+    final int extra = 10;
+    long longLen = fileLength + extra;
+
     // assert behaviors of seeking/reading past the file length.
     // there is no attempt at recovery.
     verifyMetrics(() -> {
-      byte[] out = new byte[(int) longLen];
-      intercept(EOFException.class,
-          () -> in3.readFully(0, out));
-      in3.seek(longLen - 1);
-      assertEquals("read past real EOF on " + in3,
-          -1, in3.read());
-      in3.close();
-      return in3.toString();
+      try (FSDataInputStream in = openFile(longLen,
+          FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+        byte[] out = new byte[(int) longLen];
+        intercept(EOFException.class, () -> in.readFully(0, out));
+        in.seek(longLen - 1);
+        assertEquals("read past real EOF on " + in, -1, in.read());
+        return in.toString();
+      }
     },
         // two GET calls were made, one for readFully,
         // the second on the read() past the EOF
        // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 2));
+        with(STREAM_READ_OPENED, 1 + 1));
+
+    // now on a new stream, try a full read from after the EOF
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+        byte[] out = new byte[extra];
+        intercept(EOFException.class, () -> in.readFully(fileLength, out));
+        return in.toString();
+      }
+    },
+        // a single GET call is made: the readFully fails at the true EOF,
+        // with no second opening of the stream
+        with(STREAM_READ_OPENED, 1));
+  }
+
+  /**
+   * Open the test file, declaring the given length and read policy.
+   * As the length is declared, no HEAD or LIST request is issued.
+   * @param longLen length to declare
+   * @param policy read policy
+   * @return file handle
+   */
+  private FSDataInputStream openFile(final long longLen, String policy)
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    return verifyMetrics(() ->
+        fs.openFile(testFile)
+            .must(FS_OPTION_OPENFILE_READ_POLICY, policy)
+            .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
+            .build()
+            .get(),
+        always(NO_HEAD_OR_LIST));
+  }
+
+  /**
+   * Open a file with a length declared as longer than the actual file length.
+   * Validate the input stream's read() semantics.
+   */
+  @Test
+  public void testReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    try (FSDataInputStream in = openFile(longLen,
+        FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+      for (int i = 0; i < fileLength; i++) {
+        Assertions.assertThat(in.read())
+            .describedAs("read() at %d", i)
+            .isEqualTo(TEXT.charAt(i));
+      }
+    }
+
+    // now open and read after the EOF; this is
+    // expected to return -1 on each read; there's a GET per call.
+    // as the counters are updated on close(), the stream must be closed
+    // within the verification clause.
+    // note how there's no attempt to alter the file's expected length...
+    // instead the call always goes to S3.
+    // there's no information in the exception from the SDK
+    describe("reading past the end of the file");
+
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        for (int i = 0; i < extra; i++) {
+          final int p = fileLength + i;
+          in.seek(p);
+          Assertions.assertThat(in.read())
+              .describedAs("read() at %d", p)
+              .isEqualTo(-1);
+        }
+        return in.toString();
+      }
+    },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+  }
+
+  /**
+   * Test {@code PositionedReadable.readFully()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadFullyPastEOF() throws Throwable {
+    // the next corner case: a readFully() of more bytes than the file length.
+    // we expect failure.
+    // this codepath does a GET to the end of the (expected) file length, and when
+    // that GET returns fewer bytes than expected, the inner read returns -1,
+    // so the readFully call fails.
+    describe("PositionedReadable.readFully() past the end of the file");
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        byte[] buf = new byte[longLen + 1];
+        // readFully will fail
+        intercept(EOFException.class, () -> {
+          in.readFully(0, buf);
+          return in;
+        });
+        assertS3StreamClosed(in);
+        return "readFully past EOF";
+      }
+    },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+  }
+
+  /**
+   * Test {@code PositionedReadable.read()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+
+    describe("PositionedReadable.read() past the end of the file");
+
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        byte[] buf = new byte[longLen + 1];
+
+        // read() will return the bytes up to the real end of the file
+        Assertions.assertThat(in.read(0, buf, 0, buf.length))
+            .isEqualTo(fileLength);
+        assertS3StreamOpen(in);
+
+        // now attempt to read after EOF
+        Assertions.assertThat(in.read(fileLength, buf, 0, buf.length))
+            .describedAs("PositionedReadable.read() past EOF")
+            .isEqualTo(-1);
+        // stream is closed as part of this failure
+        assertS3StreamClosed(in);
+
+        return "PositionedReadable.read() past EOF";
+      }
+    },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+  }
+
+  /**
+   * Test vectored read past EOF in a file.
+   * See related tests in {@code ITestS3AContractVectoredRead}.
+   */
+  @Test
+  public void testVectorReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+
+    describe("Vector read past the end of the file");
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        assertS3StreamClosed(in);
+        byte[] buf = new byte[longLen];
+        ByteBuffer bb = ByteBuffer.wrap(buf);
+        final FileRange range = FileRange.createFileRange(0, longLen);
+        in.readVectored(Arrays.asList(range), (i) -> bb);
+        interceptFuture(EOFException.class,
+            EOF_IN_READ_FULLY,
+            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            TimeUnit.SECONDS,
+            range.getData());
+        assertS3StreamClosed(in);
+        return "vector read past EOF";
+      }
+    },
+        with(Statistic.ACTION_HTTP_GET_REQUEST, 1));
+  }
+
+  /**
+   * Assert that the inner S3 stream is closed.
+   * @param in input stream
+   */
+  private static void assertS3StreamClosed(final FSDataInputStream in) {
+    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
+    Assertions.assertThat(s3ain.isObjectStreamOpen())
+        .describedAs("stream is open")
+        .isFalse();
+  }
+
+  /**
+   * Assert that the inner S3 stream is open.
+   * @param in input stream
+   */
+  private static void assertS3StreamOpen(final FSDataInputStream in) {
+    S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream();
+    Assertions.assertThat(s3ain.isObjectStreamOpen())
+        .describedAs("stream is closed")
+        .isTrue();
+  }
 }
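
Illustrative sketch (not part of the patch): the caller-visible semantics these tests verify, assuming a file is opened with a declared length longer than the object's actual length. The identifiers `fs`, `path` and `actualLength` are hypothetical placeholders; the builder options are those used in the tests above.

    // hypothetical usage sketch of the read-past-EOF semantics
    FSDataInputStream in = fs.openFile(path)
        .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)
        .mustLong(FS_OPTION_OPENFILE_LENGTH, actualLength + 10)  // declare 10 bytes too many
        .build()
        .get();

    in.seek(actualLength);
    int b = in.read();   // the out-of-range GET (HTTP 416) maps to -1; it is not retried

    byte[] buf = new byte[10];
    try {
      in.readFully(actualLength, buf);   // cannot satisfy the range
    } catch (EOFException expected) {
      // readFully() surfaces the shortfall as an EOFException rather than -1
    }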