YARN-7670. Modifications to the ResourceScheduler API to support SchedulingRequests. (asuresh)

This commit is contained in:
Arun Suresh 2017-12-19 08:59:23 -08:00
parent 801c0988b5
commit 88d8d3f40b
6 changed files with 138 additions and 23 deletions

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -295,6 +296,10 @@ public abstract class AbstractYarnScheduler
return nodeTracker.getNodes(nodeFilter); return nodeTracker.getNodes(nodeFilter);
} }
/**
 * Looks up scheduler nodes through the node tracker using the supplied
 * filter.
 *
 * @param filter filter handed unchanged to {@code nodeTracker.getNodes}.
 * @return the list returned by the node tracker for that filter.
 */
public List<N> getNodes(final NodeFilter filter) {
  final List<N> matchingNodes = nodeTracker.getNodes(filter);
  return matchingNodes;
}
public boolean shouldContainersBeAutoUpdated() { public boolean shouldContainersBeAutoUpdated() {
return this.autoUpdateContainers; return this.autoUpdateContainers;
} }
@ -1443,4 +1448,17 @@ public abstract class AbstractYarnScheduler
throw new IOException(e); throw new IOException(e);
} }
} }
/**
 * Base implementation that never allocates: the arguments are not inspected
 * and {@code false} is returned unconditionally.
 *
 * @param appAttempt ApplicationAttempt (unused here).
 * @param schedulingRequest SchedulingRequest (unused here).
 * @param schedulerNode SchedulerNode (unused here).
 * @return always {@code false}.
 */
@Override
public boolean attemptAllocationOnNode(
    SchedulerApplicationAttempt appAttempt,
    SchedulingRequest schedulingRequest,
    SchedulerNode schedulerNode) {
  // No allocation is attempted by this default.
  return false;
}
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
@ -58,4 +59,16 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable {
* @return the number of available {@link NodeId} by resource name. * @return the number of available {@link NodeId} by resource name.
*/ */
List<NodeId> getNodeIds(String resourceName); List<NodeId> getNodeIds(String resourceName);
/**
 * Attempts to allocate a SchedulingRequest on a Node.
 * NOTE: This ignores the numAllocations in the resource sizing and tries
 * to allocate a SINGLE container only.
 * @param appAttempt ApplicationAttempt the container is requested for.
 * @param schedulingRequest SchedulingRequest describing the container.
 * @param schedulerNode SchedulerNode on which the allocation is attempted.
 * @return true if proposal was accepted.
 */
boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode);
} }

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -82,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@ -99,7 +102,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@ -141,6 +146,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Candida
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -596,7 +603,7 @@ public class CapacityScheduler extends
try { try {
cs.writeLock.lock(); cs.writeLock.lock();
cs.tryCommit(cs.getClusterResource(), request); cs.tryCommit(cs.getClusterResource(), request, true);
} finally { } finally {
cs.writeLock.unlock(); cs.writeLock.unlock();
} }
@ -2551,10 +2558,67 @@ public class CapacityScheduler extends
resourceCommitterService.addNewCommitRequest(request); resourceCommitterService.addNewCommitRequest(request);
} else{ } else{
// Otherwise do it sync-ly. // Otherwise do it sync-ly.
tryCommit(cluster, request); tryCommit(cluster, request, true);
} }
} }
/**
 * Attempts to place a single container for the given SchedulingRequest on
 * the given node by building a commit request and committing it directly.
 *
 * <p>Only one container is attempted regardless of the requested
 * numAllocations; a warning is logged when more than one was requested.
 * The commit is issued with {@code updatePending} set to {@code false}.
 *
 * @param appAttempt ApplicationAttempt.
 * @param schedulingRequest SchedulingRequest.
 * @param schedulerNode SchedulerNode.
 * @return true if the proposal was committed; false if the request carries
 *         no resource sizing, the attempt is stopped, no commit request
 *         could be built, or the commit was not accepted.
 */
