MAPREDUCE-5583. Ability to limit running map and reduce tasks. Contributed by Jason Lowe.

Author: Junping Du
Date: 2015-03-03 02:01:04 -08:00
parent 9ae7f9eb7b
commit 4228de9402
6 changed files with 363 additions and 17 deletions

CHANGES.txt

@@ -258,6 +258,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6228. Add truncate operation to SLive. (Plamen Jeliazkov via shv)
 
+    MAPREDUCE-5583. Ability to limit running map and reduce tasks.
+    (Jason Lowe via junping_du)
+
   IMPROVEMENTS
 
     MAPREDUCE-6149. Document override log4j.properties in MR job.

RMContainerAllocator.java

@@ -99,9 +99,9 @@ public class RMContainerAllocator extends RMContainerRequestor
   public static final
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
-  private static final Priority PRIORITY_FAST_FAIL_MAP;
-  private static final Priority PRIORITY_REDUCE;
-  private static final Priority PRIORITY_MAP;
+  static final Priority PRIORITY_FAST_FAIL_MAP;
+  static final Priority PRIORITY_REDUCE;
+  static final Priority PRIORITY_MAP;
   @VisibleForTesting
   public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
@@ -166,6 +166,8 @@ added to the pending and are ramped up (added to scheduled) based
    */
   private long allocationDelayThresholdMs = 0;
   private float reduceSlowStart = 0;
+  private int maxRunningMaps = 0;
+  private int maxRunningReduces = 0;
   private long retryInterval;
   private long retrystartTime;
   private Clock clock;
@@ -201,6 +203,10 @@ protected void serviceInit(Configuration conf) throws Exception {
     allocationDelayThresholdMs = conf.getInt(
         MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
         MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
+    maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
+        MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
+    maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
+        MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
         MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
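Since the limits are read from the job configuration at service init, they can be enabled per job with no cluster-side change. A minimal driver sketch (the job name and limit values are illustrative; the property constants are defined in MRJobConfig below):

// Sketch: fragment from a hypothetical job driver's main().
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, 10);    // "mapreduce.job.running.map.limit"
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, 2);  // "mapreduce.job.running.reduce.limit"
Job job = Job.getInstance(conf, "limited-job");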
@@ -664,6 +670,8 @@ public void rampDownReduces(int rampDown) {
   @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
+    applyConcurrentTaskLimits();
+
     // will be null the first time
     Resource headRoom =
         getAvailableResources() == null ? Resources.none() :
@@ -778,6 +786,43 @@ private List<Container> getResources() throws Exception {
     return newContainers;
   }
 
+  private void applyConcurrentTaskLimits() {
+    int numScheduledMaps = scheduledRequests.maps.size();
+    if (maxRunningMaps > 0 && numScheduledMaps > 0) {
+      int maxRequestedMaps = Math.max(0,
+          maxRunningMaps - assignedRequests.maps.size());
+      int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
+      int failedMapRequestLimit = Math.min(maxRequestedMaps,
+          numScheduledFailMaps);
+      int normalMapRequestLimit = Math.min(
+          maxRequestedMaps - failedMapRequestLimit,
+          numScheduledMaps - numScheduledFailMaps);
+      setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
+          failedMapRequestLimit);
+      setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
+    }
+
+    int numScheduledReduces = scheduledRequests.reduces.size();
+    if (maxRunningReduces > 0 && numScheduledReduces > 0) {
+      int maxRequestedReduces = Math.max(0,
+          maxRunningReduces - assignedRequests.reduces.size());
+      int reduceRequestLimit = Math.min(maxRequestedReduces,
+          numScheduledReduces);
+      setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
+          reduceRequestLimit);
+    }
+  }
+
+  private boolean canAssignMaps() {
+    return (maxRunningMaps <= 0
+        || assignedRequests.maps.size() < maxRunningMaps);
+  }
+
+  private boolean canAssignReduces() {
+    return (maxRunningReduces <= 0
+        || assignedRequests.reduces.size() < maxRunningReduces);
+  }
+
   private void updateAMRMToken(Token token) throws IOException {
     org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
         new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
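To make the clamping arithmetic above concrete, a standalone worked example (all numbers illustrative, not from the patch):

// With a limit of 10 maps, 4 already assigned, and 12 scheduled maps
// of which 3 are fail-fast retries:
int maxRunningMaps = 10, assignedMaps = 4;
int numScheduledMaps = 12, numScheduledFailMaps = 3;
int maxRequestedMaps = Math.max(0, maxRunningMaps - assignedMaps);            // 6
int failedMapRequestLimit = Math.min(maxRequestedMaps, numScheduledFailMaps); // 3
int normalMapRequestLimit = Math.min(maxRequestedMaps - failedMapRequestLimit,
    numScheduledMaps - numScheduledFailMaps);                                 // min(3, 9) = 3
// Fail-fast retries are funded first; this heartbeat the AM asks the RM
// for at most 3 fail-fast and 3 normal map containers.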
@@ -1046,8 +1091,7 @@ reduceResourceRequest, getSchedulerResourceTypes()) <= 0
       it = allocatedContainers.iterator();
       while (it.hasNext()) {
         Container allocated = it.next();
-        LOG.info("Releasing unassigned and invalid container "
-            + allocated + ". RM may have assignment issues");
+        LOG.info("Releasing unassigned container " + allocated);
         containerNotAssigned(allocated);
       }
     }
@@ -1150,7 +1194,8 @@ else if (PRIORITY_REDUCE.equals(priority)) {
     private ContainerRequest assignToFailedMap(Container allocated) {
       //try to assign to earlierFailedMaps if present
       ContainerRequest assigned = null;
-      while (assigned == null && earlierFailedMaps.size() > 0) {
+      while (assigned == null && earlierFailedMaps.size() > 0
+          && canAssignMaps()) {
         TaskAttemptId tId = earlierFailedMaps.removeFirst();
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
@@ -1168,7 +1213,7 @@ private ContainerRequest assignToFailedMap(Container allocated) {
     private ContainerRequest assignToReduce(Container allocated) {
       ContainerRequest assigned = null;
       //try to assign to reduces if present
-      if (assigned == null && reduces.size() > 0) {
+      if (assigned == null && reduces.size() > 0 && canAssignReduces()) {
         TaskAttemptId tId = reduces.keySet().iterator().next();
         assigned = reduces.remove(tId);
         LOG.info("Assigned to reduce");
@@ -1180,7 +1225,7 @@ private ContainerRequest assignToReduce(Container allocated) {
     private void assignMapsWithLocality(List<Container> allocatedContainers) {
       // try to assign to all nodes first to match node local
       Iterator<Container> it = allocatedContainers.iterator();
-      while(it.hasNext() && maps.size() > 0){
+      while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();
         Priority priority = allocated.getPriority();
         assert PRIORITY_MAP.equals(priority);
@@ -1212,7 +1257,7 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
       // try to match all rack local
       it = allocatedContainers.iterator();
-      while(it.hasNext() && maps.size() > 0){
+      while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();
         Priority priority = allocated.getPriority();
         assert PRIORITY_MAP.equals(priority);
@@ -1242,7 +1287,7 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
       // assign remaining
       it = allocatedContainers.iterator();
-      while(it.hasNext() && maps.size() > 0){
+      while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
         Container allocated = it.next();
         Priority priority = allocated.getPriority();
         assert PRIORITY_MAP.equals(priority);

RMContainerRequestor.java

@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -58,6 +60,8 @@
 public abstract class RMContainerRequestor extends RMCommunicator {
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+  private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR =
+      new ResourceRequestComparator();
 
   protected int lastResponseID;
   private Resource availableResources;
@@ -77,12 +81,18 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   // use custom comparator to make sure ResourceRequest objects differing only in
   // numContainers dont end up as duplicates
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
-      new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
+      RESOURCE_REQUEST_COMPARATOR);
   private final Set<ContainerId> release = new TreeSet<ContainerId>();
   // pendingRelease holds history or release requests.request is removed only if
   // RM sends completedContainer.
   // How it different from release? --> release is for per allocate() request.
   protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
+  private final Map<ResourceRequest,ResourceRequest> requestLimits =
+      new TreeMap<ResourceRequest,ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
+  private final Set<ResourceRequest> requestLimitsToUpdate =
+      new TreeSet<ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
+
   private boolean nodeBlacklistingEnabled;
   private int blacklistDisablePercent;
   private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
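Both new collections reuse the comparator that already keys ask, which (per the comment above) treats requests differing only in numContainers as the same entry. That is what lets a stored limit be looked up later purely by its priority, resource name, and capability. A small illustration of the property being relied on (priority and capability values hypothetical):

// Two requests that differ only in numContainers compare equal under
// ResourceRequestComparator, so a limit keyed this way can be found and
// replaced regardless of the container count it carries.
ResourceRequest a = ResourceRequest.newInstance(Priority.newInstance(20),
    ResourceRequest.ANY, Resource.newInstance(1024, 1), 5);
ResourceRequest b = ResourceRequest.newInstance(Priority.newInstance(20),
    ResourceRequest.ANY, Resource.newInstance(1024, 1), 99);
assert new ResourceRequestComparator().compare(a, b) == 0;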
@@ -178,6 +188,7 @@ protected void serviceInit(Configuration conf) throws Exception {
   protected AllocateResponse makeRemoteRequest() throws YarnException,
       IOException {
+    applyRequestLimits();
     ResourceBlacklistRequest blacklistRequest =
         ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
             new ArrayList<String>(blacklistRemovals));
@@ -190,13 +201,14 @@ protected AllocateResponse makeRemoteRequest() throws YarnException,
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;
     clusterNmCount = allocateResponse.getNumClusterNodes();
+    int numCompletedContainers =
+        allocateResponse.getCompletedContainersStatuses().size();
 
     if (ask.size() > 0 || release.size() > 0) {
       LOG.info("getResources() for " + applicationId + ":" + " ask="
           + ask.size() + " release= " + release.size() + " newContainers="
           + allocateResponse.getAllocatedContainers().size()
-          + " finishedContainers="
-          + allocateResponse.getCompletedContainersStatuses().size()
+          + " finishedContainers=" + numCompletedContainers
           + " resourcelimit=" + availableResources + " knownNMs="
           + clusterNmCount);
     }
@@ -204,6 +216,12 @@ protected AllocateResponse makeRemoteRequest() throws YarnException,
     ask.clear();
     release.clear();
 
+    if (numCompletedContainers > 0) {
+      // re-send limited requests when a container completes to trigger asking
+      // for more containers
+      requestLimitsToUpdate.addAll(requestLimits.keySet());
+    }
+
     if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
       LOG.info("Update the blacklist for " + applicationId +
           ": blacklistAdditions=" + blacklistAdditions.size() +
@@ -214,6 +232,36 @@ protected AllocateResponse makeRemoteRequest() throws YarnException,
     return allocateResponse;
   }
 
+  private void applyRequestLimits() {
+    Iterator<ResourceRequest> iter = requestLimits.values().iterator();
+    while (iter.hasNext()) {
+      ResourceRequest reqLimit = iter.next();
+      int limit = reqLimit.getNumContainers();
+      Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+          remoteRequestsTable.get(reqLimit.getPriority());
+      Map<Resource, ResourceRequest> reqMap = (remoteRequests != null)
+          ? remoteRequests.get(ResourceRequest.ANY) : null;
+      ResourceRequest req = (reqMap != null)
+          ? reqMap.get(reqLimit.getCapability()) : null;
+      if (req == null) {
+        continue;
+      }
+      // update an existing ask or send a new one if updating
+      if (ask.remove(req) || requestLimitsToUpdate.contains(req)) {
+        ResourceRequest newReq = req.getNumContainers() > limit
+            ? reqLimit : req;
+        ask.add(newReq);
+        LOG.info("Applying ask limit of " + newReq.getNumContainers()
+            + " for priority:" + reqLimit.getPriority()
+            + " and capability:" + reqLimit.getCapability());
+      }
+      if (limit == Integer.MAX_VALUE) {
+        iter.remove();
+      }
+    }
+    requestLimitsToUpdate.clear();
+  }
+
   protected void addOutstandingRequestOnResync() {
     for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
         .values()) {
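A subtlety in applyRequestLimits() worth spelling out: Integer.MAX_VALUE doubles as the "limit lifted" sentinel (setRequestLimit() below maps any negative limit to it). Since no real ask can exceed MAX_VALUE, such an entry re-sends the request at its true size one last time and is then dropped from requestLimits by iter.remove().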
@@ -229,6 +277,7 @@ protected void addOutstandingRequestOnResync() {
     if (!pendingRelease.isEmpty()) {
       release.addAll(pendingRelease);
     }
+    requestLimitsToUpdate.addAll(requestLimits.keySet());
   }
 
   // May be incorrect if there's multiple NodeManagers running on a single host.
@@ -459,10 +508,8 @@ private void decResourceRequest(Priority priority, String resourceName,
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // because objects inside the resource map can be deleted ask can end up
     // containing an object that matches new resource object but with different
-    // numContainers. So exisintg values must be replaced explicitly
-    if(ask.contains(remoteRequest)) {
-      ask.remove(remoteRequest);
-    }
+    // numContainers. So existing values must be replaced explicitly
+    ask.remove(remoteRequest);
     ask.add(remoteRequest);
   }
@@ -490,6 +537,19 @@ protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
     return newReq;
   }
 
+  protected void setRequestLimit(Priority priority, Resource capability,
+      int limit) {
+    if (limit < 0) {
+      limit = Integer.MAX_VALUE;
+    }
+    ResourceRequest newReqLimit = ResourceRequest.newInstance(priority,
+        ResourceRequest.ANY, capability, limit);
+    ResourceRequest oldReqLimit = requestLimits.put(newReqLimit, newReqLimit);
+    if (oldReqLimit == null || oldReqLimit.getNumContainers() < limit) {
+      requestLimitsToUpdate.add(newReqLimit);
+    }
+  }
+
   public Set<String> getBlacklistedNodes() {
     return blacklistedNodes;
   }
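Note the update rule here is asymmetric: a limit entry is queued in requestLimitsToUpdate only when it is new or raised, since a lowered limit takes effect anyway the next time the underlying ask is touched. A sketch of hypothetical calls from a subclass such as RMContainerAllocator (capability value illustrative):

Resource mapCapability = Resource.newInstance(2048, 1);
setRequestLimit(PRIORITY_MAP, mapCapability, 3);   // clamp the ANY ask to 3
setRequestLimit(PRIORITY_MAP, mapCapability, 5);   // raised: queued for re-send
setRequestLimit(PRIORITY_MAP, mapCapability, -1);  // negative -> Integer.MAX_VALUE:
                                                   // limit applied once more, then discarded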

TestRMContainerAllocator.java

@@ -31,9 +31,11 @@
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -81,7 +83,13 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -89,6 +97,10 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -2387,6 +2399,208 @@ public Token<AMRMTokenIdentifier> run() throws Exception {
         new Text(rmAddr), ugiToken.getService());
   }
 
+  @Test
+  public void testConcurrentTaskLimits() throws Exception {
+    final int MAP_LIMIT = 3;
+    final int REDUCE_LIMIT = 1;
+    LOG.info("Running testConcurrentTaskLimits");
+
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+    conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
+        appAttemptId, mockJob) {
+      @Override
+      protected void register() {
+      }
+
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+    };
+
+    // create some map requests
+    ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
+    for (int i = 0; i < reqMapEvents.length; ++i) {
+      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+    }
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+    // create some reduce requests
+    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
+    for (int i = 0; i < reqReduceEvents.length; ++i) {
+      reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
+          false, true);
+    }
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+
+    // verify all of the host-specific asks were sent plus one for the
+    // default rack and one for the ANY request
+    Assert.assertEquals(reqMapEvents.length + 2, mockScheduler.lastAsk.size());
+    // verify AM is only asking for the map limit overall
+    Assert.assertEquals(MAP_LIMIT, mockScheduler.lastAnyAskMap);
+
+    // assign a map task and verify we do not ask for any more maps
+    ContainerId cid0 = mockScheduler.assignContainer("h0", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(2, mockScheduler.lastAnyAskMap);
+
+    // complete the map task and verify that we ask for one more
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(3, mockScheduler.lastAnyAskMap);
+
+    // assign three more maps and verify we ask for no more maps
+    ContainerId cid1 = mockScheduler.assignContainer("h1", false);
+    ContainerId cid2 = mockScheduler.assignContainer("h2", false);
+    ContainerId cid3 = mockScheduler.assignContainer("h3", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // complete two containers and verify we only asked for one more
+    // since at that point all maps should be scheduled/completed
+    mockScheduler.completeContainer(cid2);
+    mockScheduler.completeContainer(cid3);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(1, mockScheduler.lastAnyAskMap);
+
+    // allocate the last container and complete the first one
+    // and verify there are no more map asks.
+    mockScheduler.completeContainer(cid1);
+    ContainerId cid4 = mockScheduler.assignContainer("h4", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // complete the last map
+    mockScheduler.completeContainer(cid4);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // verify only reduce limit being requested
+    Assert.assertEquals(REDUCE_LIMIT, mockScheduler.lastAnyAskReduce);
+
+    // assign a reducer and verify ask goes to zero
+    cid0 = mockScheduler.assignContainer("h0", true);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+    // complete the reducer and verify we ask for another
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(1, mockScheduler.lastAnyAskReduce);
+
+    // assign a reducer and verify ask goes to zero
+    cid0 = mockScheduler.assignContainer("h0", true);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+    // complete the reducer and verify no more reducers
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+    allocator.close();
+  }
+  private static class MockScheduler implements ApplicationMasterProtocol {
+    ApplicationAttemptId attemptId;
+    long nextContainerId = 10;
+    List<ResourceRequest> lastAsk = null;
+    int lastAnyAskMap = 0;
+    int lastAnyAskReduce = 0;
+    List<ContainerStatus> containersToComplete =
+        new ArrayList<ContainerStatus>();
+    List<Container> containersToAllocate = new ArrayList<Container>();
+
+    public MockScheduler(ApplicationAttemptId attemptId) {
+      this.attemptId = attemptId;
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return RegisterApplicationMasterResponse.newInstance(
+          Resource.newInstance(512, 1),
+          Resource.newInstance(512000, 1024),
+          Collections.<ApplicationAccessType,String>emptyMap(),
+          ByteBuffer.wrap("fake_key".getBytes()),
+          Collections.<Container>emptyList(),
+          "default",
+          Collections.<NMToken>emptyList());
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return FinishApplicationMasterResponse.newInstance(false);
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      lastAsk = request.getAskList();
+      for (ResourceRequest req : lastAsk) {
+        if (ResourceRequest.ANY.equals(req.getResourceName())) {
+          Priority priority = req.getPriority();
+          if (priority.equals(RMContainerAllocator.PRIORITY_MAP)) {
+            lastAnyAskMap = req.getNumContainers();
+          } else if (priority.equals(RMContainerAllocator.PRIORITY_REDUCE)){
+            lastAnyAskReduce = req.getNumContainers();
+          }
+        }
+      }
+      AllocateResponse response = AllocateResponse.newInstance(
+          request.getResponseId(),
+          containersToComplete, containersToAllocate,
+          Collections.<NodeReport>emptyList(),
+          Resource.newInstance(512000, 1024), null, 10, null,
+          Collections.<NMToken>emptyList());
+      containersToComplete.clear();
+      containersToAllocate.clear();
+      return response;
+    }
+
+    public ContainerId assignContainer(String nodeName, boolean isReduce) {
+      ContainerId containerId =
+          ContainerId.newContainerId(attemptId, nextContainerId++);
+      Priority priority = isReduce ? RMContainerAllocator.PRIORITY_REDUCE
+          : RMContainerAllocator.PRIORITY_MAP;
+      Container container = Container.newInstance(containerId,
+          NodeId.newInstance(nodeName, 1234), nodeName + ":5678",
+          Resource.newInstance(1024, 1), priority, null);
+      containersToAllocate.add(container);
+      return containerId;
+    }
+
+    public void completeContainer(ContainerId containerId) {
+      containersToComplete.add(ContainerStatus.newInstance(containerId,
+          ContainerState.COMPLETE, "", 0));
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();
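A note on the schedule()-twice pattern in the test above: the first heartbeat appears to deliver the mock scheduler's queued completions and assignments to the allocator, and only the following heartbeat carries the ask recomputed from that new state, so each assertion checks the second response.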

MRJobConfig.java

@@ -373,6 +373,14 @@ public interface MRJobConfig {
   public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
 
+  public static final String JOB_RUNNING_MAP_LIMIT =
+      "mapreduce.job.running.map.limit";
+  public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
+
+  public static final String JOB_RUNNING_REDUCE_LIMIT =
+      "mapreduce.job.running.reduce.limit";
+  public static final int DEFAULT_JOB_RUNNING_REDUCE_LIMIT = 0;
+
   /* config for tracking the local file where all the credentials for the job
    * credentials.
    */

mapred-default.xml

@@ -82,6 +82,22 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.running.map.limit</name>
+  <value>0</value>
+  <description>The maximum number of simultaneous map tasks per job.
+  There is no limit if this value is 0 or negative.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.running.reduce.limit</name>
+  <value>0</value>
+  <description>The maximum number of simultaneous reduce tasks per job.
+  There is no limit if this value is 0 or negative.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.job.reducer.preempt.delay.sec</name>
   <value>0</value>
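Since both defaults are 0, the feature is strictly opt-in: an unmodified cluster behaves exactly as before, and a user can cap an individual job from the command line (for example -Dmapreduce.job.running.map.limit=10 with a GenericOptionsParser-based driver) or via the Java driver sketch shown earlier.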