MAPREDUCE-2691. Increase threadpool size for launching containers in MapReduce ApplicationMaster. Contributed by Vinod K V.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1175294 13f79535-47bb-0310-9956-ffa450edef68
parent e7b63aebcb
commit d09ceac1f7
@@ -1406,6 +1406,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3073. Fixed build issues in MR1. (mahadev via acmurthy)
 
+    MAPREDUCE-2691. Increase threadpool size for launching containers in
+    MapReduce ApplicationMaster. (vinodkv via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
@@ -73,6 +73,8 @@ public class ContainerLauncherImpl extends AbstractService implements
 
   private AppContext context;
   private ThreadPoolExecutor launcherPool;
+  private static final int INITIAL_POOL_SIZE = 10;
+  private int limitOnPoolSize;
   private Thread eventHandlingThread;
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
@@ -96,16 +98,17 @@ public synchronized void init(Configuration conf) {
         YarnConfiguration.YARN_SECURITY_INFO,
         ContainerManagerSecurityInfo.class, SecurityInfo.class);
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+    this.limitOnPoolSize = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     super.init(myLocalConfig);
   }
 
   public void start() {
-    launcherPool =
-        new ThreadPoolExecutor(getConfig().getInt(
-            MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT, 10),
-            Integer.MAX_VALUE, 1, TimeUnit.HOURS,
-            new LinkedBlockingQueue<Runnable>());
-    launcherPool.prestartAllCoreThreads(); // Wait for work.
+    // Start with a default core-pool size of 10 and change it dynamically.
+    launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
+        Integer.MAX_VALUE, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>());
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
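The start() change above drops the fixed, configured pool size in favour of a small initial core size that is grown at runtime (via setCorePoolSize, in the next hunk). The reason the core size has to be raised explicitly is that a ThreadPoolExecutor fed by an unbounded LinkedBlockingQueue never grows past its core size on its own: extra submissions are queued rather than spawning threads toward the maximum. The standalone sketch below (class and output values are illustrative, not from the patch) shows that behaviour and the effect of raising the core size.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: with an unbounded LinkedBlockingQueue, the executor only ever runs
// corePoolSize tasks at once; the rest wait in the queue. Growing the pool
// therefore means raising the core size at runtime, as the patch does.
public class GrowablePoolSketch {
  private static final int INITIAL_POOL_SIZE = 10;   // same starting point as the patch

  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
        Integer.MAX_VALUE, 1, TimeUnit.HOURS,
        new LinkedBlockingQueue<Runnable>());

    // Submit far more work than the core size; only 10 threads are created.
    for (int i = 0; i < 50; i++) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          try { Thread.sleep(100); } catch (InterruptedException ignored) { }
        }
      });
    }
    System.out.println("threads with core size 10: " + pool.getPoolSize());   // typically 10

    // Raising the core size makes the executor start extra threads to drain the queue.
    pool.setCorePoolSize(30);
    Thread.sleep(50);
    System.out.println("threads after raising core size: " + pool.getPoolSize());   // typically 30

    pool.shutdown();
  }
}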
@@ -117,6 +120,26 @@ public void run() {
             LOG.error("Returning, interrupted : " + e);
             return;
           }
+
+          int poolSize = launcherPool.getCorePoolSize();
+
+          // See if we need up the pool size only if haven't reached the
+          // maximum limit yet.
+          if (poolSize != limitOnPoolSize) {
+
+            // nodes where containers will run at *this* point of time. This is
+            // *not* the cluster size and doesn't need to be.
+            int numNodes = ugiMap.size();
+            int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
+
+            if (poolSize <= idealPoolSize) {
+              // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
+              // later is just a buffer so we are not always increasing the
+              // pool-size
+              launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE);
+            }
+          }
+
           // the events from the queue are handled in parallel
           // using a thread pool
           launcherPool.execute(new EventProcessor(event));
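The block added to the event loop above derives the pool size from the number of distinct nodes that currently have containers (ugiMap.size()), capped by the configured limit and padded with INITIAL_POOL_SIZE so the pool is not resized on every event. The sizing rule is easiest to read as a pure function; the sketch below is a rephrasing with hypothetical names (nextCoreSize is not a method in the patch).

// Sketch of the resizing heuristic from the hunk above, pulled out as a pure
// function so the arithmetic is easy to follow. Names are illustrative.
public final class PoolSizeHeuristic {
  private static final int INITIAL_POOL_SIZE = 10;  // also used as the growth buffer

  /**
   * @param currentCoreSize the executor's current core pool size
   * @param numNodes        number of distinct nodes with containers right now
   * @param limit           hard upper bound (the thread-count-limit config)
   * @return the core size to set, or the current size if no change is needed
   */
  static int nextCoreSize(int currentCoreSize, int numNodes, int limit) {
    if (currentCoreSize == limit) {
      return currentCoreSize;                 // already at the ceiling
    }
    int ideal = Math.min(limit, numNodes);    // one thread per active node, capped
    if (currentCoreSize <= ideal) {
      return ideal + INITIAL_POOL_SIZE;       // buffer avoids resizing on every event
    }
    return currentCoreSize;
  }

  public static void main(String[] args) {
    System.out.println(nextCoreSize(10, 30, 500));    // 40: min(500, 30) + 10
    System.out.println(nextCoreSize(40, 35, 500));    // 40: pool is already ahead of demand
    System.out.println(nextCoreSize(490, 2000, 500)); // 510: buffer overshoots the limit
  }
}

Note that, as in the patch, the buffer can carry the core size slightly past the configured limit once the node count reaches it; growth still stops there, because idealPoolSize itself can never exceed the limit.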
@@ -332,9 +332,15 @@ public interface MRJobConfig {
       MR_AM_PREFIX+"num-progress-splits";
   public static final int DEFAULT_MR_AM_NUM_PROGRESS_SPLITS = 12;
 
-  /** Number of threads user to launch containers in the app master.*/
-  public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT =
-    MR_AM_PREFIX+"containerlauncher.thread-count";
+  /**
+   * Upper limit on the number of threads user to launch containers in the app
+   * master. Expect level config, you shouldn't be needing it in most cases.
+   */
+  public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+    MR_AM_PREFIX+"containerlauncher.thread-count-limit";
+
+  public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+    500;
 
   /** Number of threads to handle job client RPC requests.*/
   public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =
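The MRJobConfig hunk replaces the old per-AM thread-count knob with an upper limit and a compiled-in default of 500, which ContainerLauncherImpl reads in init() (see the init() hunk above) via conf.getInt. A minimal sketch of how that key/default pair behaves with Hadoop's Configuration API follows; the "yarn.app.mapreduce.am." value of MR_AM_PREFIX is an assumption here, since the prefix's definition is not part of this diff.

import org.apache.hadoop.conf.Configuration;

// Sketch of how the new key/default pair is consumed. The constants mirror the
// MRJobConfig hunk above; the "yarn.app.mapreduce.am." prefix is an assumed
// value of MR_AM_PREFIX, which is defined elsewhere and not shown in this diff.
public class ThreadLimitConfigSketch {
  static final String MR_AM_PREFIX = "yarn.app.mapreduce.am.";
  static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
      MR_AM_PREFIX + "containerlauncher.thread-count-limit";
  static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = 500;

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Unset: falls back to the compiled-in default of 500.
    int limit = conf.getInt(MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
        DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
    System.out.println("limit = " + limit);   // 500

    // A job or site config can lower or raise the ceiling explicitly.
    conf.setInt(MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, 100);
    System.out.println("limit = " + conf.getInt(
        MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
        DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT));   // 100
  }
}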