diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java new file mode 100644 index 0000000000..1d534a369d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A class that can be used to schedule an asynchronous check on a given + * {@link Checkable}. If the check is successfully scheduled then a + * {@link ListenableFuture} is returned. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AsyncChecker { + + /** + * Schedule an asynchronous check for the given object. + * + * @param target object to be checked. + * + * @param context the interpretation of the context depends on the + * target. + * + * @return returns a {@link ListenableFuture} that can be used to + * retrieve the result of the asynchronous check. + */ + ListenableFuture schedule(Checkable target, K context); + + /** + * Cancel all executing checks and wait for them to complete. + * First attempts a graceful cancellation, then cancels forcefully. + * Waits for the supplied timeout after both attempts. + * + * See {@link ExecutorService#awaitTermination} for a description of + * the parameters. + * + * @throws InterruptedException + */ + void shutdownAndWait(long timeout, TimeUnit timeUnit) + throws InterruptedException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java new file mode 100644 index 0000000000..833ebda15f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + + +/** + * A Checkable is an object whose health can be probed by invoking its + * {@link #check} method. + * + * e.g. a {@link Checkable} instance may represent a single hardware + * resource. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface Checkable { + + /** + * Query the health of this object. This method may hang + * indefinitely depending on the status of the target resource. + * + * @param context for the probe operation. May be null depending + * on the implementation. + * + * @return result of the check operation. + * + * @throws Exception encountered during the check operation. An + * exception indicates that the check failed. + */ + V check(K context) throws Exception; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java new file mode 100644 index 0000000000..d0ee3d2c0b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.Timer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * An implementation of {@link AsyncChecker} that skips checking recently + * checked objects. It will enforce at least {@link minMsBetweenChecks} + * milliseconds between two successive checks of any one object. + * + * It is assumed that the total number of Checkable objects in the system + * is small, (not more than a few dozen) since the checker uses O(Checkables) + * storage and also potentially O(Checkables) threads. + * + * {@link minMsBetweenChecks} should be configured reasonably + * by the caller to avoid spinning up too many threads frequently. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ThrottledAsyncChecker implements AsyncChecker { + public static final Logger LOG = + LoggerFactory.getLogger(ThrottledAsyncChecker.class); + + private final Timer timer; + + /** + * The ExecutorService used to schedule asynchronous checks. + */ + private final ListeningExecutorService executorService; + + /** + * The minimum gap in milliseconds between two successive checks + * of the same object. This is the throttle. + */ + private final long minMsBetweenChecks; + + /** + * Map of checks that are currently in progress. Protected by the object + * lock. + */ + private final Map> checksInProgress; + + /** + * Maps Checkable objects to a future that can be used to retrieve + * the results of the operation. + * Protected by the object lock. + */ + private final Map> completedChecks; + + ThrottledAsyncChecker(final Timer timer, + final long minMsBetweenChecks, + final ExecutorService executorService) { + this.timer = timer; + this.minMsBetweenChecks = minMsBetweenChecks; + this.executorService = MoreExecutors.listeningDecorator(executorService); + this.checksInProgress = new HashMap<>(); + this.completedChecks = new WeakHashMap<>(); + } + + /** + * See {@link AsyncChecker#schedule} + * + * If the object has been checked recently then the check will + * be skipped. Multiple concurrent checks for the same object + * will receive the same Future. + */ + @Override + public synchronized ListenableFuture schedule( + final Checkable target, + final K context) { + LOG.debug("Scheduling a check of {}", target); + + if (checksInProgress.containsKey(target)) { + return checksInProgress.get(target); + } + + if (completedChecks.containsKey(target)) { + final LastCheckResult result = completedChecks.get(target); + final long msSinceLastCheck = timer.monotonicNow() - result.completedAt; + if (msSinceLastCheck < minMsBetweenChecks) { + LOG.debug("Skipped checking {}. Time since last check {}ms " + + "is less than the min gap {}ms.", + target, msSinceLastCheck, minMsBetweenChecks); + return result.result != null ? + Futures.immediateFuture(result.result) : + Futures.immediateFailedFuture(result.exception); + } + } + + final ListenableFuture lf = executorService.submit( + new Callable() { + @Override + public V call() throws Exception { + return target.check(context); + } + }); + checksInProgress.put(target, lf); + addResultCachingCallback(target, lf); + return lf; + } + + /** + * Register a callback to cache the result of a check. + * @param target + * @param lf + */ + private void addResultCachingCallback( + Checkable target, ListenableFuture lf) { + Futures.addCallback(lf, new FutureCallback() { + @Override + public void onSuccess(@Nullable V result) { + synchronized (ThrottledAsyncChecker.this) { + checksInProgress.remove(target); + completedChecks.put(target, new LastCheckResult<>( + result, timer.monotonicNow())); + } + } + + @Override + public void onFailure(@Nonnull Throwable t) { + synchronized (ThrottledAsyncChecker.this) { + checksInProgress.remove(target); + completedChecks.put(target, new LastCheckResult<>( + t, timer.monotonicNow())); + } + } + }); + } + + /** + * {@inheritDoc}. + */ + @Override + public void shutdownAndWait(long timeout, TimeUnit timeUnit) + throws InterruptedException { + // Try orderly shutdown. + executorService.shutdown(); + + if (!executorService.awaitTermination(timeout, timeUnit)) { + // Interrupt executing tasks and wait again. + executorService.shutdownNow(); + executorService.awaitTermination(timeout, timeUnit); + } + } + + /** + * Status of running a check. It can either be a result or an + * exception, depending on whether the check completed or threw. + */ + private static final class LastCheckResult { + /** + * Timestamp at which the check completed. + */ + private final long completedAt; + + /** + * Result of running the check if it completed. null if it threw. + */ + @Nullable + private final V result; + + /** + * Exception thrown by the check. null if it returned a result. + */ + private final Throwable exception; // null on success. + + /** + * Initialize with a result. + * @param result + */ + private LastCheckResult(V result, long completedAt) { + this.result = result; + this.exception = null; + this.completedAt = completedAt; + } + + /** + * Initialize with an exception. + * @param completedAt + * @param t + */ + private LastCheckResult(Throwable t, long completedAt) { + this.result = null; + this.exception = t; + this.completedAt = completedAt; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java new file mode 100644 index 0000000000..52822e9a8f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Datanode support for running disk checks. + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.datanode.checker; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java new file mode 100644 index 0000000000..70795caf65 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.Is.isA; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Verify functionality of {@link ThrottledAsyncChecker}. + */ +public class TestThrottledAsyncChecker { + public static final Logger LOG = + LoggerFactory.getLogger(TestThrottledAsyncChecker.class); + private static final long MIN_ERROR_CHECK_GAP = 1000; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Test various scheduling combinations to ensure scheduling and + * throttling behave as expected. + */ + @Test(timeout=60000) + public void testScheduler() throws Exception { + final NoOpCheckable target1 = new NoOpCheckable(); + final NoOpCheckable target2 = new NoOpCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + // check target1 and ensure we get back the expected result. + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + + // Check target1 again without advancing the timer. target1 should not + // be checked again and the cached result should be returned. + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + + // Schedule target2 scheduled without advancing the timer. + // target2 should be checked as it has never been checked before. + assertTrue(checker.schedule(target2, true).get()); + assertThat(target2.numChecks.get(), is(1L)); + + // Advance the timer but just short of the min gap. + // Neither target1 nor target2 should be checked again. + timer.advance(MIN_ERROR_CHECK_GAP - 1); + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + assertTrue(checker.schedule(target2, true).get()); + assertThat(target2.numChecks.get(), is(1L)); + + // Advance the timer again. + // Both targets should be checked now. + timer.advance(MIN_ERROR_CHECK_GAP); + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(2L)); + assertTrue(checker.schedule(target2, true).get()); + assertThat(target1.numChecks.get(), is(2L)); + } + + @Test (timeout=60000) + public void testCancellation() throws Exception { + LatchedCheckable target = new LatchedCheckable(); + final FakeTimer timer = new FakeTimer(); + final LatchedCallback callback = new LatchedCallback(target); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + ListenableFuture lf = checker.schedule(target, true); + Futures.addCallback(lf, callback); + + // Request immediate cancellation. + checker.shutdownAndWait(0, TimeUnit.MILLISECONDS); + try { + assertFalse(lf.get()); + fail("Failed to get expected InterruptedException"); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof InterruptedException); + } + callback.failureLatch.await(); + } + + @Test (timeout=60000) + public void testConcurrentChecks() throws Exception { + LatchedCheckable target = new LatchedCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + final ListenableFuture lf1 = checker.schedule(target, true); + final ListenableFuture lf2 = checker.schedule(target, true); + + // Ensure that concurrent requests return the same future object. + assertTrue(lf1 == lf2); + + // Unblock the latch and wait for it to finish execution. + target.latch.countDown(); + lf1.get(); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + // We should not get back the same future as before. + // This can take a short while until the internal callback in + // ThrottledAsyncChecker is scheduled for execution. + // Also this should not trigger a new check operation as the timer + // was not advanced. If it does trigger a new check then the test + // will fail with a timeout. + final ListenableFuture lf3 = checker.schedule(target, true); + return lf3 != lf2; + } + }, 100, 10000); + } + + /** + * Ensure that the context is passed through to the Checkable#check + * method. + * @throws Exception + */ + @Test(timeout=60000) + public void testContextIsPassed() throws Exception { + final NoOpCheckable target1 = new NoOpCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + timer.advance(MIN_ERROR_CHECK_GAP + 1); + assertFalse(checker.schedule(target1, false).get()); + assertThat(target1.numChecks.get(), is(2L)); + } + + /** + * Ensure that the exeption from a failed check is cached + * and returned without re-running the check when the minimum + * gap has not elapsed. + * + * @throws Exception + */ + @Test(timeout=60000) + public void testExceptionCaching() throws Exception { + final ThrowingCheckable target1 = new ThrowingCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + thrown.expectCause(isA(DummyException.class)); + checker.schedule(target1, true).get(); + assertThat(target1.numChecks.get(), is(1L)); + + thrown.expectCause(isA(DummyException.class)); + checker.schedule(target1, true).get(); + assertThat(target1.numChecks.get(), is(2L)); + } + + /** + * A simple ExecutorService for testing. + */ + private ExecutorService getExecutorService() { + return new ScheduledThreadPoolExecutor(1); + } + + /** + * A Checkable that just returns its input. + */ + private static class NoOpCheckable + implements Checkable { + private final AtomicLong numChecks = new AtomicLong(0); + @Override + public Boolean check(Boolean context) { + numChecks.incrementAndGet(); + return context; + } + } + + private static class ThrowingCheckable + implements Checkable { + private final AtomicLong numChecks = new AtomicLong(0); + @Override + public Boolean check(Boolean context) throws DummyException { + numChecks.incrementAndGet(); + throw new DummyException(); + } + + } + + private static class DummyException extends Exception { + } + + /** + * A checkable that hangs until signaled. + */ + private static class LatchedCheckable + implements Checkable { + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public Boolean check(Boolean ignored) throws InterruptedException { + LOG.info("LatchedCheckable {} waiting.", this); + latch.await(); + return true; // Unreachable. + } + } + + /** + * A {@link FutureCallback} that counts its invocations. + */ + private static final class LatchedCallback + implements FutureCallback { + private final CountDownLatch successLatch = new CountDownLatch(1); + private final CountDownLatch failureLatch = new CountDownLatch(1); + private final Checkable target; + + private LatchedCallback(Checkable target) { + this.target = target; + } + + @Override + public void onSuccess(@Nonnull Boolean result) { + LOG.info("onSuccess callback invoked for {}", target); + successLatch.countDown(); + } + + @Override + public void onFailure(@Nonnull Throwable t) { + LOG.info("onFailure callback invoked for {} with exception", target, t); + failureLatch.countDown(); + } + } +}