diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java index 45b9a98c68..c4c11e57b3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java @@ -22,6 +22,8 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import java.util.Collection; import java.util.List; @@ -33,6 +35,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED; + /** * This ExecutorService blocks the submission of new tasks when its queue is * already full by using a semaphore. Task submissions require permits, task @@ -53,20 +59,39 @@ public class SemaphoredDelegatingExecutor extends private final Semaphore queueingPermits; private final ExecutorService executorDelegatee; private final int permitCount; + private final DurationTrackerFactory trackerFactory; /** * Instantiate. * @param executorDelegatee Executor to delegate to * @param permitCount number of permits into the queue permitted * @param fair should the semaphore be "fair" + * @param trackerFactory duration tracker factory. + */ + public SemaphoredDelegatingExecutor( + ExecutorService executorDelegatee, + int permitCount, + boolean fair, + DurationTrackerFactory trackerFactory) { + this.permitCount = permitCount; + queueingPermits = new Semaphore(permitCount, fair); + this.executorDelegatee = requireNonNull(executorDelegatee); + this.trackerFactory = trackerFactory != null + ? trackerFactory + : stubDurationTrackerFactory(); + } + + /** + * Instantiate without collecting executor aquisition duration information. + * @param executorDelegatee Executor to delegate to + * @param permitCount number of permits into the queue permitted + * @param fair should the semaphore be "fair" */ public SemaphoredDelegatingExecutor( ExecutorService executorDelegatee, int permitCount, boolean fair) { - this.permitCount = permitCount; - queueingPermits = new Semaphore(permitCount, fair); - this.executorDelegatee = executorDelegatee; + this(executorDelegatee, permitCount, fair, null); } @Override @@ -102,7 +127,8 @@ public T invokeAny(Collection> tasks, long timeout, @Override public Future submit(Callable task) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -113,7 +139,8 @@ public Future submit(Callable task) { @Override public Future submit(Runnable task, T result) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -124,7 +151,8 @@ public Future submit(Runnable task, T result) { @Override public Future submit(Runnable task) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -135,7 +163,8 @@ public Future submit(Runnable task) { @Override public void execute(Runnable command) { - try { + try (DurationTracker ignored = + trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) { queueingPermits.acquire(); } catch (InterruptedException e) { Thread.currentThread().interrupt();