MAPREDUCE-5844. Add a configurable delay to reducer-preemption. (Maysam Yabandeh via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603957 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-06-19 17:22:56 +00:00
parent 3f82484218
commit 7b9c074b76
7 changed files with 277 additions and 59 deletions

View File

@ -216,6 +216,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5896. InputSplits should indicate which locations have the block
cached in memory. (Sandy Ryza via kasha)
MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
(Maysam Yabandeh via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -475,8 +475,8 @@
<Match>
<Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
<Or>
<Field name="mapResourceReqt" />
<Field name="reduceResourceReqt" />
<Field name="mapResourceRequest" />
<Field name="reduceResourceRequest" />
<Field name="maxReduceRampupLimit" />
<Field name="reduceSlowStart" />
</Or>

View File

@ -73,6 +73,7 @@
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@ -143,15 +144,21 @@ added to the pending and are ramped up (added to scheduled) based
private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
private int reduceResourceReqt;//memory
private int mapResourceRequest;//memory
private int reduceResourceRequest;//memory
private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0;
private float maxReducePreemptionLimit = 0;
/**
* after this threshold, if the container request is not allocated, it is
* considered delayed.
*/
private long allocationDelayThresholdMs = 0;
private float reduceSlowStart = 0;
private long retryInterval;
private long retrystartTime;
private Clock clock;
private final AMPreemptionPolicy preemptionPolicy;
@ -166,6 +173,7 @@ public RMContainerAllocator(ClientService clientService, AppContext context,
super(clientService, context);
this.preemptionPolicy = preemptionPolicy;
this.stopped = new AtomicBoolean(false);
this.clock = context.getClock();
}
@Override
@ -180,6 +188,9 @@ protected void serviceInit(Configuration conf) throws Exception {
maxReducePreemptionLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
allocationDelayThresholdMs = conf.getInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@ -246,7 +257,7 @@ protected synchronized void heartbeat() throws Exception {
getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
mapResourceRequest, reduceResourceRequest,
pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
@ -268,6 +279,18 @@ protected void serviceStop() throws Exception {
scheduleStats.log("Final Stats: ");
}
@Private
@VisibleForTesting
AssignedRequests getAssignedRequests() {
return assignedRequests;
}
@Private
@VisibleForTesting
ScheduledRequests getScheduledRequests() {
return scheduledRequests;
}
public boolean getIsReduceStarted() {
return reduceStarted;
}
@ -303,16 +326,16 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
int supportedMaxContainerCapability =
getMaxContainerCapability().getMemory();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
if (mapResourceReqt == 0) {
mapResourceReqt = reqEvent.getCapability().getMemory();
if (mapResourceRequest == 0) {
mapResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
mapResourceReqt)));
LOG.info("mapResourceReqt:"+mapResourceReqt);
if (mapResourceReqt > supportedMaxContainerCapability) {
mapResourceRequest)));
LOG.info("mapResourceRequest:"+ mapResourceRequest);
if (mapResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "MAP capability required is more than the supported " +
"max container capability in the cluster. Killing the Job. mapResourceReqt: " +
mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
"max container capability in the cluster. Killing the Job. mapResourceRequest: " +
mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
jobId, diagMsg));
@ -320,20 +343,20 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
}
}
//set the rounded off memory
reqEvent.getCapability().setMemory(mapResourceReqt);
reqEvent.getCapability().setMemory(mapResourceRequest);
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
} else {
if (reduceResourceReqt == 0) {
reduceResourceReqt = reqEvent.getCapability().getMemory();
if (reduceResourceRequest == 0) {
reduceResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceReqt)));
LOG.info("reduceResourceReqt:"+reduceResourceReqt);
if (reduceResourceReqt > supportedMaxContainerCapability) {
reduceResourceRequest)));
LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
if (reduceResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing the " +
"Job. reduceResourceReqt: " + reduceResourceReqt +
"Job. reduceResourceRequest: " + reduceResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
@ -342,7 +365,7 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
}
}
//set the rounded off memory
reqEvent.getCapability().setMemory(reduceResourceReqt);
reqEvent.getCapability().setMemory(reduceResourceRequest);
if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@ -394,8 +417,22 @@ private static String getHost(String contMgrAddress) {
return host;
}
private void preemptReducesIfNeeded() {
if (reduceResourceReqt == 0) {
@Private
@VisibleForTesting
synchronized void setReduceResourceRequest(int mem) {
this.reduceResourceRequest = mem;
}
@Private
@VisibleForTesting
synchronized void setMapResourceRequest(int mem) {
this.mapResourceRequest = mem;
}
@Private
@VisibleForTesting
void preemptReducesIfNeeded() {
if (reduceResourceRequest == 0) {
return; //no reduces
}
//check if reduces have taken over the whole cluster and there are
@ -403,9 +440,9 @@ private void preemptReducesIfNeeded() {
if (scheduledRequests.maps.size() > 0) {
int memLimit = getMemLimit();
int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
//availableMemForMap must be sufficient to run atleast 1 map
if (availableMemForMap < mapResourceReqt) {
if (availableMemForMap < mapResourceRequest) {
//to make sure new containers are given to maps and not reduces
//ramp down all scheduled reduces if any
//(since reduces are scheduled at higher priority than maps)
@ -414,22 +451,40 @@ private void preemptReducesIfNeeded() {
pendingReduces.add(req);
}
scheduledRequests.reduces.clear();
//preempt for making space for at least one map
int premeptionLimit = Math.max(mapResourceReqt,
(int) (maxReducePreemptionLimit * memLimit));
int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
premeptionLimit);
int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
assignedRequests.preemptReduce(toPreempt);
//do further checking to find the number of map requests that were
//hanging around for a while
int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
if (hangingMapRequests > 0) {
//preempt for making space for at least one map
int premeptionLimit = Math.max(mapResourceRequest,
(int) (maxReducePreemptionLimit * memLimit));
int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
premeptionLimit);
int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
assignedRequests.preemptReduce(toPreempt);
}
}
}
}
private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
if (allocationDelayThresholdMs <= 0)
return requestMap.size();
int hangingRequests = 0;
long currTime = clock.getTime();
for (ContainerRequest request: requestMap.values()) {
long delay = currTime - request.requestTimeMs;
if (delay > allocationDelayThresholdMs)
hangingRequests++;
}
return hangingRequests;
}
@Private
public void scheduleReduces(
@ -715,11 +770,13 @@ private void handleUpdatedNodes(AllocateResponse response) {
@Private
public int getMemLimit() {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
return headRoom + assignedRequests.maps.size() * mapResourceReqt +
assignedRequests.reduces.size() * reduceResourceReqt;
return headRoom + assignedRequests.maps.size() * mapResourceRequest +
assignedRequests.reduces.size() * reduceResourceRequest;
}
private class ScheduledRequests {
@Private
@VisibleForTesting
class ScheduledRequests {
private final LinkedList<TaskAttemptId> earlierFailedMaps =
new LinkedList<TaskAttemptId>();
@ -729,7 +786,8 @@ private class ScheduledRequests {
new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<TaskAttemptId, ContainerRequest> maps =
@VisibleForTesting
final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@ -825,22 +883,22 @@ private void assign(List<Container> allocatedContainers) {
int allocatedMemory = allocated.getResource().getMemory();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
if (allocatedMemory < mapResourceReqt
if (allocatedMemory < mapResourceRequest
|| maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
+ " container memory less than required " + mapResourceReqt
+ " container memory less than required " + mapResourceRequest
+ " or no pending map tasks - maps.isEmpty="
+ maps.isEmpty());
isAssignable = false;
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
if (allocatedMemory < reduceResourceReqt
if (allocatedMemory < reduceResourceRequest
|| reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
+ " container memory less than required " + reduceResourceReqt
+ " container memory less than required " + reduceResourceRequest
+ " or no pending reduce tasks - reduces.isEmpty="
+ reduces.isEmpty());
isAssignable = false;
@ -1119,14 +1177,18 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
}
}
private class AssignedRequests {
@Private
@VisibleForTesting
class AssignedRequests {
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
new HashMap<ContainerId, TaskAttemptId>();
private final LinkedHashMap<TaskAttemptId, Container> maps =
new LinkedHashMap<TaskAttemptId, Container>();
private final LinkedHashMap<TaskAttemptId, Container> reduces =
@VisibleForTesting
final LinkedHashMap<TaskAttemptId, Container> reduces =
new LinkedHashMap<TaskAttemptId, Container>();
private final Set<TaskAttemptId> preemptionWaitingReduces =
@VisibleForTesting
final Set<TaskAttemptId> preemptionWaitingReduces =
new HashSet<TaskAttemptId>();
void add(Container container, TaskAttemptId tId) {

View File

@ -29,8 +29,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -96,6 +98,8 @@ public RMContainerRequestor(ClientService clientService, AppContext context) {
super(clientService, context);
}
@Private
@VisibleForTesting
static class ContainerRequest {
final TaskAttemptId attemptID;
final Resource capability;
@ -103,20 +107,39 @@ static class ContainerRequest {
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;
/**
* the time when this request object was formed; can be used to avoid
* aggressive preemption for recently placed requests
*/
final long requestTimeMs;
public ContainerRequest(ContainerRequestEvent event, Priority priority) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority);
}
public ContainerRequest(ContainerRequestEvent event, Priority priority,
long requestTimeMs) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority, requestTimeMs);
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority) {
Resource capability, String[] hosts, String[] racks,
Priority priority) {
this(attemptID, capability, hosts, racks, priority,
System.currentTimeMillis());
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority, long requestTimeMs) {
this.attemptID = attemptID;
this.capability = capability;
this.hosts = hosts;
this.racks = racks;
this.priority = priority;
this.requestTimeMs = requestTimeMs;
}
public String toString() {

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@ -40,6 +40,10 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -65,10 +69,6 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -80,6 +80,7 @@
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -422,6 +423,115 @@ public void testReducerRampdownDiagnostics() throws Exception {
killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC));
}
@Test(timeout = 30000)
public void testPreemptReducers() throws Exception {
LOG.info("Running testPreemptReducers");
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock());
allocator.setMapResourceRequest(1024);
allocator.setReduceResourceRequest(1024);
RMContainerAllocator.AssignedRequests assignedRequests =
allocator.getAssignedRequests();
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is not preempted",
1, assignedRequests.preemptionWaitingReduces.size());
}
@Test(timeout = 30000)
public void testNonAggressivelyPreemptReducers() throws Exception {
LOG.info("Running testPreemptReducers");
final int preemptThreshold = 2; //sec
Configuration conf = new Configuration();
conf.setInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
preemptThreshold);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
ControlledClock clock = new ControlledClock(null);
clock.setTime(1);
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, clock);
allocator.setMapResourceRequest(1024);
allocator.setReduceResourceRequest(1024);
RMContainerAllocator.AssignedRequests assignedRequests =
allocator.getAssignedRequests();
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
clock.setTime(clock.getTime() + 1);
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is aggressively preeempted", 0,
assignedRequests.preemptionWaitingReduces.size());
clock.setTime(clock.getTime() + (preemptThreshold) * 1000);
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is not preeempted", 1,
assignedRequests.preemptionWaitingReduces.size());
}
@Test
public void testMapReduceScheduling() throws Exception {

View File

@ -579,7 +579,17 @@ public interface MRJobConfig {
MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
/**
* The threshold in terms of seconds after which an unsatisfied mapper request
* triggers reducer preemption to free space. Default 0 implies that the reduces
* should be preempted immediately after allocation if there is currently no
* room for newly allocated mappers.
*/
public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
"mapreduce.job.reducer.preempt.delay.sec";
public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
public static final String MR_AM_ENV =
MR_AM_PREFIX + "env";

View File

@ -82,6 +82,16 @@
</description>
</property>
<property>
<name>mapreduce.job.reducer.preempt.delay.sec</name>
<value>0</value>
<description>The threshold in terms of seconds after which an unsatisfied mapper
request triggers reducer preemption to free space. Default 0 implies that the
reduces should be preempted immediately after allocation if there is currently no
room for newly allocated mappers.
</description>
</property>
<property>
<name>mapreduce.job.max.split.locations</name>
<value>10</value>