@Override
public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
    SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
  if (schedulingRequest.getResourceSizing() != null) {
    if (schedulingRequest.getResourceSizing().getNumAllocations() > 1) {
      LOG.warn("The SchedulingRequest has requested more than 1 allocation," +
          " but only 1 will be attempted !!");
    }
    if (!appAttempt.isStopped()) {
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
          resourceCommitRequest = createResourceCommitRequest(
          appAttempt, schedulingRequest, schedulerNode);
      // createResourceCommitRequest returns null when the requested resource
      // is not greater than Resources.none(); there is then nothing to
      // commit, so report failure instead of handing null to tryCommit.
      if (resourceCommitRequest == null) {
        return false;
      }
      return tryCommit(getClusterResource(), resourceCommitRequest, false);
    }
  }
  return false;
}
/**
 * Builds a commit request proposing one new container for the given
 * SchedulingRequest on the given node. This assumes numContainers = 1 for
 * the request.
 *
 * <p>The container is created with {@link ExecutionType#GUARANTEED}, and the
 * proposal uses {@code NodeType.NODE_LOCAL} for both node-type arguments and
 * {@code SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY}.
 *
 * @param appAttempt attempt that supplies the attempt id, the new container
 *                   id and the user.
 * @param schedulingRequest request that supplies the resource, priority and
 *                          allocation request id.
 * @param schedulerNode node that supplies the node id and HTTP address.
 * @return a commit request holding exactly one allocation proposal, or
 *         {@code null} when the requested resource is not greater than
 *         {@code Resources.none()}.
 */
private ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
    createResourceCommitRequest(SchedulerApplicationAttempt appAttempt,
    SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
  Resource requested = schedulingRequest.getResourceSizing().getResources();
  boolean asksForResources = Resources.greaterThan(calculator,
      getClusterResource(), requested, Resources.none());
  if (!asksForResources) {
    // Nothing to propose for an empty resource ask.
    return null;
  }

  // A fresh container id is drawn only once we know a proposal is built.
  ContainerId containerId = ContainerId.newContainerId(
      appAttempt.getApplicationAttemptId(),
      appAttempt.getAppSchedulingInfo().getNewContainerId());
  // NOTE(review): the null argument is presumably the container token -
  // confirm against BuilderUtils.newContainer.
  Container newContainer = BuilderUtils.newContainer(containerId,
      schedulerNode.getNodeID(), schedulerNode.getHttpAddress(), requested,
      schedulingRequest.getPriority(), null, ExecutionType.GUARANTEED,
      schedulingRequest.getAllocationRequestId());
  RMContainer rmContainer = new RMContainerImpl(newContainer,
      SchedulerRequestKey.extractFrom(newContainer),
      appAttempt.getApplicationAttemptId(), newContainer.getNodeId(),
      appAttempt.getUser(), rmContext, false);

  ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> proposal =
      new ContainerAllocationProposal<>(
          getSchedulerContainer(rmContainer, true), null, null,
          NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, requested);

  List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
      proposals = new ArrayList<>();
  proposals.add(proposal);
  // Only the first (allocation) list is populated; the other two
  // constructor arguments stay null as in the single-container case.
  return new ResourceCommitRequest<>(proposals, null, null);
}
@VisibleForTesting @VisibleForTesting
public ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> public ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
createResourceCommitRequest(CSAssignment csAssignment) { createResourceCommitRequest(CSAssignment csAssignment) {
@ -2632,7 +2696,8 @@ public class CapacityScheduler extends
} }
@Override @Override
public void tryCommit(Resource cluster, ResourceCommitRequest r) { public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
boolean updatePending) {
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request = ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
(ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r; (ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
@ -2662,15 +2727,17 @@ public class CapacityScheduler extends
LOG.debug("Try to commit allocation proposal=" + request); LOG.debug("Try to commit allocation proposal=" + request);
} }
boolean isSuccess = false;
if (attemptId != null) { if (attemptId != null) {
FiCaSchedulerApp app = getApplicationAttempt(attemptId); FiCaSchedulerApp app = getApplicationAttempt(attemptId);
// Required sanity check for attemptId - when async-scheduling enabled, // Required sanity check for attemptId - when async-scheduling enabled,
// proposal might be outdated if AM failover just finished // proposal might be outdated if AM failover just finished
// and proposal queue was not be consumed in time // and proposal queue was not be consumed in time
if (app != null && attemptId.equals(app.getApplicationAttemptId())) { if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
if (app.accept(cluster, request)) { if (app.accept(cluster, request, updatePending)) {
app.apply(cluster, request); app.apply(cluster, request, updatePending);
LOG.info("Allocation proposal accepted"); LOG.info("Allocation proposal accepted");
isSuccess = true;
} else{ } else{
LOG.info("Failed to accept allocation proposal"); LOG.info("Failed to accept allocation proposal");
} }
@ -2681,6 +2748,7 @@ public class CapacityScheduler extends
} }
} }
} }
return isSuccess;
} }
public int getAsyncSchedulingPendingBacklogs() { public int getAsyncSchedulingPendingBacklogs() {

View File

@ -25,5 +25,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
* plus global scheduling functionality * plus global scheduling functionality
*/ */
public interface ResourceAllocationCommitter { public interface ResourceAllocationCommitter {
void tryCommit(Resource cluster, ResourceCommitRequest proposal);
/**
 * Try to commit the allocation Proposal. This also gives the option of
 * not updating a pending queued request.
 * @param cluster Cluster Resource.
 * @param proposal Proposal to commit.
 * @param updatePending Whether the pending resource request should be
 *                      checked before accepting and decremented once the
 *                      proposal is applied; pass false to leave pending
 *                      requests untouched.
 * @return true if the proposal was accepted and applied, false otherwise.
 */
boolean tryCommit(Resource cluster, ResourceCommitRequest proposal,
boolean updatePending);
} }

View File

@ -375,7 +375,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
} }
public boolean accept(Resource cluster, public boolean accept(Resource cluster,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) { ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request,
boolean checkPending) {
ContainerRequest containerRequest = null; ContainerRequest containerRequest = null;
boolean reReservation = false; boolean reReservation = false;
@ -408,7 +409,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulerContainer.getRmContainer().getContainerRequest(); schedulerContainer.getRmContainer().getContainerRequest();
// Check pending resource request // Check pending resource request
if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(), if (checkPending &&
!appSchedulingInfo.checkAllocation(
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(), schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey())) { schedulerContainer.getSchedulerRequestKey())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -485,8 +488,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return accepted; return accepted;
} }
public void apply(Resource cluster, public void apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) { FiCaSchedulerNode> request, boolean updatePending) {
boolean reReservation = false; boolean reReservation = false;
try { try {
@ -531,12 +534,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
liveContainers.put(containerId, rmContainer); liveContainers.put(containerId, rmContainer);
// Deduct pending resource requests // Deduct pending resource requests
if (updatePending) {
ContainerRequest containerRequest = appSchedulingInfo.allocate( ContainerRequest containerRequest = appSchedulingInfo.allocate(
allocation.getAllocationLocalityType(), allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(), schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey(), schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainer()); schedulerContainer.getRmContainer().getContainer());
((RMContainerImpl) rmContainer).setContainerRequest(containerRequest); ((RMContainerImpl) rmContainer).setContainerRequest(
containerRequest);
}
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
allocation.getAllocatedOrReservedResource()); allocation.getAllocatedOrReservedResource());

View File

@ -264,7 +264,7 @@ public class TestCapacitySchedulerAsyncScheduling {
reservedProposals.add(reservedForAttempt1Proposal); reservedProposals.add(reservedForAttempt1Proposal);
ResourceCommitRequest request = ResourceCommitRequest request =
new ResourceCommitRequest(null, reservedProposals, null); new ResourceCommitRequest(null, reservedProposals, null);
scheduler.tryCommit(scheduler.getClusterResource(), request); scheduler.tryCommit(scheduler.getClusterResource(), request, true);
Assert.assertNull("Outdated proposal should not be accepted!", Assert.assertNull("Outdated proposal should not be accepted!",
sn2.getReservedContainer()); sn2.getReservedContainer());
@ -385,7 +385,7 @@ public class TestCapacitySchedulerAsyncScheduling {
// call real apply // call real apply
try { try {
cs.tryCommit((Resource) invocation.getArguments()[0], cs.tryCommit((Resource) invocation.getArguments()[0],
(ResourceCommitRequest) invocation.getArguments()[1]); (ResourceCommitRequest) invocation.getArguments()[1], true);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
Assert.fail(); Assert.fail();
@ -393,12 +393,12 @@ public class TestCapacitySchedulerAsyncScheduling {
isChecked.set(true); isChecked.set(true);
} else { } else {
cs.tryCommit((Resource) invocation.getArguments()[0], cs.tryCommit((Resource) invocation.getArguments()[0],
(ResourceCommitRequest) invocation.getArguments()[1]); (ResourceCommitRequest) invocation.getArguments()[1], true);
} }
return null; return null;
} }
}).when(spyCs).tryCommit(Mockito.any(Resource.class), }).when(spyCs).tryCommit(Mockito.any(Resource.class),
Mockito.any(ResourceCommitRequest.class)); Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode())); spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
@ -473,7 +473,7 @@ public class TestCapacitySchedulerAsyncScheduling {
newProposals.add(newContainerProposal); newProposals.add(newContainerProposal);
ResourceCommitRequest request = ResourceCommitRequest request =
new ResourceCommitRequest(newProposals, null, null); new ResourceCommitRequest(newProposals, null, null);
scheduler.tryCommit(scheduler.getClusterResource(), request); scheduler.tryCommit(scheduler.getClusterResource(), request, true);
} }
// make sure node resource can't be over-allocated! // make sure node resource can't be over-allocated!
Assert.assertTrue("Node resource is Over-allocated!", Assert.assertTrue("Node resource is Over-allocated!",