MAPREDUCE-5596. Allow configuring the number of threads used to serve shuffle connections. Contributed by Sandy Ryza

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1536711 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-10-29 13:41:50 +00:00
parent defa7af4d3
commit e0c99b80d4
3 changed files with 24 additions and 1 deletions

View File

@ -212,6 +212,9 @@ Release 2.2.1 - UNRELEASED
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
out text files without separators (Sandy Ryza) out text files without separators (Sandy Ryza)
MAPREDUCE-5596. Allow configuring the number of threads used to serve
shuffle connections (Sandy Ryza via jlowe)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in MAPREDUCE-4680. Job history cleaner should only check timestamps of files in

View File

@ -304,6 +304,16 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.shuffle.max.threads</name>
<value>0</value>
<description>Maximum number of threads used to serve shuffle connections. A value
of zero selects the default of 2 times the number of available
processors (as reported by Runtime.availableProcessors()). Netty serves the
requests, so a dedicated thread is not needed for each connection.
</description>
</property>
<property> <property>
<name>mapreduce.reduce.markreset.buffer.percent</name> <name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value> <value>0.0</value>

View File

@ -164,6 +164,10 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections"; public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
// 0 means "use the default": serviceInit substitutes 2 * number of available processors (the Netty default)
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
@Metrics(about="Shuffle output metrics", context="mapred") @Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener { static class ShuffleMetrics implements ChannelFutureListener {
@Metric("Shuffle output in bytes") @Metric("Shuffle output in bytes")
@ -282,6 +286,11 @@ protected void serviceInit(Configuration conf) throws Exception {
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
DEFAULT_MAX_SHUFFLE_CONNECTIONS); DEFAULT_MAX_SHUFFLE_CONNECTIONS);
int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS,
DEFAULT_MAX_SHUFFLE_THREADS);
if (maxShuffleThreads == 0) {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
ThreadFactory bossFactory = new ThreadFactoryBuilder() ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d") .setNameFormat("ShuffleHandler Netty Boss #%d")
@ -292,7 +301,8 @@ protected void serviceInit(Configuration conf) throws Exception {
selector = new NioServerSocketChannelFactory( selector = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory)); Executors.newCachedThreadPool(workerFactory),
maxShuffleThreads);
super.serviceInit(new Configuration(conf)); super.serviceInit(new Configuration(conf));
} }