HADOOP-16798. S3A Committer thread pool shutdown problems. (#1963)

Contributed by Steve Loughran.

Fixes a condition which can cause job commit to fail if a task was
aborted < 60s before the job commit commenced: the task abort
shuts down the thread pool with a hard exit after 60s; the
job commit POST requests would be scheduled through the same pool,
and so would be interrupted and fail. Access to the pool is
synchronized, but presumably the executor shutdown code calls
wait() internally and so releases the lock.

Task abort is triggered from the AM when task attempts succeed but
there are still active speculative task attempts running. Thus it
only surfaces when speculation is enabled and the final tasks are
speculating, which, given they are the stragglers, is not unheard of.

Note: this problem has never been seen in production; it has surfaced
in the hadoop-aws tests on a heavily overloaded desktop.
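
In miniature, the failure mode is a shared pool being hard-shut by one
stage while another stage still schedules work through it. The sketch
below is illustrative stand-alone Java, not committer code; the class
name and pool size are invented.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Illustrative reproduction of the failure mode, not committer code.
public final class SharedPoolShutdownSketch {
  public static void main(String[] args) {
    ExecutorService shared = Executors.newFixedThreadPool(2);

    // "Task abort" path: hard exit, interrupting in-flight work.
    shared.shutdownNow();

    // "Job commit" path: scheduled through the same pool, so it fails.
    try {
      shared.submit(() -> System.out.println("completing upload"));
    } catch (RejectedExecutionException e) {
      System.out.println("commit work rejected: " + e);
    }
  }
}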
Steve Loughran, 2020-06-30 10:44:51 +01:00 (committed via GitHub)
commit 4249c04d45, parent cd188ea9f0
5 changed files with 143 additions and 41 deletions

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java

@@ -24,6 +24,7 @@
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -472,7 +473,7 @@ protected void commitPendingUploads(
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.abortWith(path ->
loadAndAbort(commitContext, pending, path, true, false))
.revertWith(path ->
@@ -502,7 +503,7 @@ protected void precommitCheckPendingFiles(
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.run(path -> PendingSet.load(sourceFS, path));
}
}
@@ -525,7 +526,7 @@ private void loadAndCommit(
Tasks.foreach(pendingSet.getCommits())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(singleCommitThreadPool())
.executeWith(singleThreadSubmitter())
.onFailure((commit, exception) ->
commitContext.abortSingleCommit(commit))
.abortWith(commitContext::abortSingleCommit)
@@ -580,7 +581,7 @@ private void loadAndAbort(
path);
FileSystem fs = getDestFS();
Tasks.foreach(pendingSet.getCommits())
.executeWith(singleCommitThreadPool())
.executeWith(singleThreadSubmitter())
.suppressExceptions(suppressExceptions)
.run(commit -> {
try {
@@ -674,7 +675,7 @@ protected void abortPendingUploadsInCleanup(
return;
}
Tasks.foreach(pending)
.executeWith(buildThreadPool(getJobContext()))
.executeWith(buildSubmitter(getJobContext()))
.suppressExceptions(suppressExceptions)
.run(u -> commitContext.abortMultipartCommit(
u.getKey(), u.getUploadId()));
@@ -838,45 +839,117 @@ protected String getRole() {
}
/**
* Returns an {@link ExecutorService} for parallel tasks. The number of
* Returns a {@link Tasks.Submitter} for parallel tasks. The number of
* threads in the thread-pool is set by fs.s3a.committer.threads.
* If num-threads is 0, this will return null;
* this is used in Tasks as a cue
* to switch to single-threaded execution.
*
* @param context the JobContext for this commit
* @return an {@link ExecutorService} or null for the number of threads
* @return a submitter or null
*/
protected final synchronized ExecutorService buildThreadPool(
protected Tasks.Submitter buildSubmitter(
JobContext context) {
if (getThreadCount(context) > 0) {
return new PoolSubmitter(context);
} else {
return null;
}
}
/**
* Returns an {@link ExecutorService} for parallel tasks. The number of
* threads in the thread-pool is set by fs.s3a.committer.threads.
* If num-threads is 0, this will raise an exception.
*
* @param context the JobContext for this commit
* @param numThreads threads
* @return an {@link ExecutorService} for the number of threads
*/
private synchronized ExecutorService buildThreadPool(
JobContext context, int numThreads) {
Preconditions.checkArgument(numThreads > 0,
"Cannot create a thread pool with no threads");
if (threadPool == null) {
int numThreads = context.getConfiguration().getInt(
FS_S3A_COMMITTER_THREADS,
DEFAULT_COMMITTER_THREADS);
LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
if (numThreads > 0) {
threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
.build());
} else {
return null;
}
threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
.build());
}
return threadPool;
}
/**
* Get the thread count for this job's commit operations.
* @param context the JobContext for this commit
* @return a possibly zero thread count.
*/
private int getThreadCount(final JobContext context) {
return context.getConfiguration().getInt(
FS_S3A_COMMITTER_THREADS,
DEFAULT_COMMITTER_THREADS);
}
/**
* Submit a runnable.
* This will demand-create the thread pool if needed.
* <p></p>
* This is synchronized to ensure the thread pool is always valid when
* work is submitted. See HADOOP-16798.
* @param context the JobContext for this commit
* @param task task to execute
* @return the future of the submitted task.
*/
private synchronized Future<?> submitRunnable(
final JobContext context,
final Runnable task) {
return buildThreadPool(context, getThreadCount(context)).submit(task);
}
/**
* The real task submitter, which hands off the work to
* the current thread pool.
*/
private final class PoolSubmitter implements Tasks.Submitter {
private final JobContext context;
private final int numThreads;
private PoolSubmitter(final JobContext context) {
this.numThreads = getThreadCount(context);
Preconditions.checkArgument(numThreads > 0,
"Cannot create a thread pool with no threads");
this.context = context;
}
@Override
public Future<?> submit(final Runnable task) {
return submitRunnable(context, task);
}
}
/**
* Destroy any thread pools; wait for that to finish,
* but don't overreact if it doesn't finish in time.
*/
protected synchronized void destroyThreadPool() {
if (threadPool != null) {
LOG.debug("Destroying thread pool");
HadoopExecutors.shutdown(threadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
protected void destroyThreadPool() {
ExecutorService pool;
// reset the thread pool in a sync block, then shut it down
// afterwards. This allows for other threads to create a
// new thread pool on demand.
synchronized(this) {
pool = this.threadPool;
threadPool = null;
}
if (pool != null) {
LOG.debug("Destroying thread pool");
HadoopExecutors.shutdown(pool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
}
}
/**
@@ -884,11 +957,9 @@ protected synchronized void destroyThreadPool() {
* within the commit of all uploads of a single task.
* This is currently null; it is here to allow the Tasks class to
* provide the logic for execute/revert.
* Why not use the existing thread pool? Too much fear of deadlocking,
* and tasks are being committed in parallel anyway.
* @return null. always.
*/
protected final synchronized ExecutorService singleCommitThreadPool() {
protected final synchronized Tasks.Submitter singleThreadSubmitter() {
return null;
}
@@ -932,7 +1003,7 @@ protected void abortPendingUploads(JobContext context,
CommitOperations.CommitContext commitContext
= initiateCommitOperation()) {
Tasks.foreach(pending)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.suppressExceptions(suppressExceptions)
.run(commitContext::abortSingleCommit);
}
@@ -961,7 +1032,7 @@ protected void abortPendingUploads(
CommitOperations.CommitContext commitContext
= initiateCommitOperation()) {
Tasks.foreach(pending.getSourceFiles())
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.suppressExceptions(suppressExceptions)
.run(path ->
loadAndAbort(commitContext,
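
Taken together, the committer changes above stop handing the pool to
callers: submission goes through a synchronized, demand-creating path,
and destruction nulls the shared reference inside the lock, then shuts
the old pool down outside it. A condensed restatement of that lifecycle
as illustrative stand-alone Java (the class name and pool size are
invented; the 60s wait mirrors THREAD_POOL_SHUTDOWN_DELAY_SECONDS only
by assumption):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative restatement of the demand-create/swap/shutdown lifecycle.
final class DemandCreatedPool {
  private ExecutorService pool;

  // Synchronized so the pool is always valid while work is queued;
  // demand-creates a new pool if destroy() has already run.
  synchronized Future<?> submit(Runnable task) {
    if (pool == null) {
      pool = Executors.newFixedThreadPool(4);
    }
    return pool.submit(task);
  }

  // Reset the reference in a sync block, then shut down outside it,
  // so other threads can create a new pool on demand meanwhile.
  void destroy() {
    ExecutorService old;
    synchronized (this) {
      old = pool;
      pool = null;
    }
    if (old != null) {
      old.shutdown();
      try {
        old.awaitTermination(60, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}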

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java

@@ -25,7 +25,6 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -76,7 +75,7 @@ public interface FailureTask<I, E extends Exception> {
*/
public static class Builder<I> {
private final Iterable<I> items;
private ExecutorService service = null;
private Submitter service = null;
private FailureTask<I, ?> onFailure = null;
private boolean stopOnFailure = false;
private boolean suppressExceptions = false;
@@ -96,11 +95,11 @@ public static class Builder<I> {
/**
* Declare executor service: if null, the tasks are executed in a single
* thread.
* @param executorService service to schedule tasks with.
* @param submitter service to schedule tasks with.
* @return this builder.
*/
public Builder<I> executeWith(ExecutorService executorService) {
this.service = executorService;
public Builder<I> executeWith(Submitter submitter) {
this.service = submitter;
return this;
}
@@ -407,4 +406,18 @@ private static <E extends Exception> void castAndThrow(Exception e) throws E {
}
throw (E) e;
}
/**
* Interface to whatever lets us submit tasks.
*/
public interface Submitter {
/**
* Submit work.
* @param task task to execute
* @return the future of the submitted task.
*/
Future<?> submit(Runnable task);
}
}
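
The Submitter interface above is deliberately minimal: anything that
can run a Runnable and hand back a Future can drive Tasks.foreach. As
an illustrative adapter (not part of this change, and assuming
Tasks.Submitter is on the classpath), a plain ExecutorService can be
wrapped like this; the TestTasks change below does essentially the same
with its PoolSubmitter:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Illustrative adapter exposing an ExecutorService as a Tasks.Submitter,
// so callers never hold the pool itself.
final class ExecutorServiceSubmitter implements Tasks.Submitter {
  private final ExecutorService pool;

  ExecutorServiceSubmitter(ExecutorService pool) {
    this.pool = pool;
  }

  @Override
  public Future<?> submit(Runnable task) {
    return pool.submit(task);
  }
}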

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java

@@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,7 +186,7 @@ private void replacePartitions(
Map<Path, String> partitions = new ConcurrentHashMap<>();
FileSystem sourceFS = pending.getSourceFS();
ExecutorService pool = buildThreadPool(context);
Tasks.Submitter submitter = buildSubmitter(context);
try (DurationInfo ignored =
new DurationInfo(LOG, "Replacing partitions")) {
@@ -198,7 +197,7 @@ private void replacePartitions(
Tasks.foreach(pending.getSourceFiles())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(pool)
.executeWith(submitter)
.run(path -> {
PendingSet pendingSet = PendingSet.load(sourceFS, path);
Path lastParent = null;
@@ -216,7 +215,7 @@ private void replacePartitions(
Tasks.foreach(partitions.keySet())
.stopOnFailure()
.suppressExceptions(false)
.executeWith(pool)
.executeWith(submitter)
.run(partitionPath -> {
LOG.debug("{}: removing partition path to be replaced: " +
getRole(), partitionPath);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java

@@ -699,7 +699,7 @@ protected int commitTaskInternal(final TaskAttemptContext context,
Tasks.foreach(taskOutput)
.stopOnFailure()
.suppressExceptions(false)
.executeWith(buildThreadPool(context))
.executeWith(buildSubmitter(context))
.run(stat -> {
Path path = stat.getPath();
File localFile = new File(path.toUri().getPath());

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java

@@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -57,6 +58,12 @@ public class TestTasks extends HadoopTestBase {
* Thread pool for task execution.
*/
private ExecutorService threadPool;
/**
* Task submitter bonded to the thread pool, or
* null for the 0-thread case.
*/
Tasks.Submitter submitter;
private final CounterTask failingTask
= new CounterTask("failing committer", FAILPOINT, Item::commit);
@@ -117,6 +124,9 @@ public void setup() {
.setDaemon(true)
.setNameFormat(getMethodName() + "-pool-%d")
.build());
submitter = new PoolSubmitter();
} else {
submitter = null;
}
}
@@ -129,12 +139,21 @@ public void teardown() {
}
}
private class PoolSubmitter implements Tasks.Submitter {
@Override
public Future<?> submit(final Runnable task) {
return threadPool.submit(task);
}
}
/**
* create the builder.
* @return pre-inited builder
*/
private Tasks.Builder<Item> builder() {
return Tasks.foreach(items).executeWith(threadPool);
return Tasks.foreach(items).executeWith(submitter);
}
private void assertRun(Tasks.Builder<Item> builder,