YARN-3141. Improve locks in SchedulerApplicationAttempt/FSAppAttempt/FiCaSchedulerApp. Contributed by Wangda Tan

This commit is contained in:
Jian He 2016-09-19 16:58:39 +08:00
parent ea29e3bc27
commit b8a30f2f17
4 changed files with 953 additions and 738 deletions

View File

@ -251,7 +251,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
return result; return result;
} }
public synchronized float getLocalityWaitFactor( public float getLocalityWaitFactor(
SchedulerRequestKey schedulerKey, int clusterNodes) { SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks) // Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources = int requiredResources =

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -99,7 +98,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* to hold the message if its app doesn't not get container from a node * to hold the message if its app doesn't not get container from a node
*/ */
private String appSkipNodeDiagnostics; private String appSkipNodeDiagnostics;
private CapacitySchedulerContext capacitySchedulerContext;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, ActiveUsersManager activeUsersManager,
@ -153,118 +151,128 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
containerAllocator = new ContainerAllocator(this, rc, rmContext, containerAllocator = new ContainerAllocator(this, rc, rmContext,
activitiesManager); activitiesManager);
if (scheduler instanceof CapacityScheduler) {
capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
}
} }
public synchronized boolean containerCompleted(RMContainer rmContainer, public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event, ContainerStatus containerStatus, RMContainerEventType event,
String partition) { String partition) {
ContainerId containerId = rmContainer.getContainerId(); try {
writeLock.lock();
ContainerId containerId = rmContainer.getContainerId();
// Remove from the list of containers // Remove from the list of containers
if (null == liveContainers.remove(containerId)) { if (null == liveContainers.remove(containerId)) {
return false; return false;
}
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
containersToPreempt.remove(containerId);
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, containerResource);
// Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
attemptResourceUsage.decUsed(partition, containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
return true;
} finally {
writeLock.unlock();
} }
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(containerId, containerStatus, event));
containersToPreempt.remove(containerId);
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, containerResource);
// Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
attemptResourceUsage.decUsed(partition, containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
return true;
} }
public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest request, SchedulerRequestKey schedulerKey, ResourceRequest request,
Container container) { Container container) {
try {
writeLock.lock();
if (isStopped) { if (isStopped) {
return null; return null;
}
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(schedulerKey) <= 0) {
return null;
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
this.getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
ContainerId containerId = container.getId();
liveContainers.put(containerId, rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, request, container);
attemptResourceUsage.incUsed(node.getPartition(),
container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId=" + containerId
.getApplicationAttemptId() + " container=" + containerId + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), containerId,
container.getResource());
return rmContainer;
} finally {
writeLock.unlock();
} }
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(schedulerKey) <= 0) {
return null;
}
// Create RMContainer
RMContainer rmContainer =
new RMContainerImpl(container, this.getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
ContainerId containerId = container.getId();
liveContainers.put(containerId, rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, request, container);
attemptResourceUsage.incUsed(node.getPartition(), container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ containerId.getApplicationAttemptId()
+ " container=" + containerId + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, container.getResource());
return rmContainer;
} }
public synchronized boolean unreserve(SchedulerRequestKey schedulerKey, public boolean unreserve(SchedulerRequestKey schedulerKey,
FiCaSchedulerNode node, RMContainer rmContainer) { FiCaSchedulerNode node, RMContainer rmContainer) {
// Cancel increase request (if it has reserved increase request try {
rmContainer.cancelIncreaseReservation(); writeLock.lock();
// Cancel increase request (if it has reserved increase request
// Done with the reservation? rmContainer.cancelIncreaseReservation();
if (internalUnreserve(node, schedulerKey)) {
node.unreserveResource(this);
// Update reserved metrics // Done with the reservation?
queue.getMetrics().unreserveResource(getUser(), if (internalUnreserve(node, schedulerKey)) {
rmContainer.getReservedResource()); node.unreserveResource(this);
queue.decReservedResource(node.getPartition(),
rmContainer.getReservedResource()); // Update reserved metrics
return true; queue.getMetrics().unreserveResource(getUser(),
rmContainer.getReservedResource());
queue.decReservedResource(node.getPartition(),
rmContainer.getReservedResource());
return true;
}
return false;
} finally {
writeLock.unlock();
} }
return false;
} }
private boolean internalUnreserve(FiCaSchedulerNode node, private boolean internalUnreserve(FiCaSchedulerNode node,
@ -303,33 +311,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return false; return false;
} }
public synchronized float getLocalityWaitFactor( public void markContainerForPreemption(ContainerId cont) {
SchedulerRequestKey schedulerKey, int clusterNodes) { try {
// Estimate: Required unique resources (i.e. hosts + racks) writeLock.lock();
int requiredResources = // ignore already completed containers
Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0); if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
public synchronized Resource getTotalPendingRequests() {
Resource ret = Resource.newInstance(0, 0);
for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
// to avoid double counting we count only "ANY" resource requests
if (ResourceRequest.isAnyLocation(rr.getResourceName())){
Resources.addTo(ret,
Resources.multiply(rr.getCapability(), rr.getNumContainers()));
} }
} } finally {
return ret; writeLock.unlock();
}
public synchronized void markContainerForPreemption(ContainerId cont) {
// ignore already completed containers
if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont);
} }
} }
@ -343,94 +333,115 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* @param minimumAllocation * @param minimumAllocation
* @return an allocation * @return an allocation
*/ */
public synchronized Allocation getAllocation(ResourceCalculator rc, public Allocation getAllocation(ResourceCalculator resourceCalculator,
Resource clusterResource, Resource minimumAllocation) { Resource clusterResource, Resource minimumAllocation) {
try {
Set<ContainerId> currentContPreemption = Collections.unmodifiableSet( writeLock.lock();
new HashSet<ContainerId>(containersToPreempt)); Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
containersToPreempt.clear(); new HashSet<ContainerId>(containersToPreempt));
Resource tot = Resource.newInstance(0, 0); containersToPreempt.clear();
for(ContainerId c : currentContPreemption){ Resource tot = Resource.newInstance(0, 0);
Resources.addTo(tot, for (ContainerId c : currentContPreemption) {
liveContainers.get(c).getContainer().getResource()); Resources.addTo(tot, liveContainers.get(c).getContainer()
.getResource());
}
int numCont = (int) Math.ceil(
Resources.divide(rc, clusterResource, tot, minimumAllocation));
ResourceRequest rr = ResourceRequest.newInstance(Priority.UNDEFINED,
ResourceRequest.ANY, minimumAllocation, numCont);
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom, null,
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers);
} finally {
writeLock.unlock();
} }
int numCont = (int) Math.ceil(
Resources.divide(rc, clusterResource, tot, minimumAllocation));
ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont);
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
return new Allocation(newlyAllocatedContainers, headroom, null,
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers);
} }
synchronized public NodeId getNodeIdToUnreserve( @VisibleForTesting
public NodeId getNodeIdToUnreserve(
SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve, SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
ResourceCalculator rc, Resource clusterResource) { ResourceCalculator rc, Resource clusterResource) {
try {
writeLock.lock();
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
schedulerKey);
// first go around make this algorithm simple and just grab first if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
// reservation that has enough resources for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers .entrySet()) {
.get(schedulerKey); NodeId nodeId = entry.getKey();
RMContainer reservedContainer = entry.getValue();
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { if (reservedContainer.hasIncreaseReservation()) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) { // Currently, only regular container allocation supports continuous
NodeId nodeId = entry.getKey(); // reservation looking, we don't support canceling increase request
RMContainer reservedContainer = entry.getValue(); // reservation when allocating regular container.
if (reservedContainer.hasIncreaseReservation()) { continue;
// Currently, only regular container allocation supports continuous }
// reservation looking, we don't support canceling increase request
// reservation when allocating regular container. Resource reservedResource = reservedContainer.getReservedResource();
continue;
} // make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
Resource reservedResource = reservedContainer.getReservedResource(); if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
reservedResource)) {
// make sure we unreserve one with at least the same amount of if (LOG.isDebugEnabled()) {
// resources, otherwise could affect capacity limits LOG.debug(
if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve, "unreserving node with reservation size: " + reservedResource
reservedResource)) { + " in order to allocate container with size: "
if (LOG.isDebugEnabled()) { + resourceNeedUnreserve);
LOG.debug("unreserving node with reservation size: " }
+ reservedResource return nodeId;
+ " in order to allocate container with size: " + resourceNeedUnreserve);
} }
return nodeId;
} }
} }
return null;
} finally {
writeLock.unlock();
} }
return null;
} }
public synchronized void setHeadroomProvider( public void setHeadroomProvider(
CapacityHeadroomProvider headroomProvider) { CapacityHeadroomProvider headroomProvider) {
this.headroomProvider = headroomProvider; try {
} writeLock.lock();
this.headroomProvider = headroomProvider;
public synchronized CapacityHeadroomProvider getHeadroomProvider() { } finally {
return headroomProvider; writeLock.unlock();
}
@Override
public synchronized Resource getHeadroom() {
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
} }
return super.getHeadroom();
} }
@Override @Override
public synchronized void transferStateFromPreviousAttempt( public Resource getHeadroom() {
try {
readLock.lock();
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
}
return super.getHeadroom();
} finally {
readLock.unlock();
}
}
@Override
public void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) { SchedulerApplicationAttempt appAttempt) {
super.transferStateFromPreviousAttempt(appAttempt); try {
this.headroomProvider = writeLock.lock();
((FiCaSchedulerApp) appAttempt).getHeadroomProvider(); super.transferStateFromPreviousAttempt(appAttempt);
this.headroomProvider = ((FiCaSchedulerApp) appAttempt).headroomProvider;
} finally {
writeLock.unlock();
}
} }
public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey, public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
@ -444,11 +455,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Update the node // Update the node
node.reserveResource(this, schedulerKey, rmContainer); node.reserveResource(this, schedulerKey, rmContainer);
// Succeeded // Succeeded
return true; return true;
} }
return false; return false;
} }
@ -515,9 +526,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
showRequests(); showRequests();
} }
synchronized (this) { try {
writeLock.lock();
return containerAllocator.assignContainers(clusterResource, node, return containerAllocator.assignContainers(clusterResource, node,
schedulingMode, currentResourceLimits, reservedContainer); schedulingMode, currentResourceLimits, reservedContainer);
} finally {
writeLock.unlock();
} }
} }
@ -625,23 +639,33 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
* Capacity Scheduler. * Capacity Scheduler.
*/ */
@Override @Override
public synchronized ApplicationResourceUsageReport getResourceUsageReport() { public ApplicationResourceUsageReport getResourceUsageReport() {
ApplicationResourceUsageReport report = super.getResourceUsageReport(); try {
Resource cluster = rmContext.getScheduler().getClusterResource(); // Use write lock here because
Resource totalPartitionRes = // SchedulerApplicationAttempt#getResourceUsageReport updated fields
rmContext.getNodeLabelManager() // TODO: improve this
.getResourceByLabel(getAppAMNodePartitionName(), cluster); writeLock.lock();
ResourceCalculator calc = rmContext.getScheduler().getResourceCalculator(); ApplicationResourceUsageReport report = super.getResourceUsageReport();
if (!calc.isInvalidDivisor(totalPartitionRes)) { Resource cluster = rmContext.getScheduler().getClusterResource();
float queueAbsMaxCapPerPartition = Resource totalPartitionRes =
((AbstractCSQueue)getQueue()).getQueueCapacities() rmContext.getNodeLabelManager().getResourceByLabel(
.getAbsoluteCapacity(getAppAMNodePartitionName()); getAppAMNodePartitionName(), cluster);
float queueUsagePerc = ResourceCalculator calc =
calc.divide(totalPartitionRes, report.getUsedResources(), rmContext.getScheduler().getResourceCalculator();
Resources.multiply(totalPartitionRes, if (!calc.isInvalidDivisor(totalPartitionRes)) {
queueAbsMaxCapPerPartition)) * 100; float queueAbsMaxCapPerPartition =
report.setQueueUsagePercentage(queueUsagePerc); ((AbstractCSQueue) getQueue()).getQueueCapacities()
.getAbsoluteCapacity(getAppAMNodePartitionName());
float queueUsagePerc = calc.divide(totalPartitionRes,
report.getUsedResources(),
Resources.multiply(totalPartitionRes, queueAbsMaxCapPerPartition))
* 100;
report.setQueueUsagePercentage(queueUsagePerc);
}
return report;
} finally {
writeLock.unlock();
} }
return report;
} }
} }

