MAPREDUCE-6541. Exclude scheduled reducer memory when calculating available mapper slots from headroom to avoid deadlock. Contributed by Varun Saxena

This commit is contained in:
Naganarasimha 2016-10-27 18:03:13 +05:30
parent 8bd6d77a41
commit 060558c6f2
2 changed files with 137 additions and 9 deletions

View File

@ -338,6 +338,12 @@ ScheduledRequests getScheduledRequests() {
return scheduledRequests;
}
@Private
@VisibleForTesting
int getNumOfPendingReduces() {
return pendingReduces.size();
}
public boolean getIsReduceStarted() {
return reduceStarted;
}
@ -521,15 +527,20 @@ boolean preemptReducesIfNeeded() {
}
// The pending mappers haven't been waiting for too long. Let us see if
// the headroom can fit a mapper.
Resource availableResourceForMap = getAvailableResources();
// there are enough resources for a mapper to run. This is calculated by
// excluding scheduled reducers from headroom and comparing it against
// resources required to run one mapper.
Resource scheduledReducesResource = Resources.multiply(
reduceResourceRequest, scheduledRequests.reduces.size());
Resource availableResourceForMap =
Resources.subtract(getAvailableResources(), scheduledReducesResource);
if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
mapResourceRequest, getSchedulerResourceTypes()) > 0) {
// the available headroom is enough to run a mapper
// Enough room to run a mapper
return false;
}
// Available headroom is not enough to run mapper. See if we should hold
// Available resources are not enough to run mapper. See if we should hold
// off before preempting reducers and preempt if okay.
return preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
}
@ -990,11 +1001,6 @@ public Resource getResourceLimit() {
Resources.add(assignedMapResource, assignedReduceResource));
}
@VisibleForTesting
public int getNumOfPendingReduces() {
return pendingReduces.size();
}
@Private
@VisibleForTesting
class ScheduledRequests {

View File

@ -3190,6 +3190,128 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired()
}
}
/**
* Tests whether scheduled reducers are excluded from headroom while
* calculating headroom.
*/
@Test
public void testExcludeSchedReducesFromHeadroom() throws Exception {
LOG.info("Running testExcludeSchedReducesFromHeadroom");
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher =
(DrainDispatcher) rm.getRMContext().getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 1260);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
Task mockTask = mock(Task.class);
TaskAttempt mockTaskAttempt = mock(TaskAttempt.class);
when(mockJob.getTask((TaskId)any())).thenReturn(mockTask);
when(mockTask.getAttempt((TaskAttemptId)any())).thenReturn(mockTaskAttempt);
when(mockTaskAttempt.getProgress()).thenReturn(0.01f);
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
MockNM nodeManager = rm.registerNode("h1:1234", 4096);
dispatcher.await();
// Register nodes to RM.
MockNM nodeManager2 = rm.registerNode("h2:1234", 1024);
dispatcher.await();
// Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 =
createReq(jobId, 1, 1024, new String[] { "h1" });
allocator.sendRequest(event1);
ContainerRequestEvent event2 =
createReq(jobId, 2, 1024, new String[] { "h2" });
allocator.sendRequest(event2);
ContainerRequestEvent event3 =
createReq(jobId, 3, 1024, new String[] { "h1" }, false, true);
allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no
// allocations as nodes are not added.
allocator.schedule();
dispatcher.await();
// Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 =
createReq(jobId, 4, 1024, new String[] { "h3" }, false, true);
allocator.sendRequest(event4);
allocator.schedule();
dispatcher.await();
// Update resources in scheduler through node heartbeat from h1.
nodeManager.nodeHeartbeat(true);
dispatcher.await();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3));
allocator.schedule();
dispatcher.await();
// Two maps are assigned.
Assert.assertEquals(2, allocator.getAssignedRequests().maps.size());
// Send deallocate request for map so that no maps are assigned after this.
ContainerAllocatorEvent deallocate1 = createDeallocateEvent(jobId, 1, false);
allocator.sendDeallocate(deallocate1);
ContainerAllocatorEvent deallocate2 = createDeallocateEvent(jobId, 2, false);
allocator.sendDeallocate(deallocate2);
// No map should be assigned.
Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
nodeManager.nodeHeartbeat(true);
dispatcher.await();
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1));
allocator.schedule();
dispatcher.await();
// h2 heartbeats.
nodeManager2.nodeHeartbeat(true);
dispatcher.await();
// Send request for one more mapper.
ContainerRequestEvent event5 =
createReq(jobId, 5, 1024, new String[] { "h1" });
allocator.sendRequest(event5);
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
allocator.schedule();
dispatcher.await();
// One reducer is assigned and one map is scheduled
Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size());
// Headroom enough to run a mapper if headroom is taken as it is but wont be
// enough if scheduled reducers resources are deducted.
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2));
allocator.schedule();
dispatcher.await();
// After allocate response, the one assigned reducer is preempted and killed
Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size());
Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC,
MyContainerAllocator.getTaskAttemptKillEvents().get(0).getMessage());
Assert.assertEquals(1, allocator.getNumOfPendingReduces());
}
private static class MockScheduler implements ApplicationMasterProtocol {
ApplicationAttemptId attemptId;
long nextContainerId = 10;