MAPREDUCE-3721. Fixed a race in shuffle which caused reduces to hang. Contributed by Siddharth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1236041 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-01-26 05:40:29 +00:00
parent 21c9116309
commit fae75c2d7f
2 changed files with 29 additions and 12 deletions

View File

@@ -568,6 +568,9 @@ Release 0.23.1 - Unreleased
CapacityScheduler so that it deducts current-usage per user and not CapacityScheduler so that it deducts current-usage per user and not
per-application. (Arun C Murthy via vinodkv) per-application. (Arun C Murthy via vinodkv)
MAPREDUCE-3721. Fixed a race in shuffle which caused reduces to hang.
(sseth via acmurthy)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@@ -92,6 +92,7 @@ public class MergeManager<K, V> {
private final long memoryLimit; private final long memoryLimit;
private long usedMemory; private long usedMemory;
private long commitMemory;
private final long maxSingleShuffleLimit; private final long maxSingleShuffleLimit;
private final int memToMemMergeOutputsThreshold; private final int memToMemMergeOutputsThreshold;
@@ -181,6 +182,13 @@ public class MergeManager<K, V> {
"ioSortFactor=" + ioSortFactor + ", " + "ioSortFactor=" + ioSortFactor + ", " +
"memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold); "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
throw new RuntimeException("Invlaid configuration: "
+ "maxSingleShuffleLimit should be less than mergeThreshold"
+ "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+ "mergeThreshold: " + this.mergeThreshold);
}
boolean allowMemToMemMerge = boolean allowMemToMemMerge =
jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false); jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
if (allowMemToMemMerge) { if (allowMemToMemMerge) {
@@ -245,16 +253,16 @@ public class MergeManager<K, V> {
// all the stalled threads // all the stalled threads
if (usedMemory > memoryLimit) { if (usedMemory > memoryLimit) {
LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory + LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
") is greater than memoryLimit (" + memoryLimit + ")"); + ") is greater than memoryLimit (" + memoryLimit + ")." +
" CommitMemory is (" + commitMemory + ")");
return stallShuffle; return stallShuffle;
} }
// Allow the in-memory shuffle to progress // Allow the in-memory shuffle to progress
LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" + LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
usedMemory + + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
") is lesser than memoryLimit (" + memoryLimit + ")"); + "CommitMemory is (" + commitMemory + ")");
return unconditionalReserve(mapId, requestedSize, true); return unconditionalReserve(mapId, requestedSize, true);
} }
@@ -270,18 +278,24 @@ public class MergeManager<K, V> {
} }
synchronized void unreserve(long size) { synchronized void unreserve(long size) {
commitMemory -= size;
usedMemory -= size; usedMemory -= size;
} }
public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) { public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) {
inMemoryMapOutputs.add(mapOutput); inMemoryMapOutputs.add(mapOutput);
LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
+ ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()); + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
+ ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
commitMemory+= mapOutput.getSize();
synchronized (inMemoryMerger) { synchronized (inMemoryMerger) {
if (!inMemoryMerger.isInProgress() && usedMemory >= mergeThreshold) { // Can hang if mergeThreshold is really low.
LOG.info("Starting inMemoryMerger's merge since usedMemory=" + if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
usedMemory + " > mergeThreshold=" + mergeThreshold); LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
commitMemory + " > mergeThreshold=" + mergeThreshold +
". Current usedMemory=" + usedMemory);
inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs); inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear(); inMemoryMergedMapOutputs.clear();
inMemoryMerger.startMerge(inMemoryMapOutputs); inMemoryMerger.startMerge(inMemoryMapOutputs);