MAPREDUCE-6689. MapReduce job can infinitely increase number of reducer resource requests. Contributed by Wangda Tan

This commit is contained in:
Jason Lowe 2016-05-06 22:25:47 +00:00
parent 1c5bbf6499
commit c9bb96fa81
2 changed files with 147 additions and 16 deletions

View File

@ -281,14 +281,18 @@ protected synchronized void heartbeat() throws Exception {
} }
if (recalculateReduceSchedule) { if (recalculateReduceSchedule) {
preemptReducesIfNeeded(); boolean reducerPreempted = preemptReducesIfNeeded();
scheduleReduces(
getJob().getTotalMaps(), completedMaps, if (!reducerPreempted) {
scheduledRequests.maps.size(), scheduledRequests.reduces.size(), // Only schedule new reducers if no reducer preemption happens for
assignedRequests.maps.size(), assignedRequests.reduces.size(), // this heartbeat
mapResourceRequest, reduceResourceRequest, scheduleReduces(getJob().getTotalMaps(), completedMaps,
pendingReduces.size(), scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
maxReduceRampupLimit, reduceSlowStart); assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceRequest, reduceResourceRequest, pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
}
recalculateReduceSchedule = false; recalculateReduceSchedule = false;
} }
@ -475,30 +479,30 @@ synchronized void setMapResourceRequest(Resource res) {
@Private @Private
@VisibleForTesting @VisibleForTesting
void preemptReducesIfNeeded() { boolean preemptReducesIfNeeded() {
if (reduceResourceRequest.equals(Resources.none())) { if (reduceResourceRequest.equals(Resources.none())) {
return; // no reduces return false; // no reduces
} }
if (assignedRequests.maps.size() > 0) { if (assignedRequests.maps.size() > 0) {
// there are assigned mappers // there are assigned mappers
return; return false;
} }
if (scheduledRequests.maps.size() <= 0) { if (scheduledRequests.maps.size() <= 0) {
// there are no pending requests for mappers // there are no pending requests for mappers
return; return false;
} }
// At this point: // At this point:
// we have pending mappers and all assigned resources are taken by reducers // we have pending mappers and all assigned resources are taken by reducers
if (reducerUnconditionalPreemptionDelayMs >= 0) { if (reducerUnconditionalPreemptionDelayMs >= 0) {
// Unconditional preemption is enabled. // Unconditional preemption is enabled.
// If mappers are pending for longer than the configured threshold, // If mappers are pending for longer than the configured threshold,
// preempt reducers irrespective of what the headroom is. // preempt reducers irrespective of what the headroom is.
if (preemptReducersForHangingMapRequests( if (preemptReducersForHangingMapRequests(
reducerUnconditionalPreemptionDelayMs)) { reducerUnconditionalPreemptionDelayMs)) {
return; return true;
} }
} }
@ -508,12 +512,12 @@ void preemptReducesIfNeeded() {
if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap, if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
mapResourceRequest, getSchedulerResourceTypes()) > 0) { mapResourceRequest, getSchedulerResourceTypes()) > 0) {
// the available headroom is enough to run a mapper // the available headroom is enough to run a mapper
return; return false;
} }
// Available headroom is not enough to run mapper. See if we should hold // Available headroom is not enough to run mapper. See if we should hold
// off before preempting reducers and preempt if okay. // off before preempting reducers and preempt if okay.
preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs); return preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
} }
private boolean preemptReducersForHangingMapRequests(long pendingThreshold) { private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {

View File

@ -3016,6 +3016,133 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception {
} }
} }
@Test
public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()
throws Exception {
LOG.info("Running testAvoidAskMoreReducersWhenReducerPreemptionIsRequired");
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
// Use a controlled clock to advance time for test.
ControlledClock clock = (ControlledClock)allocator.getContext().getClock();
clock.setTime(System.currentTimeMillis());
// Register nodes to RM.
MockNM nodeManager = rm.registerNode("h1:1234", 1024);
dispatcher.await();
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
createReq(jobId, 1, 1024, new String[] { "h1" });
allocator.sendRequest(event1);
ContainerRequestEvent event2 =
createReq(jobId, 2, 1024, new String[] { "h2" });
allocator.sendRequest(event2);
ContainerRequestEvent event3 =
createReq(jobId, 3, 1024, new String[] { "h2" }, false, true);
allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no
// allocations as nodes are not added.
allocator.schedule();
dispatcher.await();
// Advance clock so that maps can be considered as hanging.
clock.setTime(System.currentTimeMillis() + 500000L);
// Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 =
createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
allocator.sendRequest(event4);
allocator.schedule();
dispatcher.await();
// Update resources in scheduler through node heartbeat from h1.
nodeManager.nodeHeartbeat(true);
dispatcher.await();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
allocator.schedule();
dispatcher.await();
// One map is assigned.
Assert.assertEquals(1, allocator.getAssignedRequests().maps.size());
// Send deallocate request for map so that no maps are assigned after this.
ContainerAllocatorEvent deallocate = createDeallocateEvent(jobId, 1, false);
allocator.sendDeallocate(deallocate);
// Now one reducer should be scheduled and one should be pending.
Assert.assertEquals(1, allocator.getScheduledRequests().reduces.size());
Assert.assertEquals(1, allocator.getNumOfPendingReduces());
// No map should be assigned and one should be scheduled.
Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
Assert.assertEquals(6, allocator.getAsk().size());
for (ResourceRequest req : allocator.getAsk()) {
boolean isReduce =
req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
if (isReduce) {
// 1 reducer each asked on h2, * and default-rack
Assert.assertTrue((req.getResourceName().equals("*") ||
req.getResourceName().equals("/default-rack") ||
req.getResourceName().equals("h2")) && req.getNumContainers() == 1);
} else { //map
// 0 mappers asked on h1 and 1 each on * and default-rack
Assert.assertTrue(((req.getResourceName().equals("*") ||
req.getResourceName().equals("/default-rack")) &&
req.getNumContainers() == 1) || (req.getResourceName().equals("h1")
&& req.getNumContainers() == 0));
}
}
clock.setTime(System.currentTimeMillis() + 500000L + 10 * 60 * 1000);
// On next allocate request to scheduler, headroom reported will be 2048.
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0));
allocator.schedule();
dispatcher.await();
// After allocate response from scheduler, all scheduled reduces are ramped
// down and move to pending. 3 asks are also updated with 0 containers to
// indicate ramping down of reduces to scheduler.
Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
Assert.assertEquals(2, allocator.getNumOfPendingReduces());
Assert.assertEquals(3, allocator.getAsk().size());
for (ResourceRequest req : allocator.getAsk()) {
Assert.assertEquals(
RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
Assert.assertTrue(req.getResourceName().equals("*") ||
req.getResourceName().equals("/default-rack") ||
req.getResourceName().equals("h2"));
Assert.assertEquals(Resource.newInstance(1024, 1), req.getCapability());
Assert.assertEquals(0, req.getNumContainers());
}
}
private static class MockScheduler implements ApplicationMasterProtocol { private static class MockScheduler implements ApplicationMasterProtocol {
ApplicationAttemptId attemptId; ApplicationAttemptId attemptId;
long nextContainerId = 10; long nextContainerId = 10;