View File

@ -123,65 +123,72 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return queue.getMetrics(); return queue.getMetrics();
} }
synchronized public void containerCompleted(RMContainer rmContainer, public void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) { ContainerStatus containerStatus, RMContainerEventType event) {
try {
Container container = rmContainer.getContainer(); writeLock.lock();
ContainerId containerId = container.getId(); Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer); // Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle( // Inform the container
new RMContainerFinishedEvent( rmContainer.handle(
containerId, new RMContainerFinishedEvent(containerId, containerStatus, event));
containerStatus, if (LOG.isDebugEnabled()) {
event) LOG.debug("Completed container: " + rmContainer.getContainerId()
); + " in state: " + rmContainer.getState() + " event:" + event);
if (LOG.isDebugEnabled()) { }
LOG.debug("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event); // Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
"SchedulerApp", getApplicationId(), containerId, containerResource);
// Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
} finally {
writeLock.unlock();
} }
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId, containerResource);
// Update usage metrics
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
} }
private synchronized void unreserveInternal( private void unreserveInternal(
SchedulerRequestKey schedulerKey, FSSchedulerNode node) { SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
Map<NodeId, RMContainer> reservedContainers = try {
this.reservedContainers.get(schedulerKey); writeLock.lock();
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
if (reservedContainers.isEmpty()) { schedulerKey);
this.reservedContainers.remove(schedulerKey); RMContainer reservedContainer = reservedContainers.remove(
node.getNodeID());
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(schedulerKey);
}
// Reset the re-reservation count
resetReReservations(schedulerKey);
Resource resource = reservedContainer.getContainer().getResource();
this.attemptResourceUsage.decReserved(resource);
LOG.info(
"Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size()
+ " at priority " + schedulerKey.getPriority()
+ "; currentReservation " + this.attemptResourceUsage
.getReserved());
} finally {
writeLock.unlock();
} }
// Reset the re-reservation count
resetReReservations(schedulerKey);
Resource resource = reservedContainer.getContainer().getResource();
this.attemptResourceUsage.decReserved(resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size()
+ " at priority " + schedulerKey.getPriority() + "; currentReservation "
+ this.attemptResourceUsage.getReserved());
} }
private void subtractResourcesOnBlacklistedNodes( private void subtractResourcesOnBlacklistedNodes(
@ -239,17 +246,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return headroom; return headroom;
} }
public synchronized float getLocalityWaitFactor(
SchedulerRequestKey schedulerKey, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
/** /**
* Return the level at which we are allowed to schedule containers, given the * Return the level at which we are allowed to schedule containers, given the
* current size of the cluster and thresholds indicating how many nodes to * current size of the cluster and thresholds indicating how many nodes to
@ -261,44 +257,56 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @param rackLocalityThreshold rackLocalityThreshold * @param rackLocalityThreshold rackLocalityThreshold
* @return NodeType * @return NodeType
*/ */
public synchronized NodeType getAllowedLocalityLevel( NodeType getAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, int numNodes, SchedulerRequestKey schedulerKey, int numNodes,
double nodeLocalityThreshold, double rackLocalityThreshold) { double nodeLocalityThreshold, double rackLocalityThreshold) {
// upper limit on threshold // upper limit on threshold
if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; } if (nodeLocalityThreshold > 1.0) {
if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; } nodeLocalityThreshold = 1.0;
}
if (rackLocalityThreshold > 1.0) {
rackLocalityThreshold = 1.0;
}
// If delay scheduling is not being used, can schedule anywhere // If delay scheduling is not being used, can schedule anywhere
if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
return NodeType.OFF_SWITCH; return NodeType.OFF_SWITCH;
} }
// Default level is NODE_LOCAL try {
if (!allowedLocalityLevel.containsKey(schedulerKey)) { writeLock.lock();
allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(schedulerKey); // Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(schedulerKey)) {
// If level is already most liberal, we're done allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; return NodeType.NODE_LOCAL;
double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold.
if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(schedulerKey);
} }
else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); NodeType allowed = allowedLocalityLevel.get(schedulerKey);
resetSchedulingOpportunities(schedulerKey);
// If level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH;
} }
double threshold = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityThreshold :
rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold.
if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(schedulerKey);
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(schedulerKey);
}
}
return allowedLocalityLevel.get(schedulerKey);
} finally {
writeLock.unlock();
} }
return allowedLocalityLevel.get(schedulerKey);
} }
/** /**
@ -311,119 +319,131 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @param currentTimeMs currentTimeMs * @param currentTimeMs currentTimeMs
* @return NodeType * @return NodeType
*/ */
public synchronized NodeType getAllowedLocalityLevelByTime( NodeType getAllowedLocalityLevelByTime(
SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs, SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs,
long rackLocalityDelayMs, long currentTimeMs) { long rackLocalityDelayMs, long currentTimeMs) {
// if not being used, can schedule anywhere // if not being used, can schedule anywhere
if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
return NodeType.OFF_SWITCH; return NodeType.OFF_SWITCH;
} }
// default level is NODE_LOCAL try {
if (!allowedLocalityLevel.containsKey(schedulerKey)) { writeLock.lock();
// add the initial time of priority to prevent comparing with FsApp
// startTime and allowedLocalityLevel degrade // default level is NODE_LOCAL
lastScheduledContainer.put(schedulerKey, currentTimeMs); if (!allowedLocalityLevel.containsKey(schedulerKey)) {
if (LOG.isDebugEnabled()) { // add the initial time of priority to prevent comparing with FsApp
LOG.debug("Init the lastScheduledContainer time, priority: " // startTime and allowedLocalityLevel degrade
+ schedulerKey.getPriority() + ", time: " + currentTimeMs); lastScheduledContainer.put(schedulerKey, currentTimeMs);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Init the lastScheduledContainer time, priority: " + schedulerKey
.getPriority() + ", time: " + currentTimeMs);
}
allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
} }
allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(schedulerKey); NodeType allowed = allowedLocalityLevel.get(schedulerKey);
// if level is already most liberal, we're done // if level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) { if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH; return NodeType.OFF_SWITCH;
}
// check waiting time
long waitTime = currentTimeMs;
if (lastScheduledContainer.containsKey(schedulerKey)) {
waitTime -= lastScheduledContainer.get(schedulerKey);
} else {
waitTime -= getStartTime();
}
long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityDelayMs : rackLocalityDelayMs;
if (waitTime > thresholdTime) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(schedulerKey, currentTimeMs);
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(schedulerKey, currentTimeMs);
} }
// check waiting time
long waitTime = currentTimeMs;
if (lastScheduledContainer.containsKey(schedulerKey)) {
waitTime -= lastScheduledContainer.get(schedulerKey);
} else{
waitTime -= getStartTime();
}
long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityDelayMs :
rackLocalityDelayMs;
if (waitTime > thresholdTime) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(schedulerKey, currentTimeMs);
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(schedulerKey, currentTimeMs);
}
}
return allowedLocalityLevel.get(schedulerKey);
} finally {
writeLock.unlock();
} }
return allowedLocalityLevel.get(schedulerKey);
} }
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, public RMContainer allocate(NodeType type, FSSchedulerNode node,
SchedulerRequestKey schedulerKey, ResourceRequest request, SchedulerRequestKey schedulerKey, ResourceRequest request,
Container reservedContainer) { Container reservedContainer) {
// Update allowed locality level RMContainer rmContainer;
NodeType allowed = allowedLocalityLevel.get(schedulerKey); Container container;
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) && try {
(type.equals(NodeType.NODE_LOCAL) || writeLock.lock();
type.equals(NodeType.RACK_LOCAL))) { // Update allowed locality level
this.resetAllowedLocalityLevel(schedulerKey, type); NodeType allowed = allowedLocalityLevel.get(schedulerKey);
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals(
NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) {
this.resetAllowedLocalityLevel(schedulerKey, type);
} else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals(
NodeType.NODE_LOCAL)) {
this.resetAllowedLocalityLevel(schedulerKey, type);
}
} }
else if (allowed.equals(NodeType.RACK_LOCAL) &&
type.equals(NodeType.NODE_LOCAL)) { // Required sanity check - AM can call 'allocate' to update resource
this.resetAllowedLocalityLevel(schedulerKey, type); // request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(schedulerKey) <= 0) {
return null;
} }
container = reservedContainer;
if (container == null) {
container = createContainer(node, request.getCapability(),
schedulerKey);
}
// Create RMContainer
rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, request, container);
this.attemptResourceUsage.incUsed(container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId=" + container.getId()
.getApplicationAttemptId() + " container=" + container.getId()
+ " host=" + container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
"SchedulerApp", getApplicationId(), container.getId(),
container.getResource());
} finally {
writeLock.unlock();
} }
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(schedulerKey) <= 0) {
return null;
}
Container container = reservedContainer;
if (container == null) {
container =
createContainer(node, request.getCapability(), schedulerKey);
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, request, container);
this.attemptResourceUsage.incUsed(container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId()
+ " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId(), container.getResource());
return rmContainer; return rmContainer;
} }
@ -434,19 +454,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
* @param schedulerKey Scheduler Key * @param schedulerKey Scheduler Key
* @param level NodeType * @param level NodeType
*/ */
public synchronized void resetAllowedLocalityLevel( public void resetAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, NodeType level) { SchedulerRequestKey schedulerKey, NodeType level) {
NodeType old = allowedLocalityLevel.get(schedulerKey); NodeType old;
LOG.info("Raising locality level from " + old + " to " + level + " at " + try {
" priority " + schedulerKey.getPriority()); writeLock.lock();
allowedLocalityLevel.put(schedulerKey, level); old = allowedLocalityLevel.put(schedulerKey, level);
} finally {
writeLock.unlock();
}
LOG.info("Raising locality level from " + old + " to " + level + " at "
+ " priority " + schedulerKey.getPriority());
} }
// related methods // related methods
public void addPreemption(RMContainer container, long time) { public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null; assert preemptionMap.get(container) == null;
preemptionMap.put(container, time); try {
Resources.addTo(preemptedResources, container.getAllocatedResource()); writeLock.lock();
preemptionMap.put(container, time);
Resources.addTo(preemptedResources, container.getAllocatedResource());
} finally {
writeLock.unlock();
}
} }
public Long getContainerPreemptionTime(RMContainer container) { public Long getContainerPreemptionTime(RMContainer container) {
@ -584,21 +615,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
getUser(), rmContainer.getContainer().getResource()); getUser(), rmContainer.getContainer().getResource());
} }
private synchronized void setReservation(SchedulerNode node) { private void setReservation(SchedulerNode node) {
String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); String rackName =
Set<String> rackReservations = reservations.get(rackName); node.getRackName() == null ? "NULL" : node.getRackName();
if (rackReservations == null) {
rackReservations = new HashSet<>(); try {
reservations.put(rackName, rackReservations); writeLock.lock();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations == null) {
rackReservations = new HashSet<>();
reservations.put(rackName, rackReservations);
}
rackReservations.add(node.getNodeName());
} finally {
writeLock.unlock();
} }
rackReservations.add(node.getNodeName());
} }
private synchronized void clearReservation(SchedulerNode node) { private void clearReservation(SchedulerNode node) {
String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); String rackName =
Set<String> rackReservations = reservations.get(rackName); node.getRackName() == null ? "NULL" : node.getRackName();
if (rackReservations != null) {
rackReservations.remove(node.getNodeName()); try {
writeLock.lock();
Set<String> rackReservations = reservations.get(rackName);
if (rackReservations != null) {
rackReservations.remove(node.getNodeName());
}
} finally {
writeLock.unlock();
} }
} }
@ -737,7 +782,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// For each priority, see if we can schedule a node local, rack local // For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed // or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality. // (not scheduled) in order to promote better locality.
synchronized (this) { try {
writeLock.lock();
for (SchedulerRequestKey schedulerKey : keysToTry) { for (SchedulerRequestKey schedulerKey : keysToTry) {
// Skip it for reserved container, since // Skip it for reserved container, since
// we already check it in isValidReservation. // we already check it in isValidReservation.
@ -772,8 +818,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) { && localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, localRequest, return assignContainer(node, localRequest, NodeType.NODE_LOCAL,
NodeType.NODE_LOCAL, reserved, schedulerKey); reserved, schedulerKey);
} }
if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
@ -781,29 +827,31 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
} }
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) || && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality
allowedLocality.equals(NodeType.OFF_SWITCH))) { .equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, rackLocalRequest, return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL,
NodeType.RACK_LOCAL, reserved, schedulerKey); reserved, schedulerKey);
} }
ResourceRequest offSwitchRequest = ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey,
getResourceRequest(schedulerKey, ResourceRequest.ANY); ResourceRequest.ANY);
if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
continue; continue;
} }
if (offSwitchRequest != null && if (offSwitchRequest != null
offSwitchRequest.getNumContainers() != 0) { && offSwitchRequest.getNumContainers() != 0) {
if (!hasNodeOrRackLocalRequests(schedulerKey) || if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality
allowedLocality.equals(NodeType.OFF_SWITCH)) { .equals(NodeType.OFF_SWITCH)) {
return assignContainer( return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH,
node, offSwitchRequest, NodeType.OFF_SWITCH, reserved, reserved, schedulerKey);
schedulerKey);
} }
} }
} }
} finally {
writeLock.unlock();
} }
return Resources.none(); return Resources.none();
} }
@ -963,14 +1011,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resources.addTo(demand, getCurrentConsumption()); Resources.addTo(demand, getCurrentConsumption());
// Add up outstanding resource requests // Add up outstanding resource requests
synchronized (this) { try {
writeLock.lock();
for (SchedulerRequestKey k : getSchedulerKeys()) { for (SchedulerRequestKey k : getSchedulerKeys()) {
ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY); ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY);
if (r != null) { if (r != null) {
Resources.multiplyAndAddTo(demand, Resources.multiplyAndAddTo(demand, r.getCapability(),
r.getCapability(), r.getNumContainers()); r.getNumContainers());
} }
} }
} finally {
writeLock.unlock();
} }
} }