YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)

(cherry picked from commit 05fa852d7567b7590d6b53bbf925f8f424736514)
This commit is contained in:
Wangda Tan 2015-12-30 15:36:55 -08:00
parent a9594c61bb
commit 4e4b3a8465
3 changed files with 175 additions and 210 deletions

View File

@ -84,6 +84,8 @@ Release 2.9.0 - UNRELEASED
YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda) YARN-4522. Queue acl can be checked at app submission. (Jian He via wangda)
YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -20,6 +20,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -42,7 +43,6 @@
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -56,40 +56,36 @@
public class AppSchedulingInfo { public class AppSchedulingInfo {
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
private static final Comparator COMPARATOR =
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator();
private static final int EPOCH_BIT_SHIFT = 40;
private final ApplicationId applicationId;
private final ApplicationAttemptId applicationAttemptId; private final ApplicationAttemptId applicationAttemptId;
final ApplicationId applicationId;
private String queueName;
Queue queue;
final String user;
// TODO making containerIdCounter long
private final AtomicLong containerIdCounter; private final AtomicLong containerIdCounter;
private final int EPOCH_BIT_SHIFT = 40; private final String user;
final Set<Priority> priorities = new TreeSet<Priority>( private Queue queue;
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
final Map<NodeId, Map<Priority, Map<ContainerId,
SchedContainerChangeRequest>>> increaseRequestMap =
new ConcurrentHashMap<>();
private Set<String> userBlacklist = new HashSet<>();
private Set<String> amBlacklist = new HashSet<>();
//private final ApplicationStore store;
private ActiveUsersManager activeUsersManager; private ActiveUsersManager activeUsersManager;
private boolean pending = true; // whether accepted/allocated by scheduler
/* Allocated by scheduler */
boolean pending = true; // for app metrics
private ResourceUsage appResourceUsage; private ResourceUsage appResourceUsage;
private final Set<String> amBlacklist = new HashSet<>();
private Set<String> userBlacklist = new HashSet<>();
final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
new ConcurrentHashMap<>();
final Map<NodeId, Map<Priority, Map<ContainerId,
SchedContainerChangeRequest>>> containerIncreaseRequestMap =
new ConcurrentHashMap<>();
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, ActiveUsersManager activeUsersManager,
long epoch, ResourceUsage appResourceUsage) { long epoch, ResourceUsage appResourceUsage) {
this.applicationAttemptId = appAttemptId; this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId(); this.applicationId = appAttemptId.getApplicationId();
this.queue = queue; this.queue = queue;
this.queueName = queue.getQueueName();
this.user = user; this.user = user;
this.activeUsersManager = activeUsersManager; this.activeUsersManager = activeUsersManager;
this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT); this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
@ -104,14 +100,18 @@ public ApplicationAttemptId getApplicationAttemptId() {
return applicationAttemptId; return applicationAttemptId;
} }
public String getQueueName() {
return queueName;
}
public String getUser() { public String getUser() {
return user; return user;
} }
public long getNewContainerId() {
return this.containerIdCounter.incrementAndGet();
}
public synchronized String getQueueName() {
return queue.getQueueName();
}
public synchronized boolean isPending() { public synchronized boolean isPending() {
return pending; return pending;
} }
@ -125,30 +125,23 @@ private synchronized void clearRequests() {
LOG.info("Application " + applicationId + " requests cleared"); LOG.info("Application " + applicationId + " requests cleared");
} }
public long getNewContainerId() { public synchronized boolean hasIncreaseRequest(NodeId nodeId) {
return this.containerIdCounter.incrementAndGet();
}
public boolean hasIncreaseRequest(NodeId nodeId) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
increaseRequestMap.get(nodeId); containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) { return requestsOnNode == null ? false : requestsOnNode.size() > 0;
return false;
}
return requestsOnNode.size() > 0;
} }
public Map<ContainerId, SchedContainerChangeRequest> public synchronized Map<ContainerId, SchedContainerChangeRequest>
getIncreaseRequests(NodeId nodeId, Priority priority) { getIncreaseRequests(NodeId nodeId, Priority priority) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
increaseRequestMap.get(nodeId); containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) { return requestsOnNode == null ? null : requestsOnNode.get(priority);
return null;
}
return requestsOnNode.get(priority);
} }
/**
* return true if any of the existing increase requests are updated,
* false if none of them are updated
*/
public synchronized boolean updateIncreaseRequests( public synchronized boolean updateIncreaseRequests(
List<SchedContainerChangeRequest> increaseRequests) { List<SchedContainerChangeRequest> increaseRequests) {
boolean resourceUpdated = false; boolean resourceUpdated = false;
@ -157,10 +150,10 @@ public synchronized boolean updateIncreaseRequests(
NodeId nodeId = r.getRMContainer().getAllocatedNode(); NodeId nodeId = r.getRMContainer().getAllocatedNode();
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
increaseRequestMap.get(nodeId); containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) { if (null == requestsOnNode) {
requestsOnNode = new TreeMap<>(); requestsOnNode = new TreeMap<>();
increaseRequestMap.put(nodeId, requestsOnNode); containerIncreaseRequestMap.put(nodeId, requestsOnNode);
} }
SchedContainerChangeRequest prevChangeRequest = SchedContainerChangeRequest prevChangeRequest =
@ -168,22 +161,21 @@ public synchronized boolean updateIncreaseRequests(
if (null != prevChangeRequest) { if (null != prevChangeRequest) {
if (Resources.equals(prevChangeRequest.getTargetCapacity(), if (Resources.equals(prevChangeRequest.getTargetCapacity(),
r.getTargetCapacity())) { r.getTargetCapacity())) {
// New target capacity is as same as what we have, just ignore the new // increase request hasn't changed
// one
continue; continue;
} }
// remove the old one // remove the old one, as we will use the new one going forward
removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(), removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(),
prevChangeRequest.getContainerId()); prevChangeRequest.getContainerId());
} }
if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource())) { if (Resources.equals(r.getTargetCapacity(),
r.getRMContainer().getAllocatedResource())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Trying to increase/decrease container, " LOG.debug("Trying to increase container " + r.getContainerId()
+ "target capacity = previous capacity = " + prevChangeRequest + ", target capacity = previous capacity = " + prevChangeRequest
+ " for container=" + r.getContainerId() + ". Will ignore this increase request.");
+ ". Will ignore this increase request");
} }
continue; continue;
} }
@ -195,25 +187,26 @@ public synchronized boolean updateIncreaseRequests(
return resourceUpdated; return resourceUpdated;
} }
// insert increase request and add missing hierarchy if missing /**
* Insert increase request, adding any missing items in the data-structure
* hierarchy.
*/
private void insertIncreaseRequest(SchedContainerChangeRequest request) { private void insertIncreaseRequest(SchedContainerChangeRequest request) {
NodeId nodeId = request.getNodeId(); NodeId nodeId = request.getNodeId();
Priority priority = request.getPriority(); Priority priority = request.getPriority();
ContainerId containerId = request.getContainerId(); ContainerId containerId = request.getContainerId();
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
increaseRequestMap.get(nodeId); containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) { if (null == requestsOnNode) {
requestsOnNode = requestsOnNode = new HashMap<>();
new HashMap<Priority, Map<ContainerId, SchedContainerChangeRequest>>(); containerIncreaseRequestMap.put(nodeId, requestsOnNode);
increaseRequestMap.put(nodeId, requestsOnNode);
} }
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(priority); requestsOnNode.get(priority);
if (null == requestsOnNodeWithPriority) { if (null == requestsOnNodeWithPriority) {
requestsOnNodeWithPriority = requestsOnNodeWithPriority = new TreeMap<>();
new TreeMap<ContainerId, SchedContainerChangeRequest>();
requestsOnNode.put(priority, requestsOnNodeWithPriority); requestsOnNode.put(priority, requestsOnNodeWithPriority);
} }
@ -237,7 +230,7 @@ private void insertIncreaseRequest(SchedContainerChangeRequest request) {
public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority, public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
ContainerId containerId) { ContainerId containerId) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
increaseRequestMap.get(nodeId); containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) { if (null == requestsOnNode) {
return false; return false;
} }
@ -256,7 +249,7 @@ public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priori
requestsOnNode.remove(priority); requestsOnNode.remove(priority);
} }
if (requestsOnNode.isEmpty()) { if (requestsOnNode.isEmpty()) {
increaseRequestMap.remove(nodeId); containerIncreaseRequestMap.remove(nodeId);
} }
if (request == null) { if (request == null) {
@ -279,18 +272,15 @@ public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priori
public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
Priority priority, ContainerId containerId) { Priority priority, ContainerId containerId) {
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
increaseRequestMap.get(nodeId); containerIncreaseRequestMap.get(nodeId);
if (null == requestsOnNode) { if (null == requestsOnNode) {
return null; return null;
} }
Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
requestsOnNode.get(priority); requestsOnNode.get(priority);
if (null == requestsOnNodeWithPriority) { return requestsOnNodeWithPriority == null ? null
return null; : requestsOnNodeWithPriority.get(containerId);
}
return requestsOnNodeWithPriority.get(containerId);
} }
/** /**
@ -299,121 +289,120 @@ public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
* by the application. * by the application.
* *
* @param requests resources to be acquired * @param requests resources to be acquired
* @param recoverPreemptedRequest recover Resource Request on preemption * @param recoverPreemptedRequest recover ResourceRequest on preemption
* @return true if any resource was updated, false else * @return true if any resource was updated, false otherwise
*/ */
synchronized public boolean updateResourceRequests( public synchronized boolean updateResourceRequests(
List<ResourceRequest> requests, boolean recoverPreemptedRequest) { List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
QueueMetrics metrics = queue.getMetrics(); // Flag to track if any incoming requests update "ANY" requests
boolean anyResourcesUpdated = false; boolean anyResourcesUpdated = false;
// Update resource requests // Update resource requests
for (ResourceRequest request : requests) { for (ResourceRequest request : requests) {
Priority priority = request.getPriority(); Priority priority = request.getPriority();
String resourceName = request.getResourceName(); String resourceName = request.getResourceName();
boolean updatePendingResources = false;
ResourceRequest lastRequest = null;
if (resourceName.equals(ResourceRequest.ANY)) { // Update node labels if required
if (LOG.isDebugEnabled()) { updateNodeLabels(request);
LOG.debug("update:" + " application=" + applicationId + " request="
+ request);
}
updatePendingResources = true;
anyResourcesUpdated = true;
// Premature optimization?
// Assumes that we won't see more than one priority request updated
// in one call, reasonable assumption... however, it's totally safe
// to activate same application more than once.
// Thus we don't need another loop ala the one in decrementOutstanding()
// which is needed during deactivate.
if (request.getNumContainers() > 0) {
activeUsersManager.activateApplication(user, applicationId);
}
ResourceRequest previousAnyRequest =
getResourceRequest(priority, resourceName);
// When there is change in ANY request label expression, we should
// update label for all resource requests already added of same
// priority as ANY resource request.
if ((null == previousAnyRequest)
|| isRequestLabelChanged(previousAnyRequest, request)) {
Map<String, ResourceRequest> resourceRequest =
getResourceRequests(priority);
if (resourceRequest != null) {
for (ResourceRequest r : resourceRequest.values()) {
if (!r.getResourceName().equals(ResourceRequest.ANY)) {
r.setNodeLabelExpression(request.getNodeLabelExpression());
}
}
}
}
} else {
ResourceRequest anyRequest =
getResourceRequest(priority, ResourceRequest.ANY);
if (anyRequest != null) {
request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
}
}
Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority); Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
if (asks == null) { if (asks == null) {
asks = new ConcurrentHashMap<String, ResourceRequest>(); asks = new ConcurrentHashMap<>();
this.resourceRequestMap.put(priority, asks); this.resourceRequestMap.put(priority, asks);
this.priorities.add(priority); this.priorities.add(priority);
} }
lastRequest = asks.get(resourceName);
// Increment number of containers if recovering preempted resources
ResourceRequest lastRequest = asks.get(resourceName);
if (recoverPreemptedRequest && lastRequest != null) { if (recoverPreemptedRequest && lastRequest != null) {
// Increment the number of containers to 1, as it is recovering a
// single container.
request.setNumContainers(lastRequest.getNumContainers() + 1); request.setNumContainers(lastRequest.getNumContainers() + 1);
} }
// Update asks
asks.put(resourceName, request); asks.put(resourceName, request);
if (updatePendingResources) {
if (resourceName.equals(ResourceRequest.ANY)) {
// Similarly, deactivate application? anyResourcesUpdated = true;
if (request.getNumContainers() <= 0) {
LOG.info("checking for deactivate of application :" // Activate application. Metrics activation is done here.
+ this.applicationId); // TODO: Shouldn't we activate even if numContainers = 0?
checkForDeactivation(); if (request.getNumContainers() > 0) {
} activeUsersManager.activateApplication(user, applicationId);
int lastRequestContainers = lastRequest != null ? lastRequest
.getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest
.getCapability() : Resources.none();
metrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());
metrics.decrPendingResources(user, lastRequestContainers,
lastRequestCapability);
// update queue:
Resource increasedResource =
Resources.multiply(request.getCapability(),
request.getNumContainers());
queue.incPendingResource(request.getNodeLabelExpression(),
increasedResource);
appResourceUsage.incPending(request.getNodeLabelExpression(),
increasedResource);
if (lastRequest != null) {
Resource decreasedResource =
Resources.multiply(lastRequestCapability, lastRequestContainers);
queue.decPendingResource(lastRequest.getNodeLabelExpression(),
decreasedResource);
appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
decreasedResource);
} }
// Update pendingResources
updatePendingResources(lastRequest, request, queue.getMetrics());
} }
} }
return anyResourcesUpdated; return anyResourcesUpdated;
} }
private boolean isRequestLabelChanged(ResourceRequest requestOne, private void updatePendingResources(ResourceRequest lastRequest,
ResourceRequest request, QueueMetrics metrics) {
if (request.getNumContainers() <= 0) {
LOG.info("checking for deactivate of application :"
+ this.applicationId);
checkForDeactivation();
}
int lastRequestContainers =
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
Resource lastRequestCapability =
lastRequest != null ? lastRequest.getCapability() : Resources.none();
metrics.incrPendingResources(user,
request.getNumContainers(), request.getCapability());
metrics.decrPendingResources(user,
lastRequestContainers, lastRequestCapability);
// update queue:
Resource increasedResource =
Resources.multiply(request.getCapability(), request.getNumContainers());
queue.incPendingResource(request.getNodeLabelExpression(),
increasedResource);
appResourceUsage.incPending(request.getNodeLabelExpression(),
increasedResource);
if (lastRequest != null) {
Resource decreasedResource =
Resources.multiply(lastRequestCapability, lastRequestContainers);
queue.decPendingResource(lastRequest.getNodeLabelExpression(),
decreasedResource);
appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
decreasedResource);
}
}
private void updateNodeLabels(ResourceRequest request) {
Priority priority = request.getPriority();
String resourceName = request.getResourceName();
if (resourceName.equals(ResourceRequest.ANY)) {
ResourceRequest previousAnyRequest =
getResourceRequest(priority, resourceName);
// When there is change in ANY request label expression, we should
// update label for all resource requests already added of same
// priority as ANY resource request.
if ((null == previousAnyRequest)
|| hasRequestLabelChanged(previousAnyRequest, request)) {
Map<String, ResourceRequest> resourceRequest =
getResourceRequests(priority);
if (resourceRequest != null) {
for (ResourceRequest r : resourceRequest.values()) {
if (!r.getResourceName().equals(ResourceRequest.ANY)) {
r.setNodeLabelExpression(request.getNodeLabelExpression());
}
}
}
}
} else {
ResourceRequest anyRequest =
getResourceRequest(priority, ResourceRequest.ANY);
if (anyRequest != null) {
request.setNodeLabelExpression(anyRequest.getNodeLabelExpression());
}
}
}
private boolean hasRequestLabelChanged(ResourceRequest requestOne,
ResourceRequest requestTwo) { ResourceRequest requestTwo) {
String requestOneLabelExp = requestOne.getNodeLabelExpression(); String requestOneLabelExp = requestOne.getNodeLabelExpression();
String requestTwoLabelExp = requestTwo.getNodeLabelExpression(); String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
@ -465,24 +454,24 @@ void updateUserOrAMBlacklist(Set<String> blacklist,
} }
} }
synchronized public Collection<Priority> getPriorities() { public synchronized Collection<Priority> getPriorities() {
return priorities; return priorities;
} }
synchronized public Map<String, ResourceRequest> getResourceRequests( public synchronized Map<String, ResourceRequest> getResourceRequests(
Priority priority) { Priority priority) {
return resourceRequestMap.get(priority); return resourceRequestMap.get(priority);
} }
public List<ResourceRequest> getAllResourceRequests() { public synchronized List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<ResourceRequest>(); List<ResourceRequest> ret = new ArrayList<>();
for (Map<String, ResourceRequest> r : resourceRequestMap.values()) { for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
ret.addAll(r.values()); ret.addAll(r.values());
} }
return ret; return ret;
} }
synchronized public ResourceRequest getResourceRequest(Priority priority, public synchronized ResourceRequest getResourceRequest(Priority priority,
String resourceName) { String resourceName) {
Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority); Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority);
return (nodeRequests == null) ? null : nodeRequests.get(resourceName); return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
@ -511,7 +500,7 @@ public boolean isBlacklisted(String resourceName,
} }
} }
} }
public synchronized void increaseContainer( public synchronized void increaseContainer(
SchedContainerChangeRequest increaseRequest) { SchedContainerChangeRequest increaseRequest) {
NodeId nodeId = increaseRequest.getNodeId(); NodeId nodeId = increaseRequest.getNodeId();
@ -559,28 +548,17 @@ public synchronized void decreaseContainer(
/** /**
* Resources have been allocated to this application by the resource * Resources have been allocated to this application by the resource
* scheduler. Track them. * scheduler. Track them.
*
* @param type
* the type of the node
* @param node
* the nodeinfo of the node
* @param priority
* the priority of the request.
* @param request
* the request
* @param container
* the containers allocated.
*/ */
synchronized public List<ResourceRequest> allocate(NodeType type, public synchronized List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, Priority priority, ResourceRequest request, SchedulerNode node, Priority priority, ResourceRequest request,
Container container) { Container containerAllocated) {
List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>(); List<ResourceRequest> resourceRequests = new ArrayList<>();
if (type == NodeType.NODE_LOCAL) { if (type == NodeType.NODE_LOCAL) {
allocateNodeLocal(node, priority, request, container, resourceRequests); allocateNodeLocal(node, priority, request, resourceRequests);
} else if (type == NodeType.RACK_LOCAL) { } else if (type == NodeType.RACK_LOCAL) {
allocateRackLocal(node, priority, request, container, resourceRequests); allocateRackLocal(node, priority, request, resourceRequests);
} else { } else {
allocateOffSwitch(node, priority, request, container, resourceRequests); allocateOffSwitch(request, resourceRequests);
} }
QueueMetrics metrics = queue.getMetrics(); QueueMetrics metrics = queue.getMetrics();
if (pending) { if (pending) {
@ -592,8 +570,8 @@ synchronized public List<ResourceRequest> allocate(NodeType type,
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId LOG.debug("allocate: applicationId=" + applicationId
+ " container=" + container.getId() + " container=" + containerAllocated.getId()
+ " host=" + container.getNodeId().toString() + " host=" + containerAllocated.getNodeId().toString()
+ " user=" + user + " user=" + user
+ " resource=" + request.getCapability() + " resource=" + request.getCapability()
+ " type=" + type); + " type=" + type);
@ -606,12 +584,9 @@ synchronized public List<ResourceRequest> allocate(NodeType type,
/** /**
* The {@link ResourceScheduler} is allocating data-local resources to the * The {@link ResourceScheduler} is allocating data-local resources to the
* application. * application.
*
* @param allocatedContainers
* resources allocated to the application
*/ */
synchronized private void allocateNodeLocal(SchedulerNode node, private synchronized void allocateNodeLocal(SchedulerNode node,
Priority priority, ResourceRequest nodeLocalRequest, Container container, Priority priority, ResourceRequest nodeLocalRequest,
List<ResourceRequest> resourceRequests) { List<ResourceRequest> resourceRequests) {
// Update future requirements // Update future requirements
decResourceRequest(node.getNodeName(), priority, nodeLocalRequest); decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
@ -641,12 +616,9 @@ private void decResourceRequest(String resourceName, Priority priority,
/** /**
* The {@link ResourceScheduler} is allocating data-local resources to the * The {@link ResourceScheduler} is allocating data-local resources to the
* application. * application.
*
* @param allocatedContainers
* resources allocated to the application
*/ */
synchronized private void allocateRackLocal(SchedulerNode node, private synchronized void allocateRackLocal(SchedulerNode node,
Priority priority, ResourceRequest rackLocalRequest, Container container, Priority priority, ResourceRequest rackLocalRequest,
List<ResourceRequest> resourceRequests) { List<ResourceRequest> resourceRequests) {
// Update future requirements // Update future requirements
decResourceRequest(node.getRackName(), priority, rackLocalRequest); decResourceRequest(node.getRackName(), priority, rackLocalRequest);
@ -663,20 +635,16 @@ synchronized private void allocateRackLocal(SchedulerNode node,
/** /**
* The {@link ResourceScheduler} is allocating data-local resources to the * The {@link ResourceScheduler} is allocating data-local resources to the
* application. * application.
*
* @param allocatedContainers
* resources allocated to the application
*/ */
synchronized private void allocateOffSwitch(SchedulerNode node, private synchronized void allocateOffSwitch(
Priority priority, ResourceRequest offSwitchRequest, Container container, ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
List<ResourceRequest> resourceRequests) {
// Update future requirements // Update future requirements
decrementOutstanding(offSwitchRequest); decrementOutstanding(offSwitchRequest);
// Update cloned OffRack requests for recovery // Update cloned OffRack requests for recovery
resourceRequests.add(cloneResourceRequest(offSwitchRequest)); resourceRequests.add(cloneResourceRequest(offSwitchRequest));
} }
synchronized private void decrementOutstanding( private synchronized void decrementOutstanding(
ResourceRequest offSwitchRequest) { ResourceRequest offSwitchRequest) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1; int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
@ -695,7 +663,7 @@ synchronized private void decrementOutstanding(
offSwitchRequest.getCapability()); offSwitchRequest.getCapability());
} }
synchronized private void checkForDeactivation() { private synchronized void checkForDeactivation() {
boolean deactivate = true; boolean deactivate = true;
for (Priority priority : getPriorities()) { for (Priority priority : getPriorities()) {
ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
@ -709,7 +677,7 @@ synchronized private void checkForDeactivation() {
// also we need to check increase request // also we need to check increase request
if (!deactivate) { if (!deactivate) {
deactivate = increaseRequestMap.isEmpty(); deactivate = containerIncreaseRequestMap.isEmpty();
} }
if (deactivate) { if (deactivate) {
@ -717,7 +685,7 @@ synchronized private void checkForDeactivation() {
} }
} }
synchronized public void move(Queue newQueue) { public synchronized void move(Queue newQueue) {
QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics(); QueueMetrics newMetrics = newQueue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@ -741,10 +709,9 @@ synchronized public void move(Queue newQueue) {
activeUsersManager = newQueue.getActiveUsersManager(); activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId); activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue; this.queue = newQueue;
this.queueName = newQueue.getQueueName();
} }
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { public synchronized void stop() {
// clear pending resources metrics for the application // clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics(); QueueMetrics metrics = queue.getMetrics();
for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
@ -782,12 +749,8 @@ public Set<String> getBlackListCopy() {
public synchronized void transferStateFromPreviousAppSchedulingInfo( public synchronized void transferStateFromPreviousAppSchedulingInfo(
AppSchedulingInfo appInfo) { AppSchedulingInfo appInfo) {
// this.priorities = appInfo.getPriorities();
// this.requests = appInfo.getRequests();
// This should not require locking the userBlacklist since it will not be // This should not require locking the userBlacklist since it will not be
// used by this instance until after setCurrentAppAttempt. // used by this instance until after setCurrentAppAttempt.
// Should cleanup this to avoid sharing between instances and can
// then remove getBlacklist as well.
this.userBlacklist = appInfo.getBlackList(); this.userBlacklist = appInfo.getBlackList();
} }

View File

@ -331,7 +331,7 @@ public synchronized void recoverResourceRequests(
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
// Cleanup all scheduling information // Cleanup all scheduling information
isStopped = true; isStopped = true;
appSchedulingInfo.stop(rmAppAttemptFinalState); appSchedulingInfo.stop();
} }
public synchronized boolean isStopped() { public synchronized boolean isStopped() {