diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 7a6c785764..c40bb0b19c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1266,6 +1266,29 @@ + + mapreduce.shuffle.pathcache.max-weight + 10485760 + The maximum total weight of entries the cache may contain. + + + + + mapreduce.shuffle.pathcache.expire-after-access-minutes + 5 + The length of time after an entry is last accessed that it + should be automatically removed. + + + + + mapreduce.shuffle.pathcache.concurrency-level + 16 + The concurrency level of the path cache, used to create a fixed number of hashtable + segments, each governed by its own write lock. 
+ + + mapreduce.job.reduce.shuffle.consumer.plugin.class org.apache.hadoop.mapreduce.task.reduce.Shuffle diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index bdd7716363..ffc65641a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -139,8 +139,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.cache.Weigher; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.protobuf.ByteString; @@ -836,63 +834,58 @@ public ChannelPipeline getPipeline() throws Exception { // TODO factor out encode/decode to permit binary shuffle // TODO factor out decode of index to permit alt. 
models } - } class Shuffle extends SimpleChannelUpstreamHandler { - private static final int MAX_WEIGHT = 10 * 1024 * 1024; - private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5; - private static final int ALLOWED_CONCURRENCY = 16; - private final Configuration conf; + private static final String MAX_WEIGHT = + "mapreduce.shuffle.pathcache.max-weight"; + private static final int DEFAULT_MAX_WEIGHT = 10 * 1024 * 1024; + + private static final String EXPIRE_AFTER_ACCESS_MINUTES = + "mapreduce.shuffle.pathcache.expire-after-access-minutes"; + private static final int DEFAULT_EXPIRE_AFTER_ACCESS_MINUTES = 5; + + private static final String CONCURRENCY_LEVEL = + "mapreduce.shuffle.pathcache.concurrency-level"; + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + private final IndexCache indexCache; + private final + LoadingCache pathCache; + private int port; - private final LoadingCache pathCache = - CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES, - TimeUnit.MINUTES).softValues().concurrencyLevel(ALLOWED_CONCURRENCY). 
- removalListener( - new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - if (LOG.isDebugEnabled()) { - LOG.debug("PathCache Eviction: " + notification.getKey() + - ", Reason=" + notification.getCause()); - } - } - } - ).maximumWeight(MAX_WEIGHT).weigher( - new Weigher() { - @Override - public int weigh(AttemptPathIdentifier key, - AttemptPathInfo value) { - return key.jobId.length() + key.user.length() + - key.attemptId.length()+ - value.indexPath.toString().length() + - value.dataPath.toString().length(); - } - } - ).build(new CacheLoader() { - @Override - public AttemptPathInfo load(AttemptPathIdentifier key) throws - Exception { - String base = getBaseLocation(key.jobId, key.user); - String attemptBase = base + key.attemptId; - Path indexFileName = getAuxiliaryLocalPathHandler() - .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME); - Path mapOutputFileName = getAuxiliaryLocalPathHandler() - .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME); - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded : " + key + " via loader"); - } - return new AttemptPathInfo(indexFileName, mapOutputFileName); - } - }); - - public Shuffle(Configuration conf) { - this.conf = conf; - indexCache = new IndexCache(new JobConf(conf)); + Shuffle(Configuration conf) { this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); + this.indexCache = new IndexCache(new JobConf(conf)); + this.pathCache = CacheBuilder.newBuilder() + .expireAfterAccess(conf.getInt(EXPIRE_AFTER_ACCESS_MINUTES, + DEFAULT_EXPIRE_AFTER_ACCESS_MINUTES), TimeUnit.MINUTES) + .softValues() + .concurrencyLevel(conf.getInt(CONCURRENCY_LEVEL, + DEFAULT_CONCURRENCY_LEVEL)) + .removalListener((RemovalListener) notification -> + LOG.debug("PathCache Eviction: {}, Reason={}", + notification.getKey(), notification.getCause())) + .maximumWeight(conf.getInt(MAX_WEIGHT, DEFAULT_MAX_WEIGHT)) + .weigher((key, value) -> key.jobId.length() + key.user.length() + + 
key.attemptId.length()+ value.indexPath.toString().length() + + value.dataPath.toString().length()) + .build(new CacheLoader() { + @Override + public AttemptPathInfo load(AttemptPathIdentifier key) throws + Exception { + String base = getBaseLocation(key.jobId, key.user); + String attemptBase = base + key.attemptId; + Path indexFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME); + Path mapOutputFileName = getAuxiliaryLocalPathHandler() + .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME); + LOG.debug("Loaded : {} via loader", key); + return new AttemptPathInfo(indexFileName, mapOutputFileName); + } + }); } public void setPort(int port) {