diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 19f95fc011..f7e3bded2e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -365,6 +365,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6349. Fix typo in property org.apache.hadoop.mapreduce. lib.chain.Chain.REDUCER_INPUT_VALUE_CLASS. (Ray Chiang via ozawa) + MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge + (Gera Shegalov via jlowe) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index 8bf17ef75f..f7887070b9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -93,8 +93,10 @@ public class MergeManagerImpl implements MergeManager { Set onDiskMapOutputs = new TreeSet(); private final OnDiskMerger onDiskMerger; - - private final long memoryLimit; + + @VisibleForTesting + final long memoryLimit; + private long usedMemory; private long commitMemory; private final long maxSingleShuffleLimit; @@ -167,11 +169,10 @@ public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, } // Allow unit tests to fix Runtime memory - this.memoryLimit = - (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, - Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) - * maxInMemCopyUse); - + this.memoryLimit = (long)(jobConf.getLong( + MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, + Runtime.getRuntime().maxMemory()) * 
maxInMemCopyUse); + this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100); final float singleShuffleMemoryLimitPercent = @@ -202,7 +203,7 @@ public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, if (this.maxSingleShuffleLimit >= this.mergeThreshold) { throw new RuntimeException("Invalid configuration: " - + "maxSingleShuffleLimit should be less than mergeThreshold" + + "maxSingleShuffleLimit should be less than mergeThreshold " + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit + "mergeThreshold: " + this.mergeThreshold); } @@ -668,24 +669,26 @@ public void close() throws IOException { } } + @VisibleForTesting + final long getMaxInMemReduceLimit() { + final float maxRedPer = + jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f); + if (maxRedPer > 1.0 || maxRedPer < 0.0) { + throw new RuntimeException(maxRedPer + ": " + + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + + " must be a float between 0 and 1.0"); + } + return (long)(memoryLimit * maxRedPer); + } + private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, List> inMemoryMapOutputs, List onDiskMapOutputs ) throws IOException { - LOG.info("finalMerge called with " + - inMemoryMapOutputs.size() + " in-memory map-outputs and " + - onDiskMapOutputs.size() + " on-disk map-outputs"); - - final float maxRedPer = - job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f); - if (maxRedPer > 1.0 || maxRedPer < 0.0) { - throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + - maxRedPer); - } - int maxInMemReduce = (int)Math.min( - Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE); - - + LOG.info("finalMerge called with " + + inMemoryMapOutputs.size() + " in-memory map-outputs and " + + onDiskMapOutputs.size() + " on-disk map-outputs"); + final long maxInMemReduce = getMaxInMemReduceLimit(); // merge config params Class keyClass = (Class)job.getMapOutputKeyClass(); Class valueClass = (Class)job.getMapOutputValueClass(); diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java index 8d6bab9273..ef860afaf8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java @@ -260,4 +260,33 @@ public void testOnDiskMerger() throws IOException, URISyntaxException, } } + + @Test + public void testLargeMemoryLimits() throws Exception { + final JobConf conf = new JobConf(); + // Xmx in production + conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, + 8L * 1024 * 1024 * 1024); + + // M1 = Xmx fraction for map outputs + conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f); + + // M2 = max M1 fraction for a single map output + conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f); + + // M3 = M1 fraction at which in memory merge is triggered + conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f); + + // M4 = M1 fraction of map outputs remaining in memory for a reduce + conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f); + + final MergeManagerImpl mgr = new MergeManagerImpl( + null, conf, mock(LocalFileSystem.class), null, null, null, null, null, + null, null, null, null, null, new MROutputFiles()); + assertTrue("Large shuffle area unusable: " + mgr.memoryLimit, + mgr.memoryLimit > Integer.MAX_VALUE); + final long maxInMemReduce = mgr.getMaxInMemReduceLimit(); + assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce, + maxInMemReduce > Integer.MAX_VALUE); + } }