diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index e82fbda63d..32d00a4353 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -24,6 +24,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -472,7 +473,7 @@ protected void commitPendingUploads(
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .abortWith(path ->
               loadAndAbort(commitContext, pending, path, true, false))
           .revertWith(path ->
@@ -502,7 +503,7 @@ protected void precommitCheckPendingFiles(
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .run(path -> PendingSet.load(sourceFS, path));
     }
   }
@@ -525,7 +526,7 @@ private void loadAndCommit(
     Tasks.foreach(pendingSet.getCommits())
         .stopOnFailure()
        .suppressExceptions(false)
-        .executeWith(singleCommitThreadPool())
+        .executeWith(singleThreadSubmitter())
        .onFailure((commit, exception) ->
            commitContext.abortSingleCommit(commit))
        .abortWith(commitContext::abortSingleCommit)
@@ -580,7 +581,7 @@ private void loadAndAbort(
         path);
     FileSystem fs = getDestFS();
     Tasks.foreach(pendingSet.getCommits())
-        .executeWith(singleCommitThreadPool())
+        .executeWith(singleThreadSubmitter())
         .suppressExceptions(suppressExceptions)
         .run(commit -> {
           try {
@@ -674,7 +675,7 @@ protected void abortPendingUploadsInCleanup(
       return;
     }
     Tasks.foreach(pending)
-        .executeWith(buildThreadPool(getJobContext()))
+        .executeWith(buildSubmitter(getJobContext()))
         .suppressExceptions(suppressExceptions)
         .run(u -> commitContext.abortMultipartCommit(
             u.getKey(), u.getUploadId()));
@@ -838,45 +839,117 @@ protected String getRole() {
   }
 
   /**
-   * Returns an {@link ExecutorService} for parallel tasks. The number of
+   * Returns a {@link Tasks.Submitter} for parallel tasks. The number of
    * threads in the thread-pool is set by fs.s3a.committer.threads.
    * If num-threads is 0, this will return null;
+   * this is used in Tasks as a cue
+   * to switch to single-threaded execution.
    *
    * @param context the JobContext for this commit
-   * @return an {@link ExecutorService} or null for the number of threads
+   * @return a submitter or null
    */
-  protected final synchronized ExecutorService buildThreadPool(
+  protected Tasks.Submitter buildSubmitter(
       JobContext context) {
+    if (getThreadCount(context) > 0) {
+      return new PoolSubmitter(context);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Returns an {@link ExecutorService} for parallel tasks. The number of
+   * threads in the thread-pool is set by fs.s3a.committer.threads.
+   * If num-threads is 0, this will raise an exception.
+   *
+   * @param context the JobContext for this commit
+   * @param numThreads threads
+   * @return an {@link ExecutorService} for the number of threads
+   */
+  private synchronized ExecutorService buildThreadPool(
+      JobContext context, int numThreads) {
+    Preconditions.checkArgument(numThreads > 0,
+        "Cannot create a thread pool with no threads");
     if (threadPool == null) {
-      int numThreads = context.getConfiguration().getInt(
-          FS_S3A_COMMITTER_THREADS,
-          DEFAULT_COMMITTER_THREADS);
       LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
-      if (numThreads > 0) {
-        threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
-                .build());
-      } else {
-        return null;
-      }
+      threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
+              .build());
     }
     return threadPool;
   }
 
+  /**
+   * Get the thread count for this job's commit operations.
+   * @param context the JobContext for this commit
+   * @return a possibly zero thread count.
+   */
+  private int getThreadCount(final JobContext context) {
+    return context.getConfiguration().getInt(
+        FS_S3A_COMMITTER_THREADS,
+        DEFAULT_COMMITTER_THREADS);
+  }
+
+  /**
+   * Submit a runnable.
+   * This will demand-create the thread pool if needed.
+   *
+   * This is synchronized to ensure the thread pool is always valid when
+   * work is submitted. See HADOOP-16798.
+   * @param context the JobContext for this commit
+   * @param task task to execute
+   * @return the future of the submitted task.
+   */
+  private synchronized Future<?> submitRunnable(
+      final JobContext context,
+      final Runnable task) {
+    return buildThreadPool(context, getThreadCount(context)).submit(task);
+  }
+
+  /**
+   * The real task submitter, which hands off the work to
+   * the current thread pool.
+   */
+  private final class PoolSubmitter implements Tasks.Submitter {
+
+    private final JobContext context;
+
+    private final int numThreads;
+
+    private PoolSubmitter(final JobContext context) {
+      this.numThreads = getThreadCount(context);
+      Preconditions.checkArgument(numThreads > 0,
+          "Cannot create a thread pool with no threads");
+      this.context = context;
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+      return submitRunnable(context, task);
+    }
+
+  }
+
   /**
    * Destroy any thread pools; wait for that to finish,
    * but don't overreact if it doesn't finish in time.
    */
-  protected synchronized void destroyThreadPool() {
-    if (threadPool != null) {
-      LOG.debug("Destroying thread pool");
-      HadoopExecutors.shutdown(threadPool, LOG,
-          THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+  protected void destroyThreadPool() {
+    ExecutorService pool;
+    // reset the thread pool in a sync block, then shut it down
+    // afterwards. This allows for other threads to create a
+    // new thread pool on demand.
+    synchronized(this) {
+      pool = this.threadPool;
       threadPool = null;
     }
+    if (pool != null) {
+      LOG.debug("Destroying thread pool");
+      HadoopExecutors.shutdown(pool, LOG,
+          THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+    }
   }
 
   /**
@@ -884,11 +957,9 @@ protected synchronized void destroyThreadPool() {
    * within the commit of all uploads of a single task.
    * This is currently null; it is here to allow the Tasks class to
    * provide the logic for execute/revert.
-   * Why not use the existing thread pool? Too much fear of deadlocking,
-   * and tasks are being committed in parallel anyway.
    * @return null. always.
    */
-  protected final synchronized ExecutorService singleCommitThreadPool() {
+  protected final synchronized Tasks.Submitter singleThreadSubmitter() {
     return null;
   }
 
@@ -932,7 +1003,7 @@ protected void abortPendingUploads(JobContext context,
          CommitOperations.CommitContext commitContext
              = initiateCommitOperation()) {
       Tasks.foreach(pending)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .suppressExceptions(suppressExceptions)
           .run(commitContext::abortSingleCommit);
     }
@@ -961,7 +1032,7 @@ protected void abortPendingUploads(
          CommitOperations.CommitContext commitContext
              = initiateCommitOperation()) {
       Tasks.foreach(pending.getSourceFiles())
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .suppressExceptions(suppressExceptions)
           .run(path ->
               loadAndAbort(commitContext,
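The substantive change above is the indirection: previously Tasks was handed the raw ExecutorService up front and held it for the whole operation, so a pool destroyed concurrently via destroyThreadPool() could still be handed work. Now every task goes through the synchronized submitRunnable(), which re-resolves (and if needed re-creates) the pool at submission time, while destroyThreadPool() detaches the pool under the lock and shuts it down outside it. A minimal, self-contained sketch of that pattern follows; the class and method names are illustrative, not the committer's actual API, and it uses a plain shutdown() where the patch uses HadoopExecutors.shutdown() with a timeout.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch of the demand-create / race-free-destroy pool pattern. */
class PoolHolder {
  private ExecutorService pool;

  /** Demand-create the pool and submit work, all under the lock. */
  synchronized Future<?> submit(Runnable task, int threads) {
    if (pool == null) {
      pool = Executors.newFixedThreadPool(threads);
    }
    return pool.submit(task);
  }

  /** Null the field under the lock; shut the old pool down outside it. */
  void destroy() {
    ExecutorService old;
    synchronized (this) {
      old = pool;
      pool = null;
    }
    if (old != null) {
      // from here on, a concurrent submit() demand-creates a new pool
      old.shutdown();
    }
  }
}
```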
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
index b6b6b9707e..c318e86605 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
@@ -25,7 +25,6 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -76,7 +75,7 @@ public interface FailureTask<I, E extends Exception> {
    */
   public static class Builder<I> {
     private final Iterable<I> items;
-    private ExecutorService service = null;
+    private Submitter service = null;
     private FailureTask<I, ?> onFailure = null;
     private boolean stopOnFailure = false;
     private boolean suppressExceptions = false;
@@ -96,11 +95,11 @@ public static class Builder<I> {
     /**
      * Declare executor service: if null, the tasks are executed in a single
      * thread.
-     * @param executorService service to schedule tasks with.
+     * @param submitter service to schedule tasks with.
      * @return this builder.
      */
-    public Builder<I> executeWith(ExecutorService executorService) {
-      this.service = executorService;
+    public Builder<I> executeWith(Submitter submitter) {
+      this.service = submitter;
       return this;
     }
 
@@ -407,4 +406,18 @@ private static <E extends Exception> void castAndThrow(Exception e) throws E {
     }
     throw (E) e;
   }
+
+  /**
+   * Interface to whatever lets us submit tasks.
+   */
+  public interface Submitter {
+
+    /**
+     * Submit work.
+     * @param task task to execute
+     * @return the future of the submitted task.
+     */
+    Future<?> submit(Runnable task);
+  }
+
 }
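A side benefit of the new single-method Submitter interface: its submit(Runnable) signature lines up with ExecutorService.submit(Runnable), so bridge or test code can adapt an existing pool with a method reference rather than a wrapper class. A hedged usage sketch (the demo class and its items are invented; Tasks.foreach(), executeWith() and run() are as defined above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.fs.s3a.commit.Tasks;

/** Invented demo driving Tasks with a method-reference Submitter. */
public final class SubmitterDemo {
  public static void main(String[] args) throws Exception {
    List<String> items = Arrays.asList("part-0000", "part-0001");
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      // ExecutorService.submit(Runnable) already matches
      // Tasks.Submitter.submit(Runnable), so pool::submit is a Submitter.
      Tasks.foreach(items)
          .executeWith(pool::submit)
          .run(item -> System.out.println("processed " + item));
      // executeWith(null) would instead run in the calling thread.
    } finally {
      pool.shutdown();
    }
  }
}
```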
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index 20aca3cf49..7be54062d2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,7 +186,7 @@ private void replacePartitions(
     Map<Path, String> partitions = new ConcurrentHashMap<>();
     FileSystem sourceFS = pending.getSourceFS();
-    ExecutorService pool = buildThreadPool(context);
+    Tasks.Submitter submitter = buildSubmitter(context);
 
     try (DurationInfo ignored =
              new DurationInfo(LOG, "Replacing partitions")) {
@@ -198,7 +197,7 @@ private void replacePartitions(
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(pool)
+          .executeWith(submitter)
           .run(path -> {
             PendingSet pendingSet = PendingSet.load(sourceFS, path);
             Path lastParent = null;
@@ -216,7 +215,7 @@ private void replacePartitions(
       Tasks.foreach(partitions.keySet())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(pool)
+          .executeWith(submitter)
           .run(partitionPath -> {
             LOG.debug("{}: removing partition path to be replaced: " +
                 getRole(), partitionPath);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 7eca1b4265..91e68af8bb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -699,7 +699,7 @@ protected int commitTaskInternal(final TaskAttemptContext context,
     Tasks.foreach(taskOutput)
         .stopOnFailure()
         .suppressExceptions(false)
-        .executeWith(buildThreadPool(context))
+        .executeWith(buildSubmitter(context))
         .run(stat -> {
           Path path = stat.getPath();
           File localFile = new File(path.toUri().getPath());
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
index 4ee39f1bfa..4211c62a77 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
@@ -25,6 +25,7 @@
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -57,6 +58,12 @@ public class TestTasks extends HadoopTestBase {
    * Thread pool for task execution.
    */
   private ExecutorService threadPool;
+
+  /**
+   * Task submitter bound to the thread pool, or
+   * null for the 0-thread case.
+   */
+  Tasks.Submitter submitter;
 
   private final CounterTask failingTask
       = new CounterTask("failing committer", FAILPOINT, Item::commit);
@@ -117,6 +124,9 @@ public void setup() {
               .setDaemon(true)
               .setNameFormat(getMethodName() + "-pool-%d")
               .build());
+      submitter = new PoolSubmitter();
+    } else {
+      submitter = null;
     }
   }
 
@@ -129,12 +139,21 @@ public void teardown() {
     }
   }
 
+  private class PoolSubmitter implements Tasks.Submitter {
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+      return threadPool.submit(task);
+    }
+
+  }
+
   /**
    * create the builder.
    * @return pre-inited builder
    */
   private Tasks.Builder<Item> builder() {
-    return Tasks.foreach(items).executeWith(threadPool);
+    return Tasks.foreach(items).executeWith(submitter);
   }
 
   private void assertRun(Tasks.Builder<Item> builder,
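The 0-thread path is now a first-class case: with fs.s3a.committer.threads set to 0, buildSubmitter() returns null and Tasks runs every task in the calling thread, which is exactly what the null submitter in the amended TestTasks setup exercises. An illustrative fragment (the demo class is invented; the key name matches the FS_S3A_COMMITTER_THREADS constant used above):

```java
import org.apache.hadoop.conf.Configuration;

/** Invented demo of the configuration knob behind buildSubmitter(). */
public final class CommitterThreadsDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // 0 threads: buildSubmitter() returns null and Tasks.foreach()
    // executes each task inline in the calling thread.
    conf.setInt("fs.s3a.committer.threads", 0);
    System.out.println("fs.s3a.committer.threads = "
        + conf.getInt("fs.s3a.committer.threads", -1));
  }
}
```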