diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index bcebce4862..c98ea508b0 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -597,6 +597,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6503. archive-logs tool should use HADOOP_PREFIX instead of HADOOP_HOME (rkanter) + MAPREDUCE-6302. Preempt reducers after a configurable timeout irrespective + of headroom. (kasha) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 74a275300a..bf9b1f8772 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -160,11 +160,13 @@ added to the pending and are ramped up (added to scheduled) based private boolean reduceStarted = false; private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; - /** - * after this threshold, if the container request is not allocated, it is - * considered delayed. - */ - private long allocationDelayThresholdMs = 0; + + // Mapper allocation timeout, after which a reducer is forcibly preempted + private long reducerUnconditionalPreemptionDelayMs; + + // Duration to wait before preempting a reducer when there is NO room + private long reducerNoHeadroomPreemptionDelayMs = 0; + private float reduceSlowStart = 0; private int maxRunningMaps = 0; private int maxRunningReduces = 0; @@ -204,7 +206,10 @@ protected void serviceInit(Configuration conf) throws Exception { maxReducePreemptionLimit = conf.getFloat( MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); - allocationDelayThresholdMs = conf.getInt( + reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt( + MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, + MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC); + reducerNoHeadroomPreemptionDelayMs = conf.getInt( MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, @@ -472,59 +477,89 @@ void preemptReducesIfNeeded() { if (reduceResourceRequest.equals(Resources.none())) { return; // no reduces } - //check if reduces have taken over the whole cluster and there are - //unassigned maps - if (scheduledRequests.maps.size() > 0) { - Resource resourceLimit = getResourceLimit(); - Resource availableResourceForMap = - Resources.subtract( - resourceLimit, - Resources.multiply(reduceResourceRequest, - assignedRequests.reduces.size() - - assignedRequests.preemptionWaitingReduces.size())); - // availableMemForMap must be sufficient to run at least 1 map - if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, - mapResourceRequest, getSchedulerResourceTypes()) <= 0) { - // to make sure new containers are given to maps and not reduces - // ramp down all scheduled reduces if any - // (since reduces are scheduled at higher priority than maps) - LOG.info("Ramping down all scheduled reduces:" - + scheduledRequests.reduces.size()); - for (ContainerRequest req : scheduledRequests.reduces.values()) { - pendingReduces.add(req); - } - scheduledRequests.reduces.clear(); - //do further checking to find the number of map requests that were - //hanging around for a while - int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps); - if (hangingMapRequests > 0) { - // preempt for making space for at least one map - int preemptionReduceNumForOneMap = - ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest, - reduceResourceRequest, getSchedulerResourceTypes()); - int preemptionReduceNumForPreemptionLimit = - ResourceCalculatorUtils.divideAndCeilContainers( - Resources.multiply(resourceLimit, maxReducePreemptionLimit), - reduceResourceRequest, getSchedulerResourceTypes()); - int preemptionReduceNumForAllMaps = - ResourceCalculatorUtils.divideAndCeilContainers( - Resources.multiply(mapResourceRequest, hangingMapRequests), - reduceResourceRequest, getSchedulerResourceTypes()); - int toPreempt = - Math.min(Math.max(preemptionReduceNumForOneMap, - preemptionReduceNumForPreemptionLimit), - preemptionReduceNumForAllMaps); + if (assignedRequests.maps.size() > 0) { + // there are assigned mappers + return; + } - LOG.info("Going to preempt " + toPreempt - + " due to lack of space for maps"); - assignedRequests.preemptReduce(toPreempt); - } + if (scheduledRequests.maps.size() <= 0) { + // there are no pending requests for mappers + return; + } + // At this point: + // we have pending mappers and all assigned resources are taken by reducers + + if (reducerUnconditionalPreemptionDelayMs >= 0) { + // Unconditional preemption is enabled. + // If mappers are pending for longer than the configured threshold, + // preempt reducers irrespective of what the headroom is. + if (preemptReducersForHangingMapRequests( + reducerUnconditionalPreemptionDelayMs)) { + return; } } + + // The pending mappers haven't been waiting for too long. Let us see if + // the headroom can fit a mapper. + Resource availableResourceForMap = getAvailableResources(); + if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, + mapResourceRequest, getSchedulerResourceTypes()) > 0) { + // the available headroom is enough to run a mapper + return; + } + + // Available headroom is not enough to run mapper. See if we should hold + // off before preempting reducers and preempt if okay. + preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs); } - private int getNumOfHangingRequests(Map requestMap) { + private boolean preemptReducersForHangingMapRequests(long pendingThreshold) { + int hangingMapRequests = getNumHangingRequests( + pendingThreshold, scheduledRequests.maps); + if (hangingMapRequests > 0) { + preemptReducer(hangingMapRequests); + return true; + } + return false; + } + + private void clearAllPendingReduceRequests() { + LOG.info("Ramping down all scheduled reduces:" + + scheduledRequests.reduces.size()); + for (ContainerRequest req : scheduledRequests.reduces.values()) { + pendingReduces.add(req); + } + scheduledRequests.reduces.clear(); + } + + private void preemptReducer(int hangingMapRequests) { + clearAllPendingReduceRequests(); + + // preempt for making space for at least one map + int preemptionReduceNumForOneMap = + ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest, + reduceResourceRequest, getSchedulerResourceTypes()); + int preemptionReduceNumForPreemptionLimit = + ResourceCalculatorUtils.divideAndCeilContainers( + Resources.multiply(getResourceLimit(), maxReducePreemptionLimit), + reduceResourceRequest, getSchedulerResourceTypes()); + int preemptionReduceNumForAllMaps = + ResourceCalculatorUtils.divideAndCeilContainers( + Resources.multiply(mapResourceRequest, hangingMapRequests), + reduceResourceRequest, getSchedulerResourceTypes()); + int toPreempt = + Math.min(Math.max(preemptionReduceNumForOneMap, + preemptionReduceNumForPreemptionLimit), + preemptionReduceNumForAllMaps); + + LOG.info("Going to preempt " + toPreempt + + " due to lack of space for maps"); + assignedRequests.preemptReduce(toPreempt); + } + + private int getNumHangingRequests(long allocationDelayThresholdMs, + Map requestMap) { if (allocationDelayThresholdMs <= 0) return requestMap.size(); int hangingRequests = 0; @@ -552,9 +587,6 @@ public void scheduleReduces( // get available resources for this job Resource headRoom = getAvailableResources(); - if (headRoom == null) { - headRoom = Resources.none(); - } LOG.info("Recalculating schedule, headroom=" + headRoom); @@ -681,9 +713,7 @@ private List getResources() throws Exception { applyConcurrentTaskLimits(); // will be null the first time - Resource headRoom = - getAvailableResources() == null ? Resources.none() : - Resources.clone(getAvailableResources()); + Resource headRoom = Resources.clone(getAvailableResources()); AllocateResponse response; /* * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS @@ -724,9 +754,7 @@ private List getResources() throws Exception { // continue to attempt to contact the RM. throw e; } - Resource newHeadRoom = - getAvailableResources() == null ? Resources.none() - : getAvailableResources(); + Resource newHeadRoom = getAvailableResources(); List newContainers = response.getAllocatedContainers(); // Setting NMTokens if (response.getNMTokens() != null) { @@ -896,9 +924,6 @@ private void handleUpdatedNodes(AllocateResponse response) { @Private public Resource getResourceLimit() { Resource headRoom = getAvailableResources(); - if (headRoom == null) { - headRoom = Resources.none(); - } Resource assignedMapResource = Resources.multiply(mapResourceRequest, assignedRequests.maps.size()); Resource assignedReduceResource = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index d612126ae4..a639d5579a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -386,7 +387,7 @@ protected void containerFailedOnHost(String hostName) { } protected Resource getAvailableResources() { - return availableResources; + return availableResources == null ? Resources.none() : availableResources; } protected void addContainerReq(ContainerRequest req) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e6aebb4bb9..e4421a80d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -431,7 +431,7 @@ public void testReducerRampdownDiagnostics() throws Exception { MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, - appAttemptId, mockJob); + appAttemptId, mockJob, new SystemClock()); // add resources to scheduler dispatcher.await(); @@ -565,6 +565,69 @@ public void testNonAggressivelyPreemptReducers() throws Exception { assignedRequests.preemptionWaitingReduces.size()); } + @Test(timeout = 30000) + public void testUnconditionalPreemptReducers() throws Exception { + LOG.info("Running testForcePreemptReducers"); + + int forcePreemptThresholdSecs = 2; + Configuration conf = new Configuration(); + conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, + 2 * forcePreemptThresholdSecs); + conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, + forcePreemptThresholdSecs); + + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8)); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + ControlledClock clock = new ControlledClock(null); + clock.setTime(1); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob, clock); + allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); + allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); + RMContainerAllocator.AssignedRequests assignedRequests = + allocator.getAssignedRequests(); + RMContainerAllocator.ScheduledRequests scheduledRequests = + allocator.getScheduledRequests(); + ContainerRequestEvent event1 = + createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + scheduledRequests.maps.put(mock(TaskAttemptId.class), + new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + assignedRequests.reduces.put(mock(TaskAttemptId.class), + mock(Container.class)); + + clock.setTime(clock.getTime() + 1); + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is preeempted too soon", 0, + assignedRequests.preemptionWaitingReduces.size()); + + clock.setTime(clock.getTime() + 1000 * forcePreemptThresholdSecs); + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is not preeempted", 1, + assignedRequests.preemptionWaitingReduces.size()); + } + @Test(timeout = 30000) public void testExcessReduceContainerAssign() throws Exception { final Configuration conf = new Configuration(); @@ -590,7 +653,7 @@ public void testExcessReduceContainerAssign() throws Exception { MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, - appAttemptId, mockJob); + appAttemptId, mockJob, new SystemClock()); // request to allocate two reduce priority containers final String[] locations = new String[] { host }; @@ -634,7 +697,8 @@ public void testMapReduceAllocationWithNodeLabelExpression() throws Exception { 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); final MockScheduler mockScheduler = new MockScheduler(appAttemptId); MyContainerAllocator allocator = - new MyContainerAllocator(null, conf, appAttemptId, mockJob) { + new MyContainerAllocator(null, conf, appAttemptId, mockJob, + new SystemClock()) { @Override protected void register() { } @@ -726,7 +790,7 @@ public void testMapReduceScheduling() throws Exception { MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, - appAttemptId, mockJob); + appAttemptId, mockJob, new SystemClock()); // add resources to scheduler MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); @@ -1629,6 +1693,7 @@ public MyFifoScheduler(RMContext rmContext) { List lastRelease = null; List lastBlacklistAdditions; List lastBlacklistRemovals; + Resource forceResourceLimit = null; // override this to copy the objects otherwise FifoScheduler updates the // numContainers in same objects as kept by RMContainerAllocator @@ -1651,9 +1716,18 @@ public synchronized Allocation allocate( lastRelease = release; lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; - return super.allocate( + Allocation allocation = super.allocate( applicationAttemptId, askCopy, release, blacklistAdditions, blacklistRemovals, increaseRequests, decreaseRequests); + if (forceResourceLimit != null) { + // Test wants to force the non-default resource limit + allocation.setResourceLimit(forceResourceLimit); + } + return allocation; + } + + public void forceResourceLimit(Resource resource) { + this.forceResourceLimit = resource; } } @@ -2677,7 +2751,7 @@ public void testConcurrentTaskLimits() throws Exception { 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); final MockScheduler mockScheduler = new MockScheduler(appAttemptId); MyContainerAllocator allocator = new MyContainerAllocator(null, conf, - appAttemptId, mockJob) { + appAttemptId, mockJob, new SystemClock()) { @Override protected void register() { } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 59b887dbf3..e321817f1c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -712,10 +712,18 @@ public interface MRJobConfig { 10 * 1000l; /** - * The threshold in terms of seconds after which an unsatisfied mapper request - * triggers reducer preemption to free space. Default 0 implies that the reduces - * should be preempted immediately after allocation if there is currently no - * room for newly allocated mappers. + * Duration to wait before forcibly preempting a reducer to allow + * allocating new mappers, even when YARN reports positive headroom. + */ + public static final String MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC = + "mapreduce.job.reducer.unconditional-preempt.delay.sec"; + + public static final int + DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC = 5 * 60; + + /** + * Duration to wait before preempting a reducer, when there is no headroom + * to allocate new mappers. */ public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC = "mapreduce.job.reducer.preempt.delay.sec"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 6d205c5861..909f5c54a5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -91,17 +91,28 @@ - - mapreduce.job.reducer.preempt.delay.sec - 0 - The threshold in terms of seconds after which an unsatisfied mapper - request triggers reducer preemption to free space. Default 0 implies that the - reduces should be preempted immediately after allocation if there is currently no - room for newly allocated mappers. - - + + mapreduce.job.reducer.preempt.delay.sec + 0 + The threshold (in seconds) after which an unsatisfied + mapper request triggers reducer preemption when there is no anticipated + headroom. If set to 0 or a negative value, the reducer is preempted as + soon as lack of headroom is detected. Default is 0. + + - + + mapreduce.job.reducer.unconditional-preempt.delay.sec + 300 + The threshold (in seconds) after which an unsatisfied + mapper request triggers a forced reducer preemption irrespective of the + anticipated headroom. By default, it is set to 5 mins. Setting it to 0 + leads to immediate reducer preemption. Setting to -1 disables this + preemption altogether. + + + + mapreduce.job.max.split.locations 10 The max number of block locations to store for each split for diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index af6caad700..b81da2bbd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NMToken; @@ -29,13 +30,13 @@ public class Allocation { final List containers; - final Resource resourceLimit; final Set strictContainers; final Set fungibleContainers; final List fungibleResources; final List nmTokens; final List increasedContainers; final List decreasedContainers; + private Resource resourceLimit; public Allocation(List containers, Resource resourceLimit, @@ -97,4 +98,9 @@ public List getIncreasedContainers() { public List getDecreasedContainers() { return decreasedContainers; } + + @VisibleForTesting + public void setResourceLimit(Resource resource) { + this.resourceLimit = resource; + } }