diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index d81fc50246..b9db11cf17 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -212,6 +212,9 @@ Release 2.2.1 - UNRELEASED
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
out text files without separators (Sandy Ryza)
+ MAPREDUCE-5596. Allow configuring the number of threads used to serve
+ shuffle connections (Sandy Ryza via jlowe)
+
OPTIMIZATIONS
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 598d106ce9..29facecb6b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -304,6 +304,16 @@
+
+ mapreduce.shuffle.max.threads
+ 0
+ Max allowed threads for serving shuffle connections. Set to zero
+ to indicate the default of 2 times the number of available
+ processors (as reported by Runtime.availableProcessors()). Netty is used to
+ serve requests, so a thread is not needed for each connection.
+
+
+
mapreduce.reduce.markreset.buffer.percent
0.0
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 82fd59e551..9f377e23ac 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -163,6 +163,10 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
+
+ public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
+ // 0 implies Netty default of 2 * number of available processors
+ public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
@Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener {
@@ -282,6 +286,11 @@ protected void serviceInit(Configuration conf) throws Exception {
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
DEFAULT_MAX_SHUFFLE_CONNECTIONS);
+ int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS,
+ DEFAULT_MAX_SHUFFLE_THREADS);
+ if (maxShuffleThreads == 0) {
+ maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
+ }
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
@@ -292,7 +301,8 @@ protected void serviceInit(Configuration conf) throws Exception {
selector = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
+ Executors.newCachedThreadPool(workerFactory),
+ maxShuffleThreads);
super.serviceInit(new Configuration(conf));
}