diff --git a/LICENSE.txt b/LICENSE.txt
index d334204aaa..52da57acb6 100644
--- a/LICENSE.txt
+++ b/LICENSE.txt
@@ -2659,4 +2659,24 @@ available under the Creative Commons By Attribution 3.0 License.
available upon request from time to time. For the avoidance of doubt,
this trademark restriction does not form part of this License.
- Creative Commons may be contacted at https://creativecommons.org/.
\ No newline at end of file
+ Creative Commons may be contacted at https://creativecommons.org/.
+--------------------------------------------------------------------------------
+
+For: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs
+/server/datanode/checker/AbstractFuture.java and
+hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs
+/server/datanode/checker/TimeoutFuture.java
+
+Copyright (C) 2007 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not
+use this file except in compliance with the License. You may obtain a copy of
+the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations under
+the License.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 598f4fd2a6..be54efb242 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -240,4 +240,16 @@
+ *
This class implements all methods in {@code ListenableFuture}.
+ * Subclasses should provide a way to set the result of the computation
+ * through the protected methods {@link #set(Object)},
+ * {@link #setFuture(ListenableFuture)} and {@link #setException(Throwable)}.
+ * Subclasses may also override {@link #interruptTask()}, which will be
+ * invoked automatically if a call to {@link #cancel(boolean) cancel(true)}
+ * succeeds in canceling the future. Subclasses should rarely override other
+ * methods.
+ */
+
+@GwtCompatible(emulated = true)
+public abstract class AbstractFuture
+ * The valid values are:
+ *
+ * The default {@link AbstractFuture} implementation throws {@code
+ * InterruptedException} if the current thread is interrupted before or
+ * during the call, even if the value is already available.
+ *
+ * @throws InterruptedException if the current thread was interrupted
+ * before or during the call
+ * (optional but recommended).
+ * @throws CancellationException {@inheritDoc}
+ */
+ @Override
+ public V get(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException, ExecutionException {
+ // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into
+ // the while(true) loop at the bottom and throw a timeoutexception.
+ long remainingNanos = unit
+ .toNanos(timeout); // we rely on the implicit null check on unit.
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ Object localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ // we delay calling nanoTime until we know we will need to either park or
+ // spin
+ final long endNanos = remainingNanos > 0 ? System
+ .nanoTime() + remainingNanos : 0;
+ long_wait_loop:
+ if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
+ Waiter oldHead = waiters;
+ if (oldHead != Waiter.TOMBSTONE) {
+ Waiter node = new Waiter();
+ do {
+ node.setNext(oldHead);
+ if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+ while (true) {
+ LockSupport.parkNanos(this, remainingNanos);
+ // Check interruption first, if we woke up due to interruption
+ // we need to honor that.
+ if (Thread.interrupted()) {
+ removeWaiter(node);
+ throw new InterruptedException();
+ }
+
+ // Otherwise re-read and check doneness. If we loop then it must
+ // have been a spurious wakeup
+ localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+
+ // timed out?
+ remainingNanos = endNanos - System.nanoTime();
+ if (remainingNanos < SPIN_THRESHOLD_NANOS) {
+ // Remove the waiter, one way or another we are done parking
+ // this thread.
+ removeWaiter(node);
+ break long_wait_loop; // jump down to the busy wait loop
+ }
+ }
+ }
+ oldHead = waiters; // re-read and loop.
+ } while (oldHead != Waiter.TOMBSTONE);
+ }
+ // re-read value, if we get here then we must have observed a TOMBSTONE
+ // while trying to add a waiter.
+ return getDoneValue(value);
+ }
+ // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and
+ // there is no node on the waiters list
+ while (remainingNanos > 0) {
+ localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ remainingNanos = endNanos - System.nanoTime();
+ }
+ throw new TimeoutException();
+ }
+
+ /*
+ * Improve the documentation of when InterruptedException is thrown. Our
+ * behavior matches the JDK's, but the JDK's documentation is misleading.
+ */
+
+ /**
+ * {@inheritDoc}
+ *
+ * The default {@link AbstractFuture} implementation throws {@code
+ * InterruptedException} if the current thread is interrupted before or
+ * during the call, even if the value is already available.
+ *
+ * @throws InterruptedException if the current thread was interrupted
+ * before or during the call
+ * (optional but recommended).
+ * @throws CancellationException {@inheritDoc}
+ */
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ Object localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ Waiter oldHead = waiters;
+ if (oldHead != Waiter.TOMBSTONE) {
+ Waiter node = new Waiter();
+ do {
+ node.setNext(oldHead);
+ if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
+ // we are on the stack, now wait for completion.
+ while (true) {
+ LockSupport.park(this);
+ // Check interruption first, if we woke up due to interruption we
+ // need to honor that.
+ if (Thread.interrupted()) {
+ removeWaiter(node);
+ throw new InterruptedException();
+ }
+ // Otherwise re-read and check doneness. If we loop then it must
+ // have been a spurious wakeup
+ localValue = value;
+ if (localValue != null & !(localValue instanceof SetFuture)) {
+ return getDoneValue(localValue);
+ }
+ }
+ }
+ oldHead = waiters; // re-read and loop.
+ } while (oldHead != Waiter.TOMBSTONE);
+ }
+ // re-read value, if we get here then we must have observed a TOMBSTONE
+ // while trying to add a waiter.
+ return getDoneValue(value);
+ }
+
+ /**
+ * Unboxes {@code obj}. Assumes that obj is not {@code null} or a
+ * {@link SetFuture}.
+ */
+ private V getDoneValue(Object obj) throws ExecutionException {
+ // While this seems like it might be too branch-y, simple benchmarking
+ // proves it to be unmeasurable (comparing done AbstractFutures with
+ // immediateFuture)
+ if (obj instanceof Cancellation) {
+ throw cancellationExceptionWithCause(
+ "Task was cancelled.", ((Cancellation) obj).cause);
+ } else if (obj instanceof Failure) {
+ throw new ExecutionException(((Failure) obj).exception);
+ } else if (obj == NULL) {
+ return null;
+ } else {
+ @SuppressWarnings("unchecked") // this is the only other option
+ V asV = (V) obj;
+ return asV;
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ final Object localValue = value;
+ return localValue != null & !(localValue instanceof SetFuture);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ final Object localValue = value;
+ return localValue instanceof Cancellation;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * If a cancellation attempt succeeds on a {@code Future} that had
+ * previously been {@linkplain#setFuture set asynchronously}, then the
+ * cancellation will also be propagated to the delegate {@code Future} that
+ * was supplied in the {@code setFuture} call.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ Object localValue = value;
+ boolean rValue = false;
+ if (localValue == null | localValue instanceof SetFuture) {
+ // Try to delay allocating the exception. At this point we may still
+ // lose the CAS, but it is certainly less likely.
+ Throwable cause =
+ GENERATE_CANCELLATION_CAUSES
+ ? new CancellationException("Future.cancel() was called.")
+ : null;
+ Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
+ AbstractFuture> abstractFuture = this;
+ while (true) {
+ if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
+ rValue = true;
+ // We call interuptTask before calling complete(), which is
+ // consistent with FutureTask
+ if (mayInterruptIfRunning) {
+ abstractFuture.interruptTask();
+ }
+ complete(abstractFuture);
+ if (localValue instanceof SetFuture) {
+ // propagate cancellation to the future set in setfuture, this is
+ // racy, and we don't care if we are successful or not.
+ ListenableFuture> futureToPropagateTo = ((SetFuture) localValue)
+ .future;
+ if (futureToPropagateTo instanceof TrustedFuture) {
+ // If the future is a TrustedFuture then we specifically avoid
+ // calling cancel() this has 2 benefits
+ // 1. for long chains of futures strung together with setFuture
+ // we consume less stack
+ // 2. we avoid allocating Cancellation objects at every level of
+ // the cancellation chain
+ // We can only do this for TrustedFuture, because
+ // TrustedFuture.cancel is final and does nothing but delegate
+ // to this method.
+ AbstractFuture> trusted = (AbstractFuture>)
+ futureToPropagateTo;
+ localValue = trusted.value;
+ if (localValue == null | localValue instanceof SetFuture) {
+ abstractFuture = trusted;
+ continue; // loop back up and try to complete the new future
+ }
+ } else {
+ // not a TrustedFuture, call cancel directly.
+ futureToPropagateTo.cancel(mayInterruptIfRunning);
+ }
+ }
+ break;
+ }
+ // obj changed, reread
+ localValue = abstractFuture.value;
+ if (!(localValue instanceof SetFuture)) {
+ // obj cannot be null at this point, because value can only change
+ // from null to non-null. So if value changed (and it did since we
+ // lost the CAS), then it cannot be null and since it isn't a
+ // SetFuture, then the future must be done and we should exit the loop
+ break;
+ }
+ }
+ }
+ return rValue;
+ }
+
+ /**
+ * Subclasses can override this method to implement interruption of the
+ * future's computation. The method is invoked automatically by a
+ * successful call to {@link #cancel(boolean) cancel(true)}.
+ *
+ * The default implementation does nothing.
+ *
+ * @since 10.0
+ */
+ protected void interruptTask() {
+ }
+
+ /**
+ * Returns true if this future was cancelled with {@code
+ * mayInterruptIfRunning} set to {@code true}.
+ *
+ * @since 14.0
+ */
+ protected final boolean wasInterrupted() {
+ final Object localValue = value;
+ return (localValue instanceof Cancellation) && ((Cancellation) localValue)
+ .wasInterrupted;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @since 10.0
+ */
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ checkNotNull(listener, "Runnable was null.");
+ checkNotNull(executor, "Executor was null.");
+ Listener oldHead = listeners;
+ if (oldHead != Listener.TOMBSTONE) {
+ Listener newNode = new Listener(listener, executor);
+ do {
+ newNode.next = oldHead;
+ if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
+ return;
+ }
+ oldHead = listeners; // re-read
+ } while (oldHead != Listener.TOMBSTONE);
+ }
+ // If we get here then the Listener TOMBSTONE was set, which means the
+ // future is done, call the listener.
+ executeListener(listener, executor);
+ }
+
+ /**
+ * Sets the result of this {@code Future} unless this {@code Future} has
+ * already been cancelled or set (including
+ * {@linkplain #setFuture set asynchronously}). When a call to this method
+ * returns, the {@code Future} is guaranteed to be
+ * {@linkplain #isDone done} only if the call was accepted (in which
+ * case it returns {@code true}). If it returns {@code false}, the {@code
+ * Future} may have previously been set asynchronously, in which case its
+ * result may not be known yet. That result, though not yet known, cannot
+ * be overridden by a call to a {@code set*} method, only by a call to
+ * {@link #cancel}.
+ *
+ * @param value the value to be used as the result
+ * @return true if the attempt was accepted, completing the {@code Future}
+ */
+ protected boolean set(@Nullable V value) {
+ Object valueToSet = value == null ? NULL : value;
+ if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+ complete(this);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sets the failed result of this {@code Future} unless this {@code Future}
+ * has already been cancelled or set (including
+ * {@linkplain #setFuture set asynchronously}). When a call to this method
+ * returns, the {@code Future} is guaranteed to be
+ * {@linkplain #isDone done} only if the call was accepted (in which
+ * case it returns {@code true}). If it returns {@code false}, the
+ * {@code Future} may have previously been set asynchronously, in which case
+ * its result may not be known yet. That result, though not yet known,
+ * cannot be overridden by a call to a {@code set*} method, only by a call
+ * to {@link #cancel}.
+ *
+ * @param throwable the exception to be used as the failed result
+ * @return true if the attempt was accepted, completing the {@code Future}
+ */
+ protected boolean setException(Throwable throwable) {
+ Object valueToSet = new Failure(checkNotNull(throwable));
+ if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
+ complete(this);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Sets the result of this {@code Future} to match the supplied input
+ * {@code Future} once the supplied {@code Future} is done, unless this
+ * {@code Future} has already been cancelled or set (including "set
+ * asynchronously," defined below).
+ *
+ * If the supplied future is {@linkplain #isDone done} when this method
+ * is called and the call is accepted, then this future is guaranteed to
+ * have been completed with the supplied future by the time this method
+ * returns. If the supplied future is not done and the call is accepted, then
+ * the future will be set asynchronously. Note that such a result,
+ * though not yet known, cannot be overridden by a call to a {@code set*}
+ * method, only by a call to {@link #cancel}.
+ *
+ * If the call {@code setFuture(delegate)} is accepted and this {@code
+ * Future} is later cancelled, cancellation will be propagated to {@code
+ * delegate}. Additionally, any call to {@code setFuture} after any
+ * cancellation will propagate cancellation to the supplied {@code Future}.
+ *
+ * @param future the future to delegate to
+ * @return true if the attempt was accepted, indicating that the {@code
+ * Future} was not previously cancelled or set.
+ * @since 19.0
+ */
+ @Beta
+ protected boolean setFuture(ListenableFuture extends V> future) {
+ checkNotNull(future);
+ Object localValue = value;
+ if (localValue == null) {
+ if (future.isDone()) {
+ Object value = getFutureValue(future);
+ if (ATOMIC_HELPER.casValue(this, null, value)) {
+ complete(this);
+ return true;
+ }
+ return false;
+ }
+ SetFuture valueToSet = new SetFuture
+ * This is approximately the inverse of {@link #getDoneValue(Object)}
+ */
+ private static Object getFutureValue(ListenableFuture> future) {
+ Object valueToSet;
+ if (future instanceof TrustedFuture) {
+ // Break encapsulation for TrustedFuture instances since we know that
+ // subclasses cannot override .get() (since it is final) and therefore
+ // this is equivalent to calling .get() and unpacking the exceptions
+ // like we do below (just much faster because it is a single field read
+ // instead of a read, several branches and possibly creating exceptions).
+ return ((AbstractFuture>) future).value;
+ } else {
+ // Otherwise calculate valueToSet by calling .get()
+ try {
+ Object v = getDone(future);
+ valueToSet = v == null ? NULL : v;
+ } catch (ExecutionException exception) {
+ valueToSet = new Failure(exception.getCause());
+ } catch (CancellationException cancellation) {
+ valueToSet = new Cancellation(false, cancellation);
+ } catch (Throwable t) {
+ valueToSet = new Failure(t);
+ }
+ }
+ return valueToSet;
+ }
+
+ /**
+ * Unblocks all threads and runs all listeners.
+ */
+ private static void complete(AbstractFuture> future) {
+ Listener next = null;
+ outer:
+ while (true) {
+ future.releaseWaiters();
+ // We call this before the listeners in order to avoid needing to manage
+ // a separate stack data structure for them. afterDone() should be
+ // generally fast and only used for cleanup work... but in theory can
+ // also be recursive and create StackOverflowErrors
+ future.afterDone();
+ // push the current set of listeners onto next
+ next = future.clearListeners(next);
+ future = null;
+ while (next != null) {
+ Listener curr = next;
+ next = next.next;
+ Runnable task = curr.task;
+ if (task instanceof SetFuture) {
+ SetFuture> setFuture = (SetFuture>) task;
+ // We unwind setFuture specifically to avoid StackOverflowErrors in
+ // the case of long chains of SetFutures
+ // Handling this special case is important because there is no way
+ // to pass an executor to setFuture, so a user couldn't break the
+ // chain by doing this themselves. It is also potentially common
+ // if someone writes a recursive Futures.transformAsync transformer.
+ future = setFuture.owner;
+ if (future.value == setFuture) {
+ Object valueToSet = getFutureValue(setFuture.future);
+ if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
+ continue outer;
+ }
+ }
+ // other wise the future we were trying to set is already done.
+ } else {
+ executeListener(task, curr.executor);
+ }
+ }
+ break;
+ }
+ }
+
+ public static
+ * If {@link #interruptTask} is also run during completion,
+ * {@link #afterDone} runs after it.
+ *
+ * The default implementation of this method in {@code AbstractFuture}
+ * does nothing. This is intended for very lightweight cleanup work, for
+ * example, timing statistics or clearing fields.
+ * If your task does anything heavier consider, just using a listener with
+ * an executor.
+ *
+ * @since 20.0
+ */
+ @Beta
+ protected void afterDone() {
+ }
+
+ /**
+ * If this future has been cancelled (and possibly interrupted), cancels
+ * (and possibly interrupts) the given future (if available).
+ *
+ * This method should be used only when this future is completed. It is
+ * designed to be called from {@code done}.
+ */
+ final void maybePropagateCancellation(@Nullable Future> related) {
+ if (related != null & isCancelled()) {
+ related.cancel(wasInterrupted());
+ }
+ }
+
+ /**
+ * Releases all threads in the {@link #waiters} list, and clears the list.
+ */
+ private void releaseWaiters() {
+ Waiter head;
+ do {
+ head = waiters;
+ } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
+ for (
+ Waiter currentWaiter = head;
+ currentWaiter != null;
+ currentWaiter = currentWaiter.next) {
+ currentWaiter.unpark();
+ }
+ }
+
+ /**
+ * Clears the {@link #listeners} list and prepends its contents to {@code
+ * onto}, least recently added first.
+ */
+ private Listener clearListeners(Listener onto) {
+ // We need to
+ // 1. atomically swap the listeners with TOMBSTONE, this is because
+ // addListener uses that to to synchronize with us
+ // 2. reverse the linked list, because despite our rather clear contract,
+ // people depend on us executing listeners in the order they were added
+ // 3. push all the items onto 'onto' and return the new head of the stack
+ Listener head;
+ do {
+ head = listeners;
+ } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
+ Listener reversedList = onto;
+ while (head != null) {
+ Listener tmp = head;
+ head = head.next;
+ tmp.next = reversedList;
+ reversedList = tmp;
+ }
+ return reversedList;
+ }
+
+ /**
+ * Submits the given runnable to the given {@link Executor} catching and
+ * logging all {@linkplain RuntimeException runtime exceptions} thrown by
+ * the executor.
+ */
+ private static void executeListener(Runnable runnable, Executor executor) {
+ try {
+ executor.execute(runnable);
+ } catch (RuntimeException e) {
+ // Log it and keep going -- bad runnable and/or executor. Don't punish
+ // the other runnables if we're given a bad one. We only catch
+ // RuntimeException because we want Errors to propagate up.
+ log.log(
+ Level.SEVERE,
+ "RuntimeException while executing runnable " + runnable + " with " +
+ "executor " + executor,
+ e);
+ }
+ }
+
+ private abstract static class AtomicHelper {
+ /**
+ * Non volatile write of the thread to the {@link Waiter#thread} field.
+ */
+ abstract void putThread(Waiter waiter, Thread newValue);
+
+ /**
+ * Non volatile write of the waiter to the {@link Waiter#next} field.
+ */
+ abstract void putNext(Waiter waiter, Waiter newValue);
+
+ /**
+ * Performs a CAS operation on the {@link #waiters} field.
+ */
+ abstract boolean casWaiters(
+ AbstractFuture> future, Waiter expect,
+ Waiter update);
+
+ /**
+ * Performs a CAS operation on the {@link #listeners} field.
+ */
+ abstract boolean casListeners(
+ AbstractFuture> future, Listener expect,
+ Listener update);
+
+ /**
+ * Performs a CAS operation on the {@link #value} field.
+ */
+ abstract boolean casValue(
+ AbstractFuture> future, Object expect, Object update);
+ }
+
+ /**
+ * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
+ *
+ * Static initialization of this class will fail if the
+ * {@link sun.misc.Unsafe} object cannot be accessed.
+ */
+ private static final class UnsafeAtomicHelper extends AtomicHelper {
+ static final sun.misc.Unsafe UNSAFE;
+ static final long LISTENERS_OFFSET;
+ static final long WAITERS_OFFSET;
+ static final long VALUE_OFFSET;
+ static final long WAITER_THREAD_OFFSET;
+ static final long WAITER_NEXT_OFFSET;
+
+ static {
+ sun.misc.Unsafe unsafe = null;
+ try {
+ unsafe = sun.misc.Unsafe.getUnsafe();
+ } catch (SecurityException tryReflectionInstead) {
+ try {
+ unsafe =
+ AccessController.doPrivileged(
+ new PrivilegedExceptionAction
+ * This is an implementation of last resort for when certain basic VM
+ * features are broken (like AtomicReferenceFieldUpdater).
+ */
+ private static final class SynchronizedHelper extends AtomicHelper {
+ @Override
+ void putThread(Waiter waiter, Thread newValue) {
+ waiter.thread = newValue;
+ }
+
+ @Override
+ void putNext(Waiter waiter, Waiter newValue) {
+ waiter.next = newValue;
+ }
+
+ @Override
+ boolean casWaiters(AbstractFuture> future, Waiter expect, Waiter
+ update) {
+ synchronized (future) {
+ if (future.waiters == expect) {
+ future.waiters = update;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ boolean casListeners(
+ AbstractFuture> future, Listener expect, Listener update) {
+ synchronized (future) {
+ if (future.listeners == expect) {
+ future.listeners = update;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ boolean casValue(AbstractFuture> future, Object expect, Object update) {
+ synchronized (future) {
+ if (future.value == expect) {
+ future.value = update;
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+
+ private static CancellationException cancellationExceptionWithCause(
+ @Nullable String message, @Nullable Throwable cause) {
+ CancellationException exception = new CancellationException(message);
+ exception.initCause(cause);
+ return exception;
+ }
+
+ /**
+ * Returns an {@link Executor} that runs each task in the thread that invokes
+ * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
+ *
+ * This instance is equivalent to:
+ * This should be preferred to {@link #newDirectExecutorService()}
+ * because implementing the {@link ExecutorService} subinterface
+ * necessitates significant performance overhead.
+ *
+ * @since 18.0
+ */
+ public static Executor directExecutor() {
+ return DirectExecutor.INSTANCE;
+ }
+
+ /**
+ * See {@link #directExecutor} for behavioral notes.
+ */
+ private enum DirectExecutor implements Executor {
+ INSTANCE;
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ @Override
+ public String toString() {
+ return "MoreExecutors.directExecutor()";
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
index 9ad47f012d..6ab6425ad7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java
@@ -91,6 +91,7 @@ public class DatasetVolumeChecker {
* Minimum time between two successive disk checks of a volume.
*/
private final long minDiskCheckGapMs;
+ private final long diskCheckTimeout;
/**
* Timestamp of the last check of all volumes.
@@ -136,6 +137,17 @@ public DatasetVolumeChecker(Configuration conf, Timer timer)
+ minDiskCheckGapMs + " (should be >= 0)");
}
+ diskCheckTimeout = conf.getTimeDuration(
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ if (diskCheckTimeout < 0) {
+ throw new DiskErrorException("Invalid value configured for "
+ + DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ + diskCheckTimeout + " (should be >= 0)");
+ }
+
lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
if (maxVolumeFailuresTolerated < 0) {
@@ -145,7 +157,8 @@ public DatasetVolumeChecker(Configuration conf, Timer timer)
}
delegateChecker = new ThrottledAsyncChecker<>(
- timer, minDiskCheckGapMs, Executors.newCachedThreadPool(
+ timer, minDiskCheckGapMs, diskCheckTimeout,
+ Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("DataNode DiskChecker thread %d")
.setDaemon(true)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
index a0bffcd01e..2d1eebe0bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java
@@ -119,6 +119,7 @@ public StorageLocationChecker(Configuration conf, Timer timer)
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
TimeUnit.MILLISECONDS),
+ 0,
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("StorageLocationChecker thread %d")
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
index 83c554d4dc..7584d97b5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java
@@ -38,6 +38,8 @@
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -64,12 +66,14 @@ public class ThrottledAsyncChecker
+ * Future that delegates to another but will finish early (via a
+ * {@link TimeoutException} wrapped in an {@link ExecutionException}) if the
+ * specified duration expires. The delegate future is interrupted and
+ * cancelled if it times out.
+ */
+final class TimeoutFuture
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.checker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.FakeTimer;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TestThrottledAsyncCheckerTimeout {
+ public static final org.slf4j.Logger LOG =
+ LoggerFactory.getLogger(TestThrottledAsyncCheckerTimeout.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ Configuration conf;
+ private static final long DISK_CHECK_TIMEOUT = 10;
+ private static final long DISK_CHECK_TIME = 100;
+ private ReentrantLock lock;
+
+ private ExecutorService getExecutorService() {
+ return new ScheduledThreadPoolExecutor(1);
+ }
+
+ @Before
+ public void initializeLock() {
+ lock = new ReentrantLock();
+ }
+
+ @Test (timeout = 1000)
+ public void testDiskCheckTimeout() throws Exception {
+ LOG.info("Executing {}", testName.getMethodName());
+
+ final DummyCheckable target = new DummyCheckable();
+ final FakeTimer timer = new FakeTimer();
+ ThrottledAsyncChecker
+ *
+ */
+ private void removeWaiter(Waiter node) {
+ node.thread = null; // mark as 'deleted'
+ restart:
+ while (true) {
+ Waiter pred = null;
+ Waiter curr = waiters;
+ if (curr == Waiter.TOMBSTONE) {
+ return; // give up if someone is calling complete
+ }
+ Waiter succ;
+ while (curr != null) {
+ succ = curr.next;
+ if (curr.thread != null) { // we aren't unlinking this node, update
+ // pred.
+ pred = curr;
+ } else if (pred != null) { // We are unlinking this node and it has a
+ // predecessor.
+ pred.next = succ;
+ if (pred.thread == null) { // We raced with another node that
+ // unlinked pred. Restart.
+ continue restart;
+ }
+ } else if (!ATOMIC_HELPER
+ .casWaiters(this, curr, succ)) { // We are unlinking head
+ continue restart; // We raced with an add or complete
+ }
+ curr = succ;
+ }
+ break;
+ }
+ }
+
+ /**
+ * Listeners also form a stack through the {@link #listeners} field.
+ */
+ private static final class Listener {
+ static final Listener TOMBSTONE = new Listener(null, null);
+ final Runnable task;
+ final Executor executor;
+
+ // writes to next are made visible by subsequent CAS's on the listeners
+ // field
+ @Nullable Listener next;
+
+ Listener(Runnable task, Executor executor) {
+ this.task = task;
+ this.executor = executor;
+ }
+ }
+
+ /**
+ * A special value to represent {@code null}.
+ */
+ private static final Object NULL = new Object();
+
+ /**
+ * A special value to represent failure, when {@link #setException} is
+ * called successfully.
+ */
+ private static final class Failure {
+ static final Failure FALLBACK_INSTANCE =
+ new Failure(
+ new Throwable("Failure occurred while trying to finish a future" +
+ ".") {
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this; // no stack trace
+ }
+ });
+ final Throwable exception;
+
+ Failure(Throwable exception) {
+ this.exception = checkNotNull(exception);
+ }
+ }
+
+ /**
+ * A special value to represent cancellation and the 'wasInterrupted' bit.
+ */
+ private static final class Cancellation {
+ final boolean wasInterrupted;
+ @Nullable final Throwable cause;
+
+ Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
+ this.wasInterrupted = wasInterrupted;
+ this.cause = cause;
+ }
+ }
+
+ /**
+ * A special value that encodes the 'setFuture' state.
+ */
+ private static final class SetFuture
+ *
+ */
+ private volatile Object value;
+
+ /**
+ * All listeners.
+ */
+ private volatile Listener listeners;
+
+ /**
+ * All waiting threads.
+ */
+ private volatile Waiter waiters;
+
+ /**
+ * Constructor for use by subclasses.
+ */
+ protected AbstractFuture() {
+ }
+
+ // Gets and Timed Gets
+ //
+ // * Be responsive to interruption
+ // * Don't create Waiter nodes if you aren't going to park, this helps
+ // reduce contention on the waiters field.
+ // * Future completion is defined by when #value becomes non-null/non
+ // SetFuture
+ // * Future completion can be observed if the waiters field contains a
+ // TOMBSTONE
+
+ // Timed Get
+ // There are a few design constraints to consider
+ // * We want to be responsive to small timeouts, unpark() has non trivial
+ // latency overheads (I have observed 12 micros on 64 bit linux systems to
+ // wake up a parked thread). So if the timeout is small we shouldn't park().
+ // This needs to be traded off with the cpu overhead of spinning, so we use
+ // SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
+ // similar purposes.
+ // * We want to behave reasonably for timeouts of 0
+ // * We are more responsive to completion than timeouts. This is because
+ // parkNanos depends on system scheduling and as such we could either miss
+ // our deadline, or unpark() could be delayed so that it looks like we
+ // timed out even though we didn't. For comparison FutureTask respects
+ // completion preferably and AQS is non-deterministic (depends on where in
+ // the queue the waiter is). If we wanted to be strict about it, we could
+ // store the unpark() time in the Waiter node and we could use that to make
+ // a decision about whether or not we timed out prior to being unparked.
+
+ /*
+ * Improve the documentation of when InterruptedException is thrown. Our
+ * behavior matches the JDK's, but the JDK's documentation is misleading.
+ */
+
+ /**
+ * {@inheritDoc}
+ * {@code
+ * final class DirectExecutor implements Executor {
+ * public void execute(Runnable r) {
+ * r.run();
+ * }
+ * }}
+ *