MAPREDUCE-6724. Single shuffle to memory must not exceed Integer#MAX_VALUE. (Haibo Chen via gera)

This commit is contained in:
Gera Shegalov 2016-07-28 14:37:03 -07:00
parent c4463f2ef2
commit 6890d5b472
2 changed files with 30 additions and 19 deletions

View File

@ -99,7 +99,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
private long usedMemory; private long usedMemory;
private long commitMemory; private long commitMemory;
private final long maxSingleShuffleLimit;
@VisibleForTesting
final long maxSingleShuffleLimit;
private final int memToMemMergeOutputsThreshold; private final int memToMemMergeOutputsThreshold;
private final long mergeThreshold; private final long mergeThreshold;
@ -187,10 +189,16 @@ public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf,
usedMemory = 0L; usedMemory = 0L;
commitMemory = 0L; commitMemory = 0L;
this.maxSingleShuffleLimit = long maxSingleShuffleLimitConfiged =
(long)(memoryLimit * singleShuffleMemoryLimitPercent); (long)(memoryLimit * singleShuffleMemoryLimitPercent);
this.memToMemMergeOutputsThreshold = if(maxSingleShuffleLimitConfiged > Integer.MAX_VALUE) {
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor); maxSingleShuffleLimitConfiged = Integer.MAX_VALUE;
LOG.info("The max number of bytes for a single in-memory shuffle cannot" +
" be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE");
}
this.maxSingleShuffleLimit = maxSingleShuffleLimitConfiged;
this.memToMemMergeOutputsThreshold =
jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
this.mergeThreshold = (long)(this.memoryLimit * this.mergeThreshold = (long)(this.memoryLimit *
jobConf.getFloat( jobConf.getFloat(
MRJobConfig.SHUFFLE_MERGE_PERCENT, MRJobConfig.SHUFFLE_MERGE_PERCENT,
@ -249,17 +257,13 @@ ExceptionReporter getExceptionReporter() {
public void waitForResource() throws InterruptedException { public void waitForResource() throws InterruptedException {
inMemoryMerger.waitForMerge(); inMemoryMerger.waitForMerge();
} }
private boolean canShuffleToMemory(long requestedSize) {
return (requestedSize < maxSingleShuffleLimit);
}
@Override @Override
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
long requestedSize, long requestedSize,
int fetcher int fetcher
) throws IOException { ) throws IOException {
if (!canShuffleToMemory(requestedSize)) { if (requestedSize > maxSingleShuffleLimit) {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize + LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" + " is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")"); maxSingleShuffleLimit + ")");

View File

@ -289,22 +289,29 @@ null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
final long maxInMemReduce = mgr.getMaxInMemReduceLimit(); final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce, assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
maxInMemReduce > Integer.MAX_VALUE); maxInMemReduce > Integer.MAX_VALUE);
assertEquals("maxSingleShuffleLimit to be capped at Integer.MAX_VALUE",
Integer.MAX_VALUE, mgr.maxSingleShuffleLimit);
verifyReservedMapOutputType(mgr, 10L, "MEMORY");
verifyReservedMapOutputType(mgr, 1L + Integer.MAX_VALUE, "DISK");
}
private void verifyReservedMapOutputType(MergeManagerImpl<Text, Text> mgr,
long size, String expectedShuffleMode) throws IOException {
final TaskAttemptID mapId = TaskAttemptID.forName("attempt_0_1_m_1_1");
final MapOutput<Text, Text> mapOutput = mgr.reserve(mapId, size, 1);
assertEquals("Shuffled bytes: " + size, expectedShuffleMode,
mapOutput.getDescription());
mgr.unreserve(size);
} }
@Test @Test
public void testZeroShuffleMemoryLimitPercent() throws Exception { public void testZeroShuffleMemoryLimitPercent() throws Exception {
final JobConf jobConf = new JobConf(); final JobConf jobConf = new JobConf();
jobConf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f); jobConf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.0f);
final MergeManager<Text, Text> mgr = final MergeManagerImpl<Text, Text> mgr =
new MergeManagerImpl<>(null, jobConf, mock(LocalFileSystem.class), new MergeManagerImpl<>(null, jobConf, mock(LocalFileSystem.class),
null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
new MROutputFiles()); new MROutputFiles());
final long mapOutputSize = 10; verifyReservedMapOutputType(mgr, 10L, "DISK");
final int fetcher = 1;
final MapOutput<Text, Text> mapOutput = mgr.reserve(
TaskAttemptID.forName("attempt_0_1_m_1_1"),
mapOutputSize, fetcher);
assertEquals("Tiny map outputs should be shuffled to disk", "DISK",
mapOutput.getDescription());
} }
} }