From 2440671a117f165dcda5056404bc898df3c50803 Mon Sep 17 00:00:00 2001 From: Varun Vasudev Date: Thu, 18 Feb 2016 14:15:08 +0530 Subject: [PATCH] MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana. --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapred/LocalContainerLauncher.java | 4 +-- .../v2/app/commit/CommitterEventHandler.java | 3 +- .../mapreduce/v2/app/job/impl/JobImpl.java | 3 +- .../app/launcher/ContainerLauncherImpl.java | 3 +- .../mapred/LocalDistributedCacheManager.java | 5 ++- .../apache/hadoop/mapred/LocalJobRunner.java | 8 +++-- .../mapred/LocatedFileStatusFetcher.java | 4 +-- .../org/apache/hadoop/mapred/TaskLog.java | 31 ++++++++++--------- .../mapred/lib/MultithreadedMapRunner.java | 4 ++- .../lib/output/TestFileOutputCommitter.java | 4 +-- .../mapreduce/v2/hs/HistoryFileManager.java | 6 ++-- .../hadoop/mapreduce/v2/hs/JobHistory.java | 3 +- .../apache/hadoop/mapred/ShuffleHandler.java | 6 ++-- .../org/apache/hadoop/examples/pi/Util.java | 5 +-- 15 files changed, 53 insertions(+), 39 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0b8c818df3..da28bc607d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -310,6 +310,9 @@ Release 2.9.0 - UNRELEASED MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter) + MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in + mapreduce. (Sidharta Seethana via vvasudev) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index 1a0d5fb076..da118c5258 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -26,7 +26,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -60,6 +59,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -138,7 +138,7 @@ public void serviceStart() throws Exception { // make it a daemon thread so that the process can exit even if the task is // not interruptible taskRunner = - Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). + HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder(). setDaemon(true).setNameFormat("uber-SubtaskRunner").build()); // create and start an event handling thread eventHandler = new Thread(new EventHandler(), "uber-EventHandler"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java index b53955fd85..0b1be708d3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -133,7 +134,7 @@ public Thread newThread(Runnable r) { tfBuilder.setThreadFactory(backingTf); } ThreadFactory tf = tfBuilder.build(); - launcherPool = new ThreadPoolExecutor(5, 5, 1, + launcherPool = new HadoopThreadPoolExecutor(5, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); eventHandlingThread = new Thread(new Runnable() { @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 5ed07622ef..c8c5ce90ca 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -116,6 +116,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -698,7 +699,7 @@ public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, .setNameFormat("Job Fail Wait Timeout Monitor #%d") .setDaemon(true) .build(); - this.executor = new ScheduledThreadPoolExecutor(1, threadFactory); + this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory); // This "this leak" is okay because the retained pointer is in an // instance variable. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index a7e966cd36..189e2ef9e1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -266,7 +267,7 @@ protected void serviceStart() throws Exception { "ContainerLauncher #%d").setDaemon(true).build(); // Start with a default core-pool size of 10 and change it dynamically. - launcherPool = new ThreadPoolExecutor(initialPoolSize, + launcherPool = new HadoopThreadPoolExecutor(initialPoolSize, Integer.MAX_VALUE, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java index 8606ede816..3b87197e1b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java @@ -35,7 +35,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; @@ -43,7 +42,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; @@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -121,7 +120,7 @@ public void setup(JobConf conf) throws IOException { ThreadFactory tf = new ThreadFactoryBuilder() .setNameFormat("LocalDistributedCacheManager Downloader #%d") .build(); - exec = Executors.newCachedThreadPool(tf); + exec = HadoopExecutors.newCachedThreadPool(tf); Path destPath = localDirAllocator.getLocalPathForWrite(".", conf); Map> resourcesToPaths = Maps.newHashMap(); for (LocalResource resource : localResources.values()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index 45d3cc5b29..37c147dea3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -74,6 +73,7 @@ import org.apache.hadoop.util.ReflectionUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.concurrent.HadoopExecutors; /** Implements MapReduce locally, in-process, for debugging. */ @InterfaceAudience.Private @@ -428,7 +428,8 @@ protected synchronized ExecutorService createMapExecutor() { ThreadFactory tf = new ThreadFactoryBuilder() .setNameFormat("LocalJobRunner Map Task Executor #%d") .build(); - ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf); + ExecutorService executor = HadoopExecutors.newFixedThreadPool( + maxMapThreads, tf); return executor; } @@ -454,7 +455,8 @@ protected synchronized ExecutorService createReduceExecutor() { LOG.debug("Reduce tasks to process: " + this.numReduceTasks); // Create a new executor service to drain the work queue. - ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads); + ExecutorService executor = HadoopExecutors.newFixedThreadPool( + maxReduceThreads); return executor; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java index 87114ad04f..a039bc9748 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java @@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; @@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.concurrent.HadoopExecutors; /** * Utility class to fetch block locations for specified Input paths using a @@ -92,7 +92,7 @@ public LocatedFileStatusFetcher(Configuration conf, Path[] dirs, IOException { int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS, FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS); - rawExec = Executors.newFixedThreadPool( + rawExec = HadoopExecutors.newFixedThreadPool( numThreads, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("GetFileInfo #%d").build()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java index e07b5be287..bf838c2388 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java @@ -51,6 +51,7 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.log4j.Appender; import org.apache.log4j.LogManager; @@ -327,22 +328,22 @@ private static void flushAppenders(Logger l) { public static ScheduledExecutorService createLogSyncer() { final ScheduledExecutorService scheduler = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - final Thread t = Executors.defaultThreadFactory().newThread(r); - t.setDaemon(true); - t.setName("Thread for syncLogs"); - return t; - } - }); + HadoopExecutors.newSingleThreadScheduledExecutor( + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setName("Thread for syncLogs"); + return t; + } + }); ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - TaskLog.syncLogsShutdown(scheduler); - } - }, 50); + @Override + public void run() { + TaskLog.syncLogsShutdown(scheduler); + } + }, 50); scheduler.scheduleWithFixedDelay( new Runnable() { @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java index 98d794bf46..05339bc5de 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import java.io.IOException; import java.util.concurrent.*; @@ -84,7 +85,8 @@ public void configure(JobConf jobConf) { // Creating a threadpool of the configured size to execute the Mapper // map method in parallel. - executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, + executorService = new HadoopThreadPoolExecutor(numberOfThreads, + numberOfThreads, 0L, TimeUnit.MILLISECONDS, new BlockingArrayQueue (numberOfThreads)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index eba513b5da..20d8ab54d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -25,10 +25,10 @@ import java.net.URI; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import junit.framework.TestCase; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -696,7 +696,7 @@ public Path getDefaultWorkFile(TaskAttemptContext context, }; } - final ExecutorService executor = Executors.newFixedThreadPool(2); + final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2); try { for (int i = 0; i < taCtx.length; i++) { final int taskIdx = i; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index 6be0d27453..677d5c2917 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ShutdownThreadsHelper; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; @@ -554,8 +555,9 @@ protected void serviceInit(Configuration conf) throws Exception { JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( "MoveIntermediateToDone Thread #%d").build(); - moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads, - 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); + moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads, + numMoveThreads, 1, TimeUnit.HOURS, + new LinkedBlockingQueue(), tf); super.serviceInit(conf); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 41bc90a99a..45075c9d16 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -44,6 +44,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; @@ -126,7 +127,7 @@ protected void serviceStart() throws Exception { ((Service) storage).start(); } - scheduledExecutor = new ScheduledThreadPoolExecutor(2, + scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2, new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d") .build()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 2fb7811080..0d6e900b39 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -46,7 +46,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -81,6 +80,7 @@ import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; @@ -475,8 +475,8 @@ protected void serviceInit(Configuration conf) throws Exception { .build(); selector = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory), + HadoopExecutors.newCachedThreadPool(bossFactory), + HadoopExecutors.newCachedThreadPool(workerFactory), maxShuffleThreads); super.serviceInit(new Configuration(conf)); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java index 8afc1bd760..e74c09194f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java @@ -35,7 +35,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; @@ -48,6 +47,7 @@ import org.apache.hadoop.util.ToolRunner; import com.google.common.base.Charsets; +import org.apache.hadoop.util.concurrent.HadoopExecutors; /** Utility methods */ public class Util { @@ -157,7 +157,8 @@ public static String parseStringVariable(final String name, final String s) { /** Execute the callables by a number of threads */ public static > void execute(int nThreads, List callables ) throws InterruptedException, ExecutionException { - final ExecutorService executor = Executors.newFixedThreadPool(nThreads); + final ExecutorService executor = HadoopExecutors.newFixedThreadPool( + nThreads); final List> futures = executor.invokeAll(callables); for(Future f : futures) f.get();