MAPREDUCE-6265. Make ContainerLauncherImpl.INITIAL_POOL_SIZE configurable to better control to launch/kill containers. Contributed by Zhihai Xu

This commit is contained in:
Tsuyoshi Ozawa 2015-03-14 16:44:02 +09:00
parent 32741cf3d2
commit 9d38520c8e
5 changed files with 45 additions and 9 deletions

View File

@ -340,6 +340,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6263. Configurable timeout between YARNRunner terminate the MAPREDUCE-6263. Configurable timeout between YARNRunner terminate the
application and forcefully kill. (Eric Payne via junping_du) application and forcefully kill. (Eric Payne via junping_du)
MAPREDUCE-6265. Make ContainerLauncherImpl.INITIAL_POOL_SIZE configurable
to better control to launch/kill containers. (Zhihai Xu via ozawa)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-6169. MergeQueue should release reference to the current item MAPREDUCE-6169. MergeQueue should release reference to the current item

View File

@ -70,7 +70,7 @@ public class ContainerLauncherImpl extends AbstractService implements
new ConcurrentHashMap<ContainerId, Container>(); new ConcurrentHashMap<ContainerId, Container>();
private final AppContext context; private final AppContext context;
protected ThreadPoolExecutor launcherPool; protected ThreadPoolExecutor launcherPool;
protected static final int INITIAL_POOL_SIZE = 10; protected int initialPoolSize;
private int limitOnPoolSize; private int limitOnPoolSize;
private Thread eventHandlingThread; private Thread eventHandlingThread;
protected BlockingQueue<ContainerLauncherEvent> eventQueue = protected BlockingQueue<ContainerLauncherEvent> eventQueue =
@ -246,6 +246,12 @@ protected void serviceInit(Configuration conf) throws Exception {
MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT, MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT); MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize); LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
this.initialPoolSize = conf.getInt(
MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE);
LOG.info("The thread pool initial size is " + this.initialPoolSize);
super.serviceInit(conf); super.serviceInit(conf);
cmProxy = new ContainerManagementProtocolProxy(conf); cmProxy = new ContainerManagementProtocolProxy(conf);
} }
@ -256,7 +262,7 @@ protected void serviceStart() throws Exception {
"ContainerLauncher #%d").setDaemon(true).build(); "ContainerLauncher #%d").setDaemon(true).build();
// Start with a default core-pool size of 10 and change it dynamically. // Start with a default core-pool size of 10 and change it dynamically.
launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, launcherPool = new ThreadPoolExecutor(initialPoolSize,
Integer.MAX_VALUE, 1, TimeUnit.HOURS, Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(), new LinkedBlockingQueue<Runnable>(),
tf); tf);
@ -289,11 +295,11 @@ public void run() {
int idealPoolSize = Math.min(limitOnPoolSize, numNodes); int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
if (poolSize < idealPoolSize) { if (poolSize < idealPoolSize) {
// Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the // Bump up the pool size to idealPoolSize+initialPoolSize, the
// later is just a buffer so we are not always increasing the // later is just a buffer so we are not always increasing the
// pool-size // pool-size
int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+ INITIAL_POOL_SIZE); + initialPoolSize);
LOG.info("Setting ContainerLauncher pool size to " + newPoolSize LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+ " as number-of-nodes to talk to is " + numNodes); + " as number-of-nodes to talk to is " + numNodes);
launcherPool.setCorePoolSize(newPoolSize); launcherPool.setCorePoolSize(newPoolSize);

View File

@ -90,7 +90,7 @@ public class TestContainerLauncher {
static final Log LOG = LogFactory.getLog(TestContainerLauncher.class); static final Log LOG = LogFactory.getLog(TestContainerLauncher.class);
@Test (timeout = 5000) @Test (timeout = 10000)
public void testPoolSize() throws InterruptedException { public void testPoolSize() throws InterruptedException {
ApplicationId appId = ApplicationId.newInstance(12345, 67); ApplicationId appId = ApplicationId.newInstance(12345, 67);
@ -108,12 +108,14 @@ public void testPoolSize() throws InterruptedException {
ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
// No events yet // No events yet
Assert.assertEquals(containerLauncher.initialPoolSize,
MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE);
Assert.assertEquals(0, threadPool.getPoolSize()); Assert.assertEquals(0, threadPool.getPoolSize());
Assert.assertEquals(ContainerLauncherImpl.INITIAL_POOL_SIZE, Assert.assertEquals(containerLauncher.initialPoolSize,
threadPool.getCorePoolSize()); threadPool.getCorePoolSize());
Assert.assertNull(containerLauncher.foundErrors); Assert.assertNull(containerLauncher.foundErrors);
containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, i); ContainerId containerId = ContainerId.newContainerId(appAttemptId, i);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
@ -152,7 +154,7 @@ public void testPoolSize() throws InterruptedException {
// Different hosts, there should be an increase in core-thread-pool size to // Different hosts, there should be an increase in core-thread-pool size to
// 21(11hosts+10buffer) // 21(11hosts+10buffer)
// Core pool size should be 21 but the live pool size should be only 11. // Core pool size should be 21 but the live pool size should be only 11.
containerLauncher.expectedCorePoolSize = 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.expectedCorePoolSize = 11 + containerLauncher.initialPoolSize;
containerLauncher.finishEventHandling = false; containerLauncher.finishEventHandling = false;
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 21); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 21);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21); TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
@ -164,6 +166,15 @@ public void testPoolSize() throws InterruptedException {
Assert.assertNull(containerLauncher.foundErrors); Assert.assertNull(containerLauncher.foundErrors);
containerLauncher.stop(); containerLauncher.stop();
// change configuration MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE
// and verify initialPoolSize value.
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE,
20);
containerLauncher = new CustomContainerLauncher(context);
containerLauncher.init(conf);
Assert.assertEquals(containerLauncher.initialPoolSize, 20);
} }
@Test(timeout = 5000) @Test(timeout = 5000)
@ -187,7 +198,7 @@ public void testPoolLimits() throws InterruptedException {
ThreadPoolExecutor threadPool = containerLauncher.getThreadPool(); ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
// 10 different hosts // 10 different hosts
containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null, containerId, "host" + i + ":1234", null,

View File

@ -504,6 +504,14 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
500; 500;
/**
* The initial size of thread pool to launch containers in the app master
*/
public static final String MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE =
MR_AM_PREFIX+"containerlauncher.threadpool-initial-size";
public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREADPOOL_INITIAL_SIZE =
10;
/** Number of threads to handle job client RPC requests.*/ /** Number of threads to handle job client RPC requests.*/
public static final String MR_AM_JOB_CLIENT_THREAD_COUNT = public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =
MR_AM_PREFIX + "job.client.thread-count"; MR_AM_PREFIX + "job.client.thread-count";

View File

@ -1694,4 +1694,12 @@
calculated as (heapSize / mapreduce.heap.memory-mb.ratio). calculated as (heapSize / mapreduce.heap.memory-mb.ratio).
</description> </description>
</property> </property>
<property>
<name>yarn.app.mapreduce.am.containerlauncher.threadpool-initial-size</name>
<value>10</value>
<description>The initial size of thread pool to launch containers in the
app master.
</description>
</property>
</configuration> </configuration>