MAPREDUCE-6302. Incorrect headroom can lead to a deadlock between map and reduce allocations. (kasha)

Karthik Kambatla 2015-10-04 23:49:02 -07:00
parent a0bca2b5ad
commit 4aa9b3e75c
7 changed files with 215 additions and 87 deletions
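In short, this change addresses the case where the headroom reported by YARN is wrong or stale: reducers hold all allocated containers, the AM still believes there is room for mappers, and the job deadlocks. The patch keeps the existing no-headroom preemption delay and adds a second, unconditional timer. A simplified sketch of the resulting decision order in RMContainerAllocator.preemptReducesIfNeeded() (a summary, not the literal code, which appears in the diff below):

  // 1. no reduce resource request            -> nothing to do
  // 2. some mappers already have containers  -> no deadlock, return
  // 3. no mapper requests pending             -> return
  // 4. unconditional delay >= 0 and a mapper has waited longer than it
  //    -> preempt reducers regardless of the reported headroom
  // 5. reported headroom still fits a mapper -> return
  // 6. otherwise preempt reducers once a mapper has waited longer than the
  //    no-headroom delay (default 0, i.e. immediately)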

View File

@@ -597,6 +597,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6503. archive-logs tool should use HADOOP_PREFIX instead
     of HADOOP_HOME (rkanter)
 
+    MAPREDUCE-6302. Preempt reducers after a configurable timeout irrespective
+    of headroom. (kasha)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

View File

@@ -160,11 +160,13 @@ added to the pending and are ramped up (added to scheduled) based
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
-  /**
-   * after this threshold, if the container request is not allocated, it is
-   * considered delayed.
-   */
-  private long allocationDelayThresholdMs = 0;
+
+  // Mapper allocation timeout, after which a reducer is forcibly preempted
+  private long reducerUnconditionalPreemptionDelayMs;
+
+  // Duration to wait before preempting a reducer when there is NO room
+  private long reducerNoHeadroomPreemptionDelayMs = 0;
+
   private float reduceSlowStart = 0;
   private int maxRunningMaps = 0;
   private int maxRunningReduces = 0;
@@ -204,7 +206,10 @@ protected void serviceInit(Configuration conf) throws Exception {
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
-    allocationDelayThresholdMs = conf.getInt(
+    reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC);
+    reducerNoHeadroomPreemptionDelayMs = conf.getInt(
         MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
         MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
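One non-obvious consequence of the conversion above (an illustration, not part of the patch): a configured value of -1 for the unconditional delay stays negative after the seconds-to-milliseconds multiplication, and the reducerUnconditionalPreemptionDelayMs >= 0 guard in preemptReducesIfNeeded() below therefore never takes the unconditional path. That is how the documented "setting to -1 disables this preemption" behaviour falls out:

  // Hypothetical values, for illustration only
  int delaySec = -1;                           // mapreduce.job.reducer.unconditional-preempt.delay.sec
  long delayMs = 1000 * delaySec;              // -1000
  boolean unconditionalEnabled = delayMs >= 0; // false -> unconditional preemption disabled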
@@ -472,59 +477,89 @@ void preemptReducesIfNeeded() {
     if (reduceResourceRequest.equals(Resources.none())) {
       return; // no reduces
     }
-    //check if reduces have taken over the whole cluster and there are
-    //unassigned maps
-    if (scheduledRequests.maps.size() > 0) {
-      Resource resourceLimit = getResourceLimit();
-      Resource availableResourceForMap =
-          Resources.subtract(
-              resourceLimit,
-              Resources.multiply(reduceResourceRequest,
-                  assignedRequests.reduces.size()
-                      - assignedRequests.preemptionWaitingReduces.size()));
-      // availableMemForMap must be sufficient to run at least 1 map
-      if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
-          mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
-        // to make sure new containers are given to maps and not reduces
-        // ramp down all scheduled reduces if any
-        // (since reduces are scheduled at higher priority than maps)
-        LOG.info("Ramping down all scheduled reduces:"
-            + scheduledRequests.reduces.size());
-        for (ContainerRequest req : scheduledRequests.reduces.values()) {
-          pendingReduces.add(req);
-        }
-        scheduledRequests.reduces.clear();
-
-        //do further checking to find the number of map requests that were
-        //hanging around for a while
-        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
-        if (hangingMapRequests > 0) {
-          // preempt for making space for at least one map
-          int preemptionReduceNumForOneMap =
-              ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForPreemptionLimit =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                  Resources.multiply(resourceLimit, maxReducePreemptionLimit),
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int preemptionReduceNumForAllMaps =
-              ResourceCalculatorUtils.divideAndCeilContainers(
-                  Resources.multiply(mapResourceRequest, hangingMapRequests),
-                  reduceResourceRequest, getSchedulerResourceTypes());
-          int toPreempt =
-              Math.min(Math.max(preemptionReduceNumForOneMap,
-                  preemptionReduceNumForPreemptionLimit),
-                  preemptionReduceNumForAllMaps);
-          LOG.info("Going to preempt " + toPreempt
-              + " due to lack of space for maps");
-          assignedRequests.preemptReduce(toPreempt);
-        }
-      }
-    }
+
+    if (assignedRequests.maps.size() > 0) {
+      // there are assigned mappers
+      return;
+    }
+
+    if (scheduledRequests.maps.size() <= 0) {
+      // there are no pending requests for mappers
+      return;
+    }
+    // At this point:
+    // we have pending mappers and all assigned resources are taken by reducers
+
+    if (reducerUnconditionalPreemptionDelayMs >= 0) {
+      // Unconditional preemption is enabled.
+      // If mappers are pending for longer than the configured threshold,
+      // preempt reducers irrespective of what the headroom is.
+      if (preemptReducersForHangingMapRequests(
+          reducerUnconditionalPreemptionDelayMs)) {
+        return;
+      }
+    }
+
+    // The pending mappers haven't been waiting for too long. Let us see if
+    // the headroom can fit a mapper.
+    Resource availableResourceForMap = getAvailableResources();
+    if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
+        mapResourceRequest, getSchedulerResourceTypes()) > 0) {
+      // the available headroom is enough to run a mapper
+      return;
+    }
+
+    // Available headroom is not enough to run mapper. See if we should hold
+    // off before preempting reducers and preempt if okay.
+    preemptReducersForHangingMapRequests(reducerNoHeadroomPreemptionDelayMs);
   }
 
-  private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+  private boolean preemptReducersForHangingMapRequests(long pendingThreshold) {
+    int hangingMapRequests = getNumHangingRequests(
+        pendingThreshold, scheduledRequests.maps);
+    if (hangingMapRequests > 0) {
+      preemptReducer(hangingMapRequests);
+      return true;
+    }
+    return false;
+  }
+
+  private void clearAllPendingReduceRequests() {
+    LOG.info("Ramping down all scheduled reduces:"
+        + scheduledRequests.reduces.size());
+    for (ContainerRequest req : scheduledRequests.reduces.values()) {
+      pendingReduces.add(req);
+    }
+    scheduledRequests.reduces.clear();
+  }
+
+  private void preemptReducer(int hangingMapRequests) {
+    clearAllPendingReduceRequests();
+
+    // preempt for making space for at least one map
+    int preemptionReduceNumForOneMap =
+        ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForPreemptionLimit =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(getResourceLimit(), maxReducePreemptionLimit),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int preemptionReduceNumForAllMaps =
+        ResourceCalculatorUtils.divideAndCeilContainers(
+            Resources.multiply(mapResourceRequest, hangingMapRequests),
+            reduceResourceRequest, getSchedulerResourceTypes());
+    int toPreempt =
+        Math.min(Math.max(preemptionReduceNumForOneMap,
+            preemptionReduceNumForPreemptionLimit),
+            preemptionReduceNumForAllMaps);
+
+    LOG.info("Going to preempt " + toPreempt
+        + " due to lack of space for maps");
+    assignedRequests.preemptReduce(toPreempt);
+  }
+
+  private int getNumHangingRequests(long allocationDelayThresholdMs,
+      Map<TaskAttemptId, ContainerRequest> requestMap) {
     if (allocationDelayThresholdMs <= 0)
       return requestMap.size();
     int hangingRequests = 0;
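To make the preemption count concrete, here is a worked example with made-up numbers (they are not from the patch): suppose a mapper needs 2048 MB, reducer containers are 1024 MB, the job's resource limit is 8192 MB, maxReducePreemptionLimit is 0.5, and 3 map requests are hanging. preemptReducer() would then compute roughly:

  int forOneMap  = 2; // ceil(2048 / 1024): free room for at least one mapper
  int forLimit   = 4; // ceil(8192 * 0.5 / 1024): amount allowed by the preemption limit
  int forAllMaps = 6; // ceil(2048 * 3 / 1024): enough for every hanging mapper
  int toPreempt  = Math.min(Math.max(forOneMap, forLimit), forAllMaps); // = 4

i.e. preempt up to the configured preemption limit (and always at least enough for one mapper), but never more reducers than the hanging mappers actually need.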
@@ -552,9 +587,6 @@ public void scheduleReduces(
 
     // get available resources for this job
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
 
     LOG.info("Recalculating schedule, headroom=" + headRoom);
 
@@ -681,9 +713,7 @@ private List<Container> getResources() throws Exception {
     applyConcurrentTaskLimits();
 
     // will be null the first time
-    Resource headRoom =
-        getAvailableResources() == null ? Resources.none() :
-            Resources.clone(getAvailableResources());
+    Resource headRoom = Resources.clone(getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -724,9 +754,7 @@ private List<Container> getResources() throws Exception {
       // continue to attempt to contact the RM.
       throw e;
     }
-    Resource newHeadRoom =
-        getAvailableResources() == null ? Resources.none()
-            : getAvailableResources();
+    Resource newHeadRoom = getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {
@@ -896,9 +924,6 @@ private void handleUpdatedNodes(AllocateResponse response) {
   @Private
   public Resource getResourceLimit() {
     Resource headRoom = getAvailableResources();
-    if (headRoom == null) {
-      headRoom = Resources.none();
-    }
     Resource assignedMapResource =
         Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
     Resource assignedReduceResource =

View File

@@ -52,6 +52,7 @@
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -386,7 +387,7 @@ protected void containerFailedOnHost(String hostName) {
   }
 
   protected Resource getAvailableResources() {
-    return availableResources;
+    return availableResources == null ? Resources.none() : availableResources;
   }
 
   protected void addContainerReq(ContainerRequest req) {

View File

@@ -431,7 +431,7 @@ public void testReducerRampdownDiagnostics() throws Exception {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
     // add resources to scheduler
     dispatcher.await();
@@ -565,6 +565,69 @@ public void testNonAggressivelyPreemptReducers() throws Exception {
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test(timeout = 30000)
+  public void testUnconditionalPreemptReducers() throws Exception {
+    LOG.info("Running testUnconditionalPreemptReducers");
+
+    int forcePreemptThresholdSecs = 2;
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        2 * forcePreemptThresholdSecs);
+    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
+        forcePreemptThresholdSecs);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8));
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    ControlledClock clock = new ControlledClock(null);
+    clock.setTime(1);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob, clock);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
+    RMContainerAllocator.AssignedRequests assignedRequests =
+        allocator.getAssignedRequests();
+    RMContainerAllocator.ScheduledRequests scheduledRequests =
+        allocator.getScheduledRequests();
+    ContainerRequestEvent event1 =
+        createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+    scheduledRequests.maps.put(mock(TaskAttemptId.class),
+        new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+    assignedRequests.reduces.put(mock(TaskAttemptId.class),
+        mock(Container.class));
+
+    clock.setTime(clock.getTime() + 1);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is preempted too soon", 0,
+        assignedRequests.preemptionWaitingReduces.size());
+
+    clock.setTime(clock.getTime() + 1000 * forcePreemptThresholdSecs);
+    allocator.preemptReducesIfNeeded();
+    Assert.assertEquals("The reducer is not preempted", 1,
+        assignedRequests.preemptionWaitingReduces.size());
+  }
+
   @Test(timeout = 30000)
   public void testExcessReduceContainerAssign() throws Exception {
     final Configuration conf = new Configuration();
@@ -590,7 +653,7 @@ public void testExcessReduceContainerAssign() throws Exception {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
 
     // request to allocate two reduce priority containers
     final String[] locations = new String[] { host };
@@ -634,7 +697,8 @@ public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
     MyContainerAllocator allocator =
-        new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
+        new MyContainerAllocator(null, conf, appAttemptId, mockJob,
+            new SystemClock()) {
           @Override
           protected void register() {
           }
@@ -726,7 +790,7 @@ public void testMapReduceScheduling() throws Exception {
         MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
-        appAttemptId, mockJob);
+        appAttemptId, mockJob, new SystemClock());
 
     // add resources to scheduler
     MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
@@ -1629,6 +1693,7 @@ public MyFifoScheduler(RMContext rmContext) {
     List<ContainerId> lastRelease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
+    Resource forceResourceLimit = null;
 
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
@@ -1651,9 +1716,18 @@ public synchronized Allocation allocate(
       lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
-      return super.allocate(
+      Allocation allocation = super.allocate(
           applicationAttemptId, askCopy, release, blacklistAdditions,
           blacklistRemovals, increaseRequests, decreaseRequests);
+      if (forceResourceLimit != null) {
+        // Test wants to force the non-default resource limit
+        allocation.setResourceLimit(forceResourceLimit);
+      }
+      return allocation;
+    }
+
+    public void forceResourceLimit(Resource resource) {
+      this.forceResourceLimit = resource;
     }
   }
@@ -2677,7 +2751,7 @@ public void testConcurrentTaskLimits() throws Exception {
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
     MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
-        appAttemptId, mockJob) {
+        appAttemptId, mockJob, new SystemClock()) {
       @Override
       protected void register() {
       }

View File

@@ -712,10 +712,18 @@ public interface MRJobConfig {
       10 * 1000l;
 
   /**
-   * The threshold in terms of seconds after which an unsatisfied mapper request
-   * triggers reducer preemption to free space. Default 0 implies that the reduces
-   * should be preempted immediately after allocation if there is currently no
-   * room for newly allocated mappers.
+   * Duration to wait before forcibly preempting a reducer to allow
+   * allocating new mappers, even when YARN reports positive headroom.
+   */
+  public static final String MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC =
+      "mapreduce.job.reducer.unconditional-preempt.delay.sec";
+
+  public static final int
+      DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC = 5 * 60;
+
+  /**
+   * Duration to wait before preempting a reducer, when there is no headroom
+   * to allocate new mappers.
    */
   public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
       "mapreduce.job.reducer.preempt.delay.sec";

View File

@@ -91,17 +91,28 @@
   </description>
 </property>
 
 <property>
   <name>mapreduce.job.reducer.preempt.delay.sec</name>
   <value>0</value>
-  <description>The threshold in terms of seconds after which an unsatisfied mapper
-  request triggers reducer preemption to free space. Default 0 implies that the
-  reduces should be preempted immediately after allocation if there is currently no
-  room for newly allocated mappers.
+  <description>The threshold (in seconds) after which an unsatisfied
+  mapper request triggers reducer preemption when there is no anticipated
+  headroom. If set to 0 or a negative value, the reducer is preempted as
+  soon as lack of headroom is detected. Default is 0.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.reducer.unconditional-preempt.delay.sec</name>
+  <value>300</value>
+  <description>The threshold (in seconds) after which an unsatisfied
+  mapper request triggers a forced reducer preemption irrespective of the
+  anticipated headroom. By default, it is set to 5 mins. Setting it to 0
+  leads to immediate reducer preemption. Setting to -1 disables this
+  preemption altogether.
   </description>
 </property>
 
 <property>
   <name>mapreduce.job.max.split.locations</name>
   <value>10</value>
   <description>The max number of block locations to store for each split for
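As a usage illustration (the property names come from the patch above; the concrete values, class name, and job name are made up, and the string keys can of course be set directly instead of via the MRJobConfig constants), a client that wants a tighter unconditional timeout could override the defaults like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class PreemptionTuningExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Force-preempt reducers after 90 seconds of mapper starvation, even if
    // the RM still reports positive headroom (the default is 300 seconds).
    conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, 90);
    // Preempt as soon as the headroom cannot fit a mapper (the default).
    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, 0);
    Job job = Job.getInstance(conf, "preemption-tuning-example");
    // ... set mapper/reducer classes and input/output paths as usual ...
  }
}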

View File

@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -29,13 +30,13 @@
 public class Allocation {
 
   final List<Container> containers;
-  final Resource resourceLimit;
   final Set<ContainerId> strictContainers;
   final Set<ContainerId> fungibleContainers;
   final List<ResourceRequest> fungibleResources;
   final List<NMToken> nmTokens;
   final List<Container> increasedContainers;
   final List<Container> decreasedContainers;
+  private Resource resourceLimit;
 
   public Allocation(List<Container> containers, Resource resourceLimit,
@@ -97,4 +98,9 @@ public List<Container> getIncreasedContainers() {
   public List<Container> getDecreasedContainers() {
     return decreasedContainers;
   }
+
+  @VisibleForTesting
+  public void setResourceLimit(Resource resource) {
+    this.resourceLimit = resource;
+  }
 }