diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 463c336266..28627d9286 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -389,6 +389,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3608. Fixed compile issue with MAPREDUCE-3522. (mahadev via
     acmurthy)
 
+    MAPREDUCE-3490. Fixed MapReduce AM to count failed maps also towards Reduce
+    ramp up. (Sharad Agarwal and Arun C Murthy via vinodkv)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index d55dc2981f..74e2c1b2a8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -33,6 +33,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.JobID;
@@ -122,8 +123,6 @@ added to the pending and are ramped up (added to scheduled) based
   private boolean recalculateReduceSchedule = false;
   private int mapResourceReqt;//memory
   private int reduceResourceReqt;//memory
-  private int completedMaps = 0;
-  private int completedReduces = 0;
 
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
@@ -169,7 +168,13 @@ protected synchronized void heartbeat() throws Exception {
 
     if (recalculateReduceSchedule) {
       preemptReducesIfNeeded();
-      scheduleReduces();
+      scheduleReduces(
+          getJob().getTotalMaps(), getJob().getCompletedMaps(),
+          scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
+          assignedRequests.maps.size(), assignedRequests.reduces.size(),
+          mapResourceReqt, reduceResourceReqt,
+          pendingReduces.size(),
+          maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
     }
   }
@@ -180,6 +185,14 @@ public void stop() {
     LOG.info("Final Stats: " + getStat());
   }
 
+  public boolean getIsReduceStarted() {
+    return reduceStarted;
+  }
+
+  public void setIsReduceStarted(boolean reduceStarted) {
+    this.reduceStarted = reduceStarted;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public synchronized void handle(ContainerAllocatorEvent event) {
@@ -319,10 +332,17 @@ private void preemptReducesIfNeeded() {
       }
     }
   }
-
-  private void scheduleReduces() {
+
+  @Private
+  public void scheduleReduces(
+      int totalMaps, int completedMaps,
+      int scheduledMaps, int scheduledReduces,
+      int assignedMaps, int assignedReduces,
+      int mapResourceReqt, int reduceResourceReqt,
+      int numPendingReduces,
+      float maxReduceRampupLimit, float reduceSlowStart) {
 
-    if (pendingReduces.size() == 0) {
+    if (numPendingReduces == 0) {
       return;
     }
 
@@ -330,29 +350,25 @@ private void scheduleReduces() {
 
     //if all maps are assigned, then ramp up all reduces irrespective of the
     //headroom
-    if (scheduledRequests.maps.size() == 0 && pendingReduces.size() > 0) {
-      LOG.info("All maps assigned. Ramping up all remaining reduces:" + pendingReduces.size());
-      for (ContainerRequest req : pendingReduces) {
-        scheduledRequests.addReduce(req);
-      }
-      pendingReduces.clear();
+    if (scheduledMaps == 0 && numPendingReduces > 0) {
+      LOG.info("All maps assigned. " +
+          "Ramping up all remaining reduces:" + numPendingReduces);
+      scheduleAllReduces();
       return;
     }
-
-    int totalMaps = assignedRequests.maps.size() + completedMaps + scheduledRequests.maps.size();
-
     //check for slow start
-    if (!reduceStarted) {//not set yet
+    if (!getIsReduceStarted()) {//not set yet
       int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * totalMaps);
       if(completedMaps < completedMapsForReduceSlowstart) {
         LOG.info("Reduce slow start threshold not met. " +
-              "completedMapsForReduceSlowstart " + completedMapsForReduceSlowstart);
+              "completedMapsForReduceSlowstart " +
+            completedMapsForReduceSlowstart);
         return;
       } else {
         LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
-        reduceStarted = true;
+        setIsReduceStarted(true);
       }
     }
 
@@ -363,20 +379,21 @@ private void scheduleReduces() {
       completedMapPercent = 1;
     }
 
-    int netScheduledMapMem = scheduledRequests.maps.size() * mapResourceReqt
-        + assignedRequests.maps.size() * mapResourceReqt;
+    int netScheduledMapMem =
+        (scheduledMaps + assignedMaps) * mapResourceReqt;
 
-    int netScheduledReduceMem = scheduledRequests.reduces.size()
-        * reduceResourceReqt + assignedRequests.reduces.size()
-        * reduceResourceReqt;
+    int netScheduledReduceMem =
+        (scheduledReduces + assignedReduces) * reduceResourceReqt;
 
     int finalMapMemLimit = 0;
     int finalReduceMemLimit = 0;
 
     // ramp up the reduces based on completed map percentage
     int totalMemLimit = getMemLimit();
-    int idealReduceMemLimit = Math.min((int)(completedMapPercent * totalMemLimit),
-        (int) (maxReduceRampupLimit * totalMemLimit));
+    int idealReduceMemLimit =
+        Math.min(
+            (int)(completedMapPercent * totalMemLimit),
+            (int) (maxReduceRampupLimit * totalMemLimit));
     int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;
 
     // check if there aren't enough maps scheduled, give the free map capacity
@@ -397,29 +414,46 @@ private void scheduleReduces() {
         " netScheduledMapMem:" + netScheduledMapMem +
         " netScheduledReduceMem:" + netScheduledReduceMem);
 
-    int rampUp = (finalReduceMemLimit - netScheduledReduceMem)
-        / reduceResourceReqt;
+    int rampUp =
+        (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
 
     if (rampUp > 0) {
-      rampUp = Math.min(rampUp, pendingReduces.size());
+      rampUp = Math.min(rampUp, numPendingReduces);
       LOG.info("Ramping up " + rampUp);
-      //more reduce to be scheduled
-      for (int i = 0; i < rampUp; i++) {
-        ContainerRequest request = pendingReduces.removeFirst();
-        scheduledRequests.addReduce(request);
-      }
+      rampUpReduces(rampUp);
     } else if (rampUp < 0){
       int rampDown = -1 * rampUp;
-      rampDown = Math.min(rampDown, scheduledRequests.reduces.size());
+      rampDown = Math.min(rampDown, scheduledReduces);
       LOG.info("Ramping down " + rampDown);
-      //remove from the scheduled and move back to pending
-      for (int i = 0; i < rampDown; i++) {
-        ContainerRequest request = scheduledRequests.removeReduce();
-        pendingReduces.add(request);
-      }
+      rampDownReduces(rampDown);
     }
   }
 
+  private void scheduleAllReduces() {
+    for (ContainerRequest req : pendingReduces) {
+      scheduledRequests.addReduce(req);
+    }
+    pendingReduces.clear();
+  }
+
+  @Private
+  public void rampUpReduces(int rampUp) {
+    //more reduce to be scheduled
+    for (int i = 0; i < rampUp; i++) {
+      ContainerRequest request = pendingReduces.removeFirst();
+      scheduledRequests.addReduce(request);
+    }
+  }
+
+  @Private
+  public void rampDownReduces(int rampDown) {
+    //remove from the scheduled and move back to pending
+    for (int i = 0; i < rampDown; i++) {
+      ContainerRequest request = scheduledRequests.removeReduce();
+      pendingReduces.add(request);
+    }
+  }
+
   /**
    * Synchronized to avoid findbugs warnings
    */
@@ -429,8 +463,8 @@ private synchronized String getStat() {
       " ScheduledReduces:" + scheduledRequests.reduces.size() +
       " AssignedMaps:" + assignedRequests.maps.size() +
       " AssignedReduces:" + assignedRequests.reduces.size() +
-      " completedMaps:" + completedMaps +
-      " completedReduces:" + completedReduces +
+      " completedMaps:" + getJob().getCompletedMaps() +
+      " completedReduces:" + getJob().getCompletedReduces() +
       " containersAllocated:" + containersAllocated +
       " containersReleased:" + containersReleased +
       " hostLocalAssigned:" + hostLocalAssigned +
@@ -497,11 +531,7 @@ private List<Container> getResources() throws Exception {
               + cont.getContainerId());
         } else {
           assignedRequests.remove(attemptID);
-          if (attemptID.getTaskId().getTaskType().equals(TaskType.MAP)) {
-            completedMaps++;
-          } else {
-            completedReduces++;
-          }
+
           // send the container completed event to Task attempt
           eventHandler.handle(new TaskAttemptEvent(attemptID,
               TaskAttemptEventType.TA_CONTAINER_COMPLETED));
@@ -514,7 +544,8 @@ private List<Container> getResources() throws Exception {
     return newContainers;
   }
 
-  private int getMemLimit() {
+  @Private
+  public int getMemLimit() {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
     return headRoom + assignedRequests.maps.size() * mapResourceReqt +
        assignedRequests.reduces.size() * reduceResourceReqt;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
index de3909ea42..a4b84b2b53 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
@@ -19,8 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -1218,6 +1217,70 @@ protected void startAllocatorThread() {
   }
 
 
+  @Test
+  public void testReduceScheduling() throws Exception {
+    int totalMaps = 10;
+    int succeededMaps = 1;
+    int scheduledMaps = 10;
+    int scheduledReduces = 0;
+    int assignedMaps = 2;
+    int assignedReduces = 0;
+    int mapResourceReqt = 1024;
+    int reduceResourceReqt = 2*1024;
+    int numPendingReduces = 4;
+    float maxReduceRampupLimit = 0.5f;
+    float reduceSlowStart = 0.2f;
+
+    RMContainerAllocator allocator = mock(RMContainerAllocator.class);
+    doCallRealMethod().when(allocator).
+        scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(),
+            anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
+
+    // Test slow-start
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, never()).setIsReduceStarted(true);
+
+    succeededMaps = 3;
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator, times(1)).setIsReduceStarted(true);
+
+    // Test reduce ramp-up
+    doReturn(100 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampUpReduces(anyInt());
+    verify(allocator, never()).rampDownReduces(anyInt());
+
+    // Test reduce ramp-down
+    scheduledReduces = 3;
+    doReturn(10 * 1024).when(allocator).getMemLimit();
+    allocator.scheduleReduces(
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
+        maxReduceRampupLimit, reduceSlowStart);
+    verify(allocator).rampDownReduces(anyInt());
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();