diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1ec852aef3..3060dfdad7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1406,6 +1406,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3073. Fixed build issues in MR1. (mahadev via acmurthy) + MAPREDUCE-2691. Increase threadpool size for launching containers in + MapReduce ApplicationMaster. (vinodkv via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 982f7d334a..95e17d8f4f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -73,6 +73,8 @@ public class ContainerLauncherImpl extends AbstractService implements private AppContext context; private ThreadPoolExecutor launcherPool; + private static final int INITIAL_POOL_SIZE = 10; + private int limitOnPoolSize; private Thread eventHandlingThread; private BlockingQueue eventQueue = new LinkedBlockingQueue(); @@ -96,16 +98,17 @@ public synchronized void init(Configuration conf) { YarnConfiguration.YARN_SECURITY_INFO, ContainerManagerSecurityInfo.class, SecurityInfo.class); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); + this.limitOnPoolSize = conf.getInt( + MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, + MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); super.init(myLocalConfig); } public void start() { 
- launcherPool = - new ThreadPoolExecutor(getConfig().getInt( - MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT, 10), - Integer.MAX_VALUE, 1, TimeUnit.HOURS, - new LinkedBlockingQueue()); - launcherPool.prestartAllCoreThreads(); // Wait for work. + // Start with a default core-pool size of 10 and change it dynamically. + launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, + Integer.MAX_VALUE, 1, TimeUnit.HOURS, + new LinkedBlockingQueue()); eventHandlingThread = new Thread(new Runnable() { @Override public void run() { @@ -117,6 +120,26 @@ public void run() { LOG.error("Returning, interrupted : " + e); return; } + + int poolSize = launcherPool.getCorePoolSize(); + + // See if we need to up the pool size, but only if we haven't reached the + // maximum limit yet. + if (poolSize != limitOnPoolSize) { + + // Number of nodes where containers will run at *this* point of time. This is + // *not* the cluster size and doesn't need to be. + int numNodes = ugiMap.size(); + int idealPoolSize = Math.min(limitOnPoolSize, numNodes); + + if (poolSize <= idealPoolSize) { + // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the + // latter is just a buffer so we are not always increasing the + // pool-size + launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE); + } + } + // the events from the queue are handled in parallel // using a thread pool launcherPool.execute(new EventProcessor(event)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index c456d52deb..accfdddc3d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -332,9 +332,15 @@ public interface MRJobConfig { MR_AM_PREFIX+"num-progress-splits"; public static final int DEFAULT_MR_AM_NUM_PROGRESS_SPLITS = 12; - /** Number of threads user to launch containers in the app master.*/ - public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT = - MR_AM_PREFIX+"containerlauncher.thread-count"; + /** + * Upper limit on the number of threads used to launch containers in the app + * master. Expert level config, you shouldn't be needing it in most cases. + */ + public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = + MR_AM_PREFIX+"containerlauncher.thread-count-limit"; + + public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = + 500; /** Number of threads to handle job client RPC requests.*/ public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =