diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java index 1673ff80a7..09fe0cbbfc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java @@ -99,7 +99,9 @@ public class MergeManagerImpl implements MergeManager { private long usedMemory; private long commitMemory; - private final long maxSingleShuffleLimit; + + @VisibleForTesting + final long maxSingleShuffleLimit; private final int memToMemMergeOutputsThreshold; private final long mergeThreshold; @@ -187,10 +189,16 @@ public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, usedMemory = 0L; commitMemory = 0L; - this.maxSingleShuffleLimit = - (long)(memoryLimit * singleShuffleMemoryLimitPercent); - this.memToMemMergeOutputsThreshold = - jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); + long maxSingleShuffleLimitConfiged = + (long)(memoryLimit * singleShuffleMemoryLimitPercent); + if(maxSingleShuffleLimitConfiged > Integer.MAX_VALUE) { + maxSingleShuffleLimitConfiged = Integer.MAX_VALUE; + LOG.info("The max number of bytes for a single in-memory shuffle cannot" + + " be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE"); + } + this.maxSingleShuffleLimit = maxSingleShuffleLimitConfiged; + this.memToMemMergeOutputsThreshold = + jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); this.mergeThreshold = (long)(this.memoryLimit * jobConf.getFloat( MRJobConfig.SHUFFLE_MERGE_PERCENT, @@ -249,17 +257,13 @@ ExceptionReporter getExceptionReporter() { public void waitForResource() throws InterruptedException { inMemoryMerger.waitForMerge(); } - - private boolean canShuffleToMemory(long requestedSize) { - return (requestedSize < maxSingleShuffleLimit); - } - + @Override public synchronized MapOutput reserve(TaskAttemptID mapId, long requestedSize, int fetcher ) throws IOException { - if (!canShuffleToMemory(requestedSize)) { + if (requestedSize > maxSingleShuffleLimit) { LOG.info(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + maxSingleShuffleLimit + ")"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java index 1c0d25b09a..325d2f9a5c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java @@ -289,22 +289,29 @@ null, conf, mock(LocalFileSystem.class), null, null, null, null, null, final long maxInMemReduce = mgr.getMaxInMemReduceLimit(); assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce, maxInMemReduce > Integer.MAX_VALUE); + assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE", + Integer.MAX_VALUE, mgr.maxSingleShuffleLimit); + verifyReservedMapOutputType(mgr, 10L, "MEMORY"); + verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK"); + } + + private void verifyReservedMapOutputType(MergeManagerImpl mgr, + long size, String expectedShuffleMode) throws IOException { + final TaskAttemptID mapId = TaskAttemptID.forName("attempt_0_1_m_1_1"); + final MapOutput mapOutput = mgr.reserve(mapId, size, 1); + assertEquals("Shuffled bytes: " + size, expectedShuffleMode, + mapOutput.getDescription()); + mgr.unreserve(size); } @Test public void testZeroShuffleMemoryLimitPercent() throws Exception { final JobConf jobConf = new JobConf(); jobConf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f); - final MergeManager mgr = + final MergeManagerImpl mgr = new MergeManagerImpl<>(null, jobConf, mock(LocalFileSystem.class), null, null, null, null, null, null, null, null, null, null, new MROutputFiles()); - final long mapOutputSize = 10; - final int fetcher = 1; - final MapOutput mapOutput = mgr.reserve( - TaskAttemptID.forName("attempt_0_1_m_1_1"), - mapOutputSize, fetcher); - assertEquals("Tiny map outputs should be shuffled to disk", "DISK", - mapOutput.getDescription()); + verifyReservedMapOutputType(mgr, 10L, "DISK"); } }