From 3fbf4cd5da13dde68b77e581ea2d4aa564c8c8b7 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Thu, 20 Oct 2016 12:33:58 -0700 Subject: [PATCH] HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran. --- .../AbstractContractRootDirectoryTest.java | 48 +- .../hadoop/fs/contract/ContractTestUtils.java | 6 +- .../apache/hadoop/test/LambdaTestUtils.java | 521 ++++++++++++++++++ .../hadoop/test/TestLambdaTestUtils.java | 395 +++++++++++++ .../fs/s3a/ITestS3AFailureHandling.java | 20 +- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 48 -- 6 files changed, 962 insertions(+), 76 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java index 0a8f464486..5fba4bfc27 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java @@ -27,12 +27,16 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.test.LambdaTestUtils; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats; import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren; import static org.apache.hadoop.fs.contract.ContractTestUtils.toList; import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk; @@ -45,6 +49,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class); + public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000; @Override public void setup() throws Exception { @@ -79,23 +84,34 @@ public void testRmEmptyRootDirNonRecursive() throws Throwable { // extra sanity checks here to avoid support calls about complete loss // of data skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); - Path root = new Path("/"); + final Path root = new Path("/"); assertIsDirectory(root); - // make sure it is clean - FileSystem fs = getFileSystem(); - deleteChildren(fs, root, true); - FileStatus[] children = listChildren(fs, root); - if (children.length > 0) { - StringBuilder error = new StringBuilder(); - error.append("Deletion of child entries failed, still have") - .append(children.length) - .append(System.lineSeparator()); - for (FileStatus child : children) { - error.append(" ").append(child.getPath()) - .append(System.lineSeparator()); - } - fail(error.toString()); - } + // make sure the directory is clean. This includes some retry logic + // to forgive blobstores whose listings can be out of sync with the file + // status; + final FileSystem fs = getFileSystem(); + final AtomicInteger iterations = new AtomicInteger(0); + final FileStatus[] originalChildren = listChildren(fs, root); + LambdaTestUtils.eventually( + OBJECTSTORE_RETRY_TIMEOUT, + new Callable() { + @Override + public Void call() throws Exception { + FileStatus[] deleted = deleteChildren(fs, root, true); + FileStatus[] children = listChildren(fs, root); + if (children.length > 0) { + fail(String.format( + "After %d attempts: listing after rm /* not empty" + + "\n%s\n%s\n%s", + iterations.incrementAndGet(), + dumpStats("final", children), + dumpStats("deleted", deleted), + dumpStats("original", originalChildren))); + } + return null; + } + }, + new LambdaTestUtils.ProportionalRetryInterval(50, 1000)); // then try to delete the empty one boolean deleted = fs.delete(root, false); LOG.info("rm / of empty dir result is {}", deleted); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 16bfb9a696..73c8f1ce09 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -400,18 +400,18 @@ public static void rejectRootOperation(Path path) throws IOException { * @param fileSystem filesystem * @param path path to delete * @param recursive flag to indicate child entry deletion should be recursive - * @return the number of child entries found and deleted (not including + * @return the immediate child entries found and deleted (not including * any recursive children of those entries) * @throws IOException problem in the deletion process. */ - public static int deleteChildren(FileSystem fileSystem, + public static FileStatus[] deleteChildren(FileSystem fileSystem, Path path, boolean recursive) throws IOException { FileStatus[] children = listChildren(fileSystem, path); for (FileStatus entry : children) { fileSystem.delete(entry.getPath(), recursive); } - return children.length; + return children; } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java new file mode 100644 index 0000000000..1fa5c3f20e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.test; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.util.Time; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; + +/** + * Class containing methods and associated classes to make the most of Lambda + * expressions in Hadoop tests. + * + * The code has been designed from the outset to be Java-8 friendly, but + * to still be usable in Java 7. + * + * The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)}, + * but also lifts concepts from Scalatest's {@code awaitResult} and + * its notion of pluggable retry logic (simple, backoff, maybe even things + * with jitter: test author gets to choose). + * The {@link #intercept(Class, Callable)} method is also all credit due + * Scalatest, though it's been extended to also support a string message + * check; useful when checking the contents of the exception. + */ +public final class LambdaTestUtils { + public static final Logger LOG = + LoggerFactory.getLogger(LambdaTestUtils.class); + + private LambdaTestUtils() { + } + + /** + * This is the string included in the assertion text in + * {@link #intercept(Class, Callable)} if + * the closure returned a null value. + */ + public static final String NULL_RESULT = "(null)"; + + /** + * Interface to implement for converting a timeout into some form + * of exception to raise. + */ + public interface TimeoutHandler { + + /** + * Create an exception (or throw one, if desired). + * @param timeoutMillis timeout which has arisen + * @param caught any exception which was caught; may be null + * @return an exception which will then be thrown + * @throws Exception if the handler wishes to raise an exception + * that way. + */ + Exception evaluate(int timeoutMillis, Exception caught) throws Exception; + } + + /** + * Wait for a condition to be met, with a retry policy returning the + * sleep time before the next attempt is made. If, at the end + * of the timeout period, the condition is still false (or failing with + * an exception), the timeout handler is invoked, passing in the timeout + * and any exception raised in the last invocation. The exception returned + * by this timeout handler is then rethrown. + *

+ * Example: Wait 30s for a condition to be met, with a sleep of 30s + * between each probe. + * If the operation is failing, then, after 30s, the timeout handler + * is called. This returns the exception passed in (if any), + * or generates a new one. + *

+   * await(
+   *   30 * 1000,
+   *   () -> { return 0 == filesystem.listFiles(new Path("/")).length); },
+   *   () -> 500),
+   *   (timeout, ex) -> ex != null ? ex : new TimeoutException("timeout"));
+   * 
+ * + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made. + * @param check predicate to evaluate + * @param retry retry escalation logic + * @param timeoutHandler handler invoked on timeout; + * the returned exception will be thrown + * @return the number of iterations before the condition was satisfied + * @throws Exception the exception returned by {@code timeoutHandler} on + * timeout + * @throws FailFastException immediately if the evaluated operation raises it + * @throws InterruptedException if interrupted. + */ + public static int await(int timeoutMillis, + Callable check, + Callable retry, + TimeoutHandler timeoutHandler) + throws Exception { + Preconditions.checkArgument(timeoutMillis >= 0, + "timeoutMillis must be >= 0"); + Preconditions.checkNotNull(timeoutHandler); + + long endTime = Time.now() + timeoutMillis; + Exception ex = null; + boolean running = true; + int iterations = 0; + while (running) { + iterations++; + try { + if (check.call()) { + return iterations; + } + // the probe failed but did not raise an exception. Reset any + // exception raised by a previous probe failure. + ex = null; + } catch (InterruptedException | FailFastException e) { + throw e; + } catch (Exception e) { + LOG.debug("eventually() iteration {}", iterations, e); + ex = e; + } + running = Time.now() < endTime; + if (running) { + int sleeptime = retry.call(); + if (sleeptime >= 0) { + Thread.sleep(sleeptime); + } else { + running = false; + } + } + } + // timeout + Exception evaluate = timeoutHandler.evaluate(timeoutMillis, ex); + if (evaluate == null) { + // bad timeout handler logic; fall back to GenerateTimeout so the + // underlying problem isn't lost. + LOG.error("timeout handler {} did not throw an exception ", + timeoutHandler); + evaluate = new GenerateTimeout().evaluate(timeoutMillis, ex); + } + throw evaluate; + } + + /** + * Simplified {@link #await(int, Callable, Callable, TimeoutHandler)} + * operation with a fixed interval + * and {@link GenerateTimeout} handler to generate a {@code TimeoutException}. + *

+ * Example: await for probe to succeed: + *

+   * await(
+   *   30 * 1000, 500,
+   *   () -> { return 0 == filesystem.listFiles(new Path("/")).length); });
+   * 
+ * + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made. + * @param intervalMillis interval in milliseconds between checks + * @param check predicate to evaluate + * @return the number of iterations before the condition was satisfied + * @throws Exception returned by {@code failure} on timeout + * @throws FailFastException immediately if the evaluated operation raises it + * @throws InterruptedException if interrupted. + */ + public static int await(int timeoutMillis, + int intervalMillis, + Callable check) throws Exception { + return await(timeoutMillis, check, + new FixedRetryInterval(intervalMillis), + new GenerateTimeout()); + } + + /** + * Repeatedly execute a closure until it returns a value rather than + * raise an exception. + * Exceptions are caught and, with one exception, + * trigger a sleep and retry. This is similar of ScalaTest's + * {@code eventually(timeout, closure)} operation, though that lacks + * the ability to fail fast if the inner closure has determined that + * a failure condition is non-recoverable. + *

+ * Example: spin until an the number of files in a filesystem is non-zero, + * returning the files found. + * The sleep interval backs off by 500 ms each iteration to a maximum of 5s. + *

+   * FileStatus[] files = eventually( 30 * 1000,
+   *   () -> {
+   *     FileStatus[] f = filesystem.listFiles(new Path("/"));
+   *     assertEquals(0, f.length);
+   *     return f;
+   *   },
+   *   new ProportionalRetryInterval(500, 5000));
+   * 
+ * This allows for a fast exit, yet reduces probe frequency over time. + * + * @param return type + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made before failing. + * @param eval expression to evaluate + * @param retry retry interval generator + * @return result of the first successful eval call + * @throws Exception the last exception thrown before timeout was triggered + * @throws FailFastException if raised -without any retry attempt. + * @throws InterruptedException if interrupted during the sleep operation. + */ + public static T eventually(int timeoutMillis, + Callable eval, + Callable retry) throws Exception { + Preconditions.checkArgument(timeoutMillis >= 0, + "timeoutMillis must be >= 0"); + long endTime = Time.now() + timeoutMillis; + Exception ex; + boolean running; + int sleeptime; + int iterations = 0; + do { + iterations++; + try { + return eval.call(); + } catch (InterruptedException | FailFastException e) { + // these two exceptions trigger an immediate exit + throw e; + } catch (Exception e) { + LOG.debug("evaluate() iteration {}", iterations, e); + ex = e; + } + running = Time.now() < endTime; + if (running && (sleeptime = retry.call()) >= 0) { + Thread.sleep(sleeptime); + } + } while (running); + // timeout. Throw the last exception raised + throw ex; + } + + /** + * Simplified {@link #eventually(int, Callable, Callable)} method + * with a fixed interval. + *

+ * Example: wait 30s until an assertion holds, sleeping 1s between each + * check. + *

+   * eventually( 30 * 1000, 1000,
+   *   () -> { assertEquals(0, filesystem.listFiles(new Path("/")).length); }
+   * );
+   * 
+ * + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made before failing. + * @param intervalMillis interval in milliseconds + * @param eval expression to evaluate + * @return result of the first successful invocation of {@code eval()} + * @throws Exception the last exception thrown before timeout was triggered + * @throws FailFastException if raised -without any retry attempt. + * @throws InterruptedException if interrupted during the sleep operation. + */ + public static T eventually(int timeoutMillis, + int intervalMillis, + Callable eval) throws Exception { + return eventually(timeoutMillis, eval, + new FixedRetryInterval(intervalMillis)); + } + + /** + * Intercept an exception; throw an {@code AssertionError} if one not raised. + * The caught exception is rethrown if it is of the wrong class or + * does not contain the text defined in {@code contained}. + *

+ * Example: expect deleting a nonexistent file to raise a + * {@code FileNotFoundException}. + *

+   * FileNotFoundException ioe = intercept(FileNotFoundException.class,
+   *   () -> {
+   *     filesystem.delete(new Path("/missing"), false);
+   *   });
+   * 
+ * + * @param clazz class of exception; the raised exception must be this class + * or a subclass. + * @param eval expression to eval + * @param return type of expression + * @param exception class + * @return the caught exception if it was of the expected type + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + * The error includes the {@code toString()} value of the result, if this + * can be determined. + */ + @SuppressWarnings("unchecked") + public static E intercept( + Class clazz, + Callable eval) + throws Exception { + try { + T result = eval.call(); + throw new AssertionError("Expected an exception, got " + + robustToString(result)); + } catch (Throwable e) { + if (clazz.isAssignableFrom(e.getClass())) { + return (E)e; + } + throw e; + } + } + + /** + * Intercept an exception; throw an {@code AssertionError} if one not raised. + * The caught exception is rethrown if it is of the wrong class or + * does not contain the text defined in {@code contained}. + *

+ * Example: expect deleting a nonexistent file to raise a + * {@code FileNotFoundException} with the {@code toString()} value + * containing the text {@code "missing"}. + *

+   * FileNotFoundException ioe = intercept(FileNotFoundException.class,
+   *   "missing",
+   *   () -> {
+   *     filesystem.delete(new Path("/missing"), false);
+   *   });
+   * 
+ * + * @param clazz class of exception; the raised exception must be this class + * or a subclass. + * @param contained string which must be in the {@code toString()} value + * of the exception + * @param eval expression to eval + * @param return type of expression + * @param exception class + * @return the caught exception if it was of the expected type and contents + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + * The error includes the {@code toString()} value of the result, if this + * can be determined. + * @see GenericTestUtils#assertExceptionContains(String, Throwable) + */ + public static E intercept( + Class clazz, + String contained, + Callable eval) + throws Exception { + E ex = intercept(clazz, eval); + GenericTestUtils.assertExceptionContains(contained, ex); + return ex; + } + + /** + * Robust string converter for exception messages; if the {@code toString()} + * method throws an exception then that exception is caught and logged, + * then a simple string of the classname logged. + * This stops a {@code toString()} failure hiding underlying problems. + * @param o object to stringify + * @return a string for exception messages + */ + private static String robustToString(Object o) { + if (o == null) { + return NULL_RESULT; + } else { + try { + return o.toString(); + } catch (Exception e) { + LOG.info("Exception calling toString()", e); + return o.getClass().toString(); + } + } + } + + /** + * Returns {@code TimeoutException} on a timeout. If + * there was a inner class passed in, includes it as the + * inner failure. + */ + public static class GenerateTimeout implements TimeoutHandler { + private final String message; + + public GenerateTimeout(String message) { + this.message = message; + } + + public GenerateTimeout() { + this("timeout"); + } + + /** + * Evaluate operation creates a new {@code TimeoutException}. + * @param timeoutMillis timeout in millis + * @param caught optional caught exception + * @return TimeoutException + */ + @Override + public Exception evaluate(int timeoutMillis, Exception caught) + throws Exception { + String s = String.format("%s: after %d millis", message, + timeoutMillis); + String caughtText = caught != null + ? ("; " + robustToString(caught)) : ""; + + return (TimeoutException) (new TimeoutException(s + caughtText) + .initCause(caught)); + } + } + + /** + * Retry at a fixed time period between calls. + */ + public static class FixedRetryInterval implements Callable { + private final int intervalMillis; + private int invocationCount = 0; + + public FixedRetryInterval(int intervalMillis) { + Preconditions.checkArgument(intervalMillis > 0); + this.intervalMillis = intervalMillis; + } + + @Override + public Integer call() throws Exception { + invocationCount++; + return intervalMillis; + } + + public int getInvocationCount() { + return invocationCount; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "FixedRetryInterval{"); + sb.append("interval=").append(intervalMillis); + sb.append(", invocationCount=").append(invocationCount); + sb.append('}'); + return sb.toString(); + } + } + + /** + * Gradually increase the sleep time by the initial interval, until + * the limit set by {@code maxIntervalMillis} is reached. + */ + public static class ProportionalRetryInterval implements Callable { + private final int intervalMillis; + private final int maxIntervalMillis; + private int current; + private int invocationCount = 0; + + public ProportionalRetryInterval(int intervalMillis, + int maxIntervalMillis) { + Preconditions.checkArgument(intervalMillis > 0); + Preconditions.checkArgument(maxIntervalMillis > 0); + this.intervalMillis = intervalMillis; + this.current = intervalMillis; + this.maxIntervalMillis = maxIntervalMillis; + } + + @Override + public Integer call() throws Exception { + invocationCount++; + int last = current; + if (last < maxIntervalMillis) { + current += intervalMillis; + } + return last; + } + + public int getInvocationCount() { + return invocationCount; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "ProportionalRetryInterval{"); + sb.append("interval=").append(intervalMillis); + sb.append(", current=").append(current); + sb.append(", limit=").append(maxIntervalMillis); + sb.append(", invocationCount=").append(invocationCount); + sb.append('}'); + return sb.toString(); + } + } + + /** + * An exception which triggers a fast exist from the + * {@link #eventually(int, Callable, Callable)} and + * {@link #await(int, Callable, Callable, TimeoutHandler)} loops. + */ + public static class FailFastException extends Exception { + + public FailFastException(String detailMessage) { + super(detailMessage); + } + + public FailFastException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Instantiate from a format string. + * @param format format string + * @param args arguments to format + * @return an instance with the message string constructed. + */ + public static FailFastException newInstance(String format, Object...args) { + return new FailFastException(String.format(format, args)); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java new file mode 100644 index 0000000000..d3d5cb4fde --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.test; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.test.LambdaTestUtils.*; +import static org.apache.hadoop.test.GenericTestUtils.*; + +/** + * Test the logic in {@link LambdaTestUtils}. + * This test suite includes Java 8 and Java 7 code; the Java 8 code exists + * to verify that the API is easily used with Lambda expressions. + */ +public class TestLambdaTestUtils extends Assert { + + public static final int INTERVAL = 10; + public static final int TIMEOUT = 50; + private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL); + // counter for lambda expressions to use + private int count; + + /** + * Always evaluates to true. + */ + public static final Callable ALWAYS_TRUE = + new Callable() { + @Override + public Boolean call() throws Exception { + return true; + } + }; + + /** + * Always evaluates to false. + */ + public static final Callable ALWAYS_FALSE = + new Callable() { + @Override + public Boolean call() throws Exception { + return false; + } + }; + + /** + * Text in the raised FNFE. + */ + public static final String MISSING = "not found"; + + /** + * A predicate that always throws a FileNotFoundException. + */ + public static final Callable ALWAYS_FNFE = + new Callable() { + @Override + public Boolean call() throws Exception { + throw new FileNotFoundException(MISSING); + } + }; + + /** + * reusable timeout handler. + */ + public static final GenerateTimeout + TIMEOUT_FAILURE_HANDLER = new GenerateTimeout(); + + /** + * Always evaluates to 3L. + */ + public static final Callable EVAL_3L = new Callable() { + @Override + public Long call() throws Exception { + return 3L; + } + }; + + /** + * Always raises a {@code FileNotFoundException}. + */ + public static final Callable EVAL_FNFE = new Callable() { + @Override + public Long call() throws Exception { + throw new FileNotFoundException(MISSING); + } + }; + + /** + * Assert the retry count is as expected. + * @param expected expected value + */ + protected void assertRetryCount(int expected) { + assertEquals(retry.toString(), expected, retry.getInvocationCount()); + } + + /** + * Assert the retry count is as expected. + * @param minCount minimum value + */ + protected void assertMinRetryCount(int minCount) { + assertTrue("retry count of " + retry + " is not >= " + minCount, + minCount <= retry.getInvocationCount()); + } + + @Test + public void testAwaitAlwaysTrue() throws Throwable { + await(TIMEOUT, + ALWAYS_TRUE, + new FixedRetryInterval(INTERVAL), + TIMEOUT_FAILURE_HANDLER); + } + + @Test + public void testAwaitAlwaysFalse() throws Throwable { + try { + await(TIMEOUT, + ALWAYS_FALSE, + retry, + TIMEOUT_FAILURE_HANDLER); + fail("should not have got here"); + } catch (TimeoutException e) { + assertTrue(retry.getInvocationCount() > 4); + } + } + + @Test + public void testAwaitLinearRetry() throws Throwable { + ProportionalRetryInterval linearRetry = + new ProportionalRetryInterval(INTERVAL * 2, TIMEOUT * 2); + try { + await(TIMEOUT, + ALWAYS_FALSE, + linearRetry, + TIMEOUT_FAILURE_HANDLER); + fail("should not have got here"); + } catch (TimeoutException e) { + assertEquals(linearRetry.toString(), + 2, linearRetry.getInvocationCount()); + } + } + + @Test + public void testAwaitFNFE() throws Throwable { + try { + await(TIMEOUT, + ALWAYS_FNFE, + retry, + TIMEOUT_FAILURE_HANDLER); + fail("should not have got here"); + } catch (TimeoutException e) { + // inner clause is included + assertTrue(retry.getInvocationCount() > 0); + assertTrue(e.getCause() instanceof FileNotFoundException); + assertExceptionContains(MISSING, e); + } + } + + @Test + public void testRetryInterval() throws Throwable { + ProportionalRetryInterval interval = + new ProportionalRetryInterval(200, 1000); + assertEquals(200, (int) interval.call()); + assertEquals(400, (int) interval.call()); + assertEquals(600, (int) interval.call()); + assertEquals(800, (int) interval.call()); + assertEquals(1000, (int) interval.call()); + assertEquals(1000, (int) interval.call()); + assertEquals(1000, (int) interval.call()); + } + + @Test + public void testInterceptSuccess() throws Throwable { + IOException ioe = intercept(IOException.class, ALWAYS_FNFE); + assertExceptionContains(MISSING, ioe); + } + + @Test + public void testInterceptContains() throws Throwable { + intercept(IOException.class, MISSING, ALWAYS_FNFE); + } + + @Test + public void testInterceptContainsWrongString() throws Throwable { + try { + FileNotFoundException e = + intercept(FileNotFoundException.class, "404", ALWAYS_FNFE); + assertNotNull(e); + throw e; + } catch (AssertionError expected) { + assertExceptionContains(MISSING, expected); + } + } + + @Test + public void testInterceptVoidCallable() throws Throwable { + intercept(AssertionError.class, + NULL_RESULT, + new Callable() { + @Override + public IOException call() throws Exception { + return intercept(IOException.class, + new Callable() { + @Override + public Void call() throws Exception { + return null; + } + }); + } + }); + } + + @Test + public void testEventually() throws Throwable { + long result = eventually(TIMEOUT, EVAL_3L, retry); + assertEquals(3, result); + assertEquals(0, retry.getInvocationCount()); + } + + @Test + public void testEventuallyFailuresRetry() throws Throwable { + try { + eventually(TIMEOUT, EVAL_FNFE, retry); + fail("should not have got here"); + } catch (IOException expected) { + // expected + assertMinRetryCount(1); + } + } + + /* + * Java 8 Examples go below this line. + */ + + @Test + public void testInterceptFailure() throws Throwable { + try { + IOException ioe = intercept(IOException.class, () -> "hello"); + assertNotNull(ioe); + throw ioe; + } catch (AssertionError expected) { + assertExceptionContains("hello", expected); + } + } + + @Test + public void testInterceptInterceptLambda() throws Throwable { + // here we use intercept() to test itself. + intercept(AssertionError.class, + MISSING, + () -> intercept(FileNotFoundException.class, "404", ALWAYS_FNFE)); + } + + @Test + public void testInterceptInterceptVoidResultLambda() throws Throwable { + // see what happens when a null is returned; type inference -> Void + intercept(AssertionError.class, + NULL_RESULT, + () -> intercept(IOException.class, () -> null)); + } + + @Test + public void testInterceptInterceptStringResultLambda() throws Throwable { + // see what happens when a string is returned; it should be used + // in the message + intercept(AssertionError.class, + "hello, world", + () -> intercept(IOException.class, + () -> "hello, world")); + } + + @Test + public void testAwaitNoTimeoutLambda() throws Throwable { + await(0, + () -> true, + retry, + (timeout, ex) -> ex != null ? ex : new Exception("timeout")); + assertRetryCount(0); + } + + @Test + public void testAwaitLambdaRepetitions() throws Throwable { + count = 0; + + // lambda expression which will succeed after exactly 4 probes + int reps = await(TIMEOUT, + () -> ++count == 4, + () -> 10, + (timeout, ex) -> ex != null ? ex : new Exception("timeout")); + assertEquals(4, reps); + } + + @Test + public void testInterceptAwaitLambdaException() throws Throwable { + count = 0; + IOException ioe = intercept(IOException.class, + () -> await( + TIMEOUT, + () -> { + throw new IOException("inner " + ++count); + }, + retry, + (timeout, ex) -> ex)); + assertRetryCount(count - 1); + // verify that the exception returned was the last one raised + assertExceptionContains(Integer.toString(count), ioe); + } + + @Test + public void testInterceptAwaitLambdaDiagnostics() throws Throwable { + intercept(IOException.class, "generated", + () -> await(5, + () -> false, + () -> -1, // force checks -1 timeout probes + (timeout, ex) -> new IOException("generated"))); + } + + @Test + public void testInterceptAwaitFailFastLambda() throws Throwable { + intercept(FailFastException.class, + () -> await(TIMEOUT, + () -> { + throw new FailFastException("ffe"); + }, + retry, + (timeout, ex) -> ex)); + assertRetryCount(0); + } + + @Test + public void testEventuallyOnceLambda() throws Throwable { + String result = eventually(0, () -> "hello", retry); + assertEquals("hello", result); + assertEquals(0, retry.getInvocationCount()); + } + + @Test + public void testEventuallyLambda() throws Throwable { + long result = eventually(TIMEOUT, () -> 3, retry); + assertEquals(3, result); + assertRetryCount(0); + } + + @Test + public void testInterceptEventuallyLambdaFailures() throws Throwable { + intercept(IOException.class, + "oops", + () -> eventually(TIMEOUT, + () -> { + throw new IOException("oops"); + }, + retry)); + assertMinRetryCount(1); + } + + @Test + public void testInterceptEventuallyambdaFailuresNegativeRetry() + throws Throwable { + intercept(FileNotFoundException.class, + () -> eventually(TIMEOUT, EVAL_FNFE, () -> -1)); + } + + @Test + public void testInterceptEventuallyLambdaFailFast() throws Throwable { + intercept(FailFastException.class, "oops", + () -> eventually( + TIMEOUT, + () -> { + throw new FailFastException("oops"); + }, + retry)); + assertRetryCount(0); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java index 06864881e3..e284ea7226 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java @@ -25,6 +25,8 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.apache.hadoop.test.LambdaTestUtils; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +36,6 @@ import java.util.concurrent.Callable; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; -import static org.apache.hadoop.fs.s3a.S3AUtils.*; /** * Test S3A Failure translation, including a functional test @@ -68,13 +68,15 @@ public void testReadFileChanged() throws Throwable { writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true); // here the file length is less. Probe the file to see if this is true, // with a spin and wait - eventually(30 *1000, new Callable() { - @Override - public Void call() throws Exception { - assertEquals(shortLen, fs.getFileStatus(testpath).getLen()); - return null; - } - }); + LambdaTestUtils.eventually(30 * 1000, 1000, + new Callable() { + @Override + public Void call() throws Exception { + assertEquals(shortLen, fs.getFileStatus(testpath).getLen()); + return null; + } + }); + // here length is shorter. Assuming it has propagated to all replicas, // the position of the input stream is now beyond the EOF. // An attempt to seek backwards to a position greater than the diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index c67e118e97..19dccac15b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.net.URI; -import java.util.concurrent.Callable; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; @@ -135,32 +134,6 @@ public static FileContext createTestFileContext(Configuration conf) return fc; } - /** - * Repeatedly attempt a callback until timeout or a {@link FailFastException} - * is raised. This is modeled on ScalaTests {@code eventually(Closure)} code. - * @param timeout timeout - * @param callback callback to invoke - * @throws FailFastException any fast-failure - * @throws Exception the exception which caused the iterator to fail - */ - public static void eventually(int timeout, Callable callback) - throws Exception { - Exception lastException; - long endtime = System.currentTimeMillis() + timeout; - do { - try { - callback.call(); - return; - } catch (InterruptedException | FailFastException e) { - throw e; - } catch (Exception e) { - lastException = e; - } - Thread.sleep(500); - } while (endtime > System.currentTimeMillis()); - throw lastException; - } - /** * patch the endpoint option so that irrespective of where other tests * are working, the IO performance tests can work with the landsat @@ -290,27 +263,6 @@ public static String getTestProperty(Configuration conf, ? propval : confVal; } - /** - * The exception to raise so as to exit fast from - * {@link #eventually(int, Callable)}. - */ - public static class FailFastException extends Exception { - public FailFastException() { - } - - public FailFastException(String message) { - super(message); - } - - public FailFastException(String message, Throwable cause) { - super(message, cause); - } - - public FailFastException(Throwable cause) { - super(cause); - } - } - /** * Verify the class of an exception. If it is not as expected, rethrow it. * Comparison is on the exact class, not subclass-of inference as