diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java index 9ef50e50d7..645de28039 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java @@ -22,8 +22,13 @@ import java.util.Locale; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.slf4j.Logger; + +import org.apache.hadoop.util.concurrent.HadoopExecutors; + /** * A FuturePool implementation backed by a java.util.concurrent.ExecutorService. * @@ -37,7 +42,8 @@ * */ public class ExecutorServiceFuturePool { - private ExecutorService executor; + + private final ExecutorService executor; public ExecutorServiceFuturePool(ExecutorService executor) { this.executor = executor; @@ -64,6 +70,18 @@ public Future executeRunnable(final Runnable r) { return (Future) executor.submit(r::run); } + /** + * Utility to shutdown the {@link ExecutorService} used by this class. Will wait up to a + * certain timeout for the ExecutorService to gracefully shutdown. + * + * @param logger Logger + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + */ + public void shutdown(Logger logger, long timeout, TimeUnit unit) { + HadoopExecutors.shutdown(executor, logger, timeout, unit); + } + public String toString() { return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java index 0bbceb59c3..6e2838bfe9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java @@ -115,9 +115,8 @@ public static void shutdown(ExecutorService executorService, Logger logger, try { executorService.shutdown(); - logger.debug( - "Gracefully shutting down executor service. Waiting max {} {}", - timeout, unit); + logger.debug("Gracefully shutting down executor service {}. Waiting max {} {}", + executorService, timeout, unit); if (!executorService.awaitTermination(timeout, unit)) { logger.debug( "Executor service has not shutdown yet. Forcing. " diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index e823840fd7..b8e419f3c8 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -59,6 +59,10 @@ + + + + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 29cd158641..bdb444f8f0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -653,17 +653,11 @@ public void initialize(URI name, Configuration originalConf) // amazon client exception: stop all services then throw the translation cleanupWithLogger(LOG, span); stopAllServices(); - if (this.futurePool != null) { - this.futurePool = null; - } throw translateException("initializing ", new Path(name), e); } catch (IOException | RuntimeException e) { // other exceptions: stop the services. cleanupWithLogger(LOG, span); stopAllServices(); - if (this.futurePool != null) { - this.futurePool = null; - } throw e; } } @@ -4056,6 +4050,10 @@ protected synchronized void stopAllServices() { HadoopExecutors.shutdown(unboundedThreadPool, LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); unboundedThreadPool = null; + if (futurePool != null) { + futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + futurePool = null; + } // other services are shutdown. cleanupWithLogger(LOG, instrumentation,