From 586348e4cbf197188057d6b843a6701cfffdaff3 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 20 Mar 2015 13:54:01 -0700 Subject: [PATCH] YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/AbstractYarnScheduler.java | 5 +- .../scheduler/AppSchedulingInfo.java | 27 ++- .../resourcemanager/scheduler/Queue.java | 20 +++ .../scheduler/ResourceUsage.java | 19 +- .../SchedulerApplicationAttempt.java | 50 +++--- .../scheduler/SchedulerNode.java | 14 ++ .../scheduler/capacity/AbstractCSQueue.java | 24 +++ .../scheduler/capacity/LeafQueue.java | 29 ++- .../common/fica/FiCaSchedulerApp.java | 17 +- .../scheduler/fair/FSAppAttempt.java | 11 +- .../scheduler/fair/FSQueue.java | 8 + .../scheduler/fifo/FifoScheduler.java | 12 +- .../yarn/server/resourcemanager/MockAM.java | 24 +-- .../capacity/TestCapacityScheduler.java | 167 +++++++++++++++++- .../capacity/TestChildQueueOrder.java | 3 +- .../capacity/TestContainerAllocation.java | 70 +------- .../scheduler/capacity/TestReservations.java | 6 +- .../scheduler/capacity/TestUtils.java | 129 ++++++++++++++ 19 files changed, 509 insertions(+), 129 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bbd018a776..046b7b153b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -65,6 +65,9 @@ Release 2.8.0 - UNRELEASED YARN-3357. Move TestFifoScheduler to FIFO package. (Rohith Sharmaks via devaraj) + YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to + track used-resources-by-label. (Wangda Tan via jianhe) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 968a767f51..e1f94cf40c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -358,14 +358,15 @@ public synchronized void recoverContainersOnNode( container)); // recover scheduler node - nodes.get(nm.getNodeID()).recoverContainer(rmContainer); + SchedulerNode schedulerNode = nodes.get(nm.getNodeID()); + schedulerNode.recoverContainer(rmContainer); // recover queue: update headroom etc. Queue queue = schedulerAttempt.getQueue(); queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); // recover scheduler attempt - schedulerAttempt.recoverContainer(rmContainer); + schedulerAttempt.recoverContainer(schedulerNode, rmContainer); // set master container for the current running AMContainer for this // attempt. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 1324c7d816..84ebe9cf70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -20,8 +20,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -191,6 +189,16 @@ synchronized public void updateResourceRequests( request.getCapability()); metrics.decrPendingResources(user, lastRequestContainers, lastRequestCapability); + + // update queue: + queue.incPendingResource( + request.getNodeLabelExpression(), + Resources.multiply(request.getCapability(), + request.getNumContainers())); + if (lastRequest != null) { + queue.decPendingResource(lastRequest.getNodeLabelExpression(), + Resources.multiply(lastRequestCapability, lastRequestContainers)); + } } } } @@ -376,6 +384,9 @@ synchronized private void decrementOutstanding( if (numOffSwitchContainers == 0) { checkForDeactivation(); } + + queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(), + offSwitchRequest.getCapability()); } synchronized private void checkForDeactivation() { @@ -404,6 +415,12 @@ synchronized public void move(Queue newQueue) { request.getCapability()); newMetrics.incrPendingResources(user, request.getNumContainers(), request.getCapability()); + + Resource delta = Resources.multiply(request.getCapability(), + request.getNumContainers()); + // Update Queue + queue.decPendingResource(request.getNodeLabelExpression(), delta); + newQueue.incPendingResource(request.getNodeLabelExpression(), delta); } } oldMetrics.moveAppFrom(this); @@ -423,6 +440,12 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { if (request != null) { metrics.decrPendingResources(user, request.getNumContainers(), request.getCapability()); + + // Update Queue + queue.decPendingResource( + request.getNodeLabelExpression(), + Resources.multiply(request.getCapability(), + request.getNumContainers())); } } metrics.finishAppAttempt(applicationId, pending, user); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index 4663a914c5..02003c145a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -90,4 +90,24 @@ public void recoverContainer(Resource clusterResource, * @return default label expression */ public String getDefaultNodeLabelExpression(); + + /** + * When new outstanding resource is asked, calling this will increase pending + * resource in a queue. + * + * @param nodeLabel asked by application + * @param resourceToInc new resource asked + */ + public void incPendingResource(String nodeLabel, Resource resourceToInc); + + /** + * When an outstanding resource is fulfilled or canceled, calling this will + * decrease pending resource in a queue. + * + * @param nodeLabel + * asked by application + * @param resourceToDec + * new resource asked + */ + public void decPendingResource(String nodeLabel, Resource resourceToDec); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index de44bbe497..36ee4daa1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -75,14 +76,17 @@ public UsageByLabel(String label) { }; } + public Resource getUsed() { + return resArr[ResourceType.USED.idx]; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("{used=" + resArr[0] + "%, "); sb.append("pending=" + resArr[1] + "%, "); sb.append("am_used=" + resArr[2] + "%, "); - sb.append("reserved=" + resArr[3] + "%, "); - sb.append("headroom=" + resArr[4] + "%}"); + sb.append("reserved=" + resArr[3] + "%}"); return sb.toString(); } } @@ -117,6 +121,17 @@ public void decUsed(String label, Resource res) { public void setUsed(Resource res) { setUsed(NL, res); } + + public void copyAllUsed(ResourceUsage other) { + try { + writeLock.lock(); + for (Entry entry : other.usages.entrySet()) { + setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed())); + } + } finally { + writeLock.unlock(); + } + } public void setUsed(String label, Resource res) { _set(label, ResourceType.USED, res); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 9816699851..799a5c1581 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -87,13 +87,12 @@ public class SchedulerApplicationAttempt { private final Multiset reReservations = HashMultiset.create(); - protected final Resource currentReservation = Resource.newInstance(0, 0); private Resource resourceLimit = Resource.newInstance(0, 0); - protected Resource currentConsumption = Resource.newInstance(0, 0); - private Resource amResource = Resources.none(); private boolean unmanagedAM = true; private boolean amRunning = false; private LogAggregationContext logAggregationContext; + + protected ResourceUsage attemptResourceUsage = new ResourceUsage(); protected List newlyAllocatedContainers = new ArrayList(); @@ -217,11 +216,11 @@ public String getQueueName() { } public Resource getAMResource() { - return amResource; + return attemptResourceUsage.getAMUsed(); } public void setAMResource(Resource amResource) { - this.amResource = amResource; + attemptResourceUsage.setAMUsed(amResource); } public boolean isAmRunning() { @@ -260,7 +259,7 @@ public synchronized int getReReservations(Priority priority) { @Stable @Private public synchronized Resource getCurrentReservation() { - return currentReservation; + return attemptResourceUsage.getReserved(); } public Queue getQueue() { @@ -311,8 +310,8 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority, rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); - - Resources.addTo(currentReservation, container.getResource()); + attemptResourceUsage.incReserved(node.getPartition(), + container.getResource()); // Reset the re-reservation count resetReReservations(priority); @@ -336,7 +335,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority, + " reserved container " + rmContainer + " on node " + node + ". This attempt currently has " + reservedContainers.size() + " reserved containers at priority " + priority - + "; currentReservation " + currentReservation.getMemory()); + + "; currentReservation " + container.getResource()); } return rmContainer; @@ -402,9 +401,9 @@ public synchronized void showRequests() { for (Priority priority : getPriorities()) { Map requests = getResourceRequests(priority); if (requests != null) { - LOG.debug("showRequests:" + " application=" + getApplicationId() + - " headRoom=" + getHeadroom() + - " currentConsumption=" + currentConsumption.getMemory()); + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " headRoom=" + getHeadroom() + " currentConsumption=" + + attemptResourceUsage.getUsed().getMemory()); for (ResourceRequest request : requests.values()) { LOG.debug("showRequests:" + " application=" + getApplicationId() + " request=" + request); @@ -415,7 +414,7 @@ public synchronized void showRequests() { } public Resource getCurrentConsumption() { - return currentConsumption; + return attemptResourceUsage.getUsed(); } public static class ContainersAndNMTokensAllocation { @@ -548,12 +547,17 @@ synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { } public synchronized ApplicationResourceUsageReport getResourceUsageReport() { - AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage(); + AggregateAppResourceUsage runningResourceUsage = + getRunningAggregateAppResourceUsage(); + Resource usedResourceClone = + Resources.clone(attemptResourceUsage.getUsed()); + Resource reservedResourceClone = + Resources.clone(attemptResourceUsage.getReserved()); return ApplicationResourceUsageReport.newInstance(liveContainers.size(), - reservedContainers.size(), Resources.clone(currentConsumption), - Resources.clone(currentReservation), - Resources.add(currentConsumption, currentReservation), - resUsage.getMemorySeconds(), resUsage.getVcoreSeconds()); + reservedContainers.size(), usedResourceClone, reservedResourceClone, + Resources.add(usedResourceClone, reservedResourceClone), + runningResourceUsage.getMemorySeconds(), + runningResourceUsage.getVcoreSeconds()); } public synchronized Map getLiveContainersMap() { @@ -572,7 +576,7 @@ public synchronized void transferStateFromPreviousAttempt( SchedulerApplicationAttempt appAttempt) { this.liveContainers = appAttempt.getLiveContainersMap(); // this.reReservations = appAttempt.reReservations; - this.currentConsumption = appAttempt.getCurrentConsumption(); + this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage); this.resourceLimit = appAttempt.getResourceLimit(); // this.currentReservation = appAttempt.currentReservation; // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers; @@ -603,7 +607,8 @@ public synchronized void move(Queue newQueue) { this.queue = newQueue; } - public synchronized void recoverContainer(RMContainer rmContainer) { + public synchronized void recoverContainer(SchedulerNode node, + RMContainer rmContainer) { // recover app scheduling info appSchedulingInfo.recoverContainer(rmContainer); @@ -613,8 +618,9 @@ public synchronized void recoverContainer(RMContainer rmContainer) { LOG.info("SchedulerAttempt " + getApplicationAttemptId() + " is recovering container " + rmContainer.getContainerId()); liveContainers.put(rmContainer.getContainerId(), rmContainer); - Resources.addTo(currentConsumption, rmContainer.getContainer() - .getResource()); + attemptResourceUsage.incUsed(node.getPartition(), rmContainer + .getContainer().getResource()); + // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource // is called. // newlyAllocatedContainers.add(rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 29011345df..f03663a832 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -294,4 +295,17 @@ public Set getLabels() { public void updateLabels(Set labels) { this.labels = labels; } + + /** + * Get partition of which the node belongs to, if node-labels of this node is + * empty or null, it belongs to NO_LABEL partition. And since we only support + * one partition for each node (YARN-2694), first label will be its partition. + */ + public String getPartition() { + if (this.labels == null || this.labels.isEmpty()) { + return RMNodeLabelsManager.NO_LABEL; + } else { + return this.labels.iterator().next(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 4e53060beb..3cd85ae42f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -509,4 +509,28 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, // non-empty return false; } + + @Override + public void incPendingResource(String nodeLabel, Resource resourceToInc) { + if (nodeLabel == null) { + nodeLabel = RMNodeLabelsManager.NO_LABEL; + } + // ResourceUsage has its own lock, no addition lock needs here. + queueUsage.incPending(nodeLabel, resourceToInc); + if (null != parent) { + parent.incPendingResource(nodeLabel, resourceToInc); + } + } + + @Override + public void decPendingResource(String nodeLabel, Resource resourceToDec) { + if (nodeLabel == null) { + nodeLabel = RMNodeLabelsManager.NO_LABEL; + } + // ResourceUsage has its own lock, no addition lock needs here. + queueUsage.decPending(nodeLabel, resourceToDec); + if (null != parent) { + parent.decPendingResource(nodeLabel, resourceToDec); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index fa0e280db4..3e5405dd5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -739,6 +739,15 @@ private static Set getRequestLabelSetByExpression( return labels; } + private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest, + FiCaSchedulerNode node) { + String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression(); + if (null == askedNodeLabel) { + askedNodeLabel = RMNodeLabelsManager.NO_LABEL; + } + return askedNodeLabel.equals(node.getPartition()); + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits) { @@ -796,6 +805,14 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, if (application.getTotalRequiredResources(priority) <= 0) { continue; } + + // Is the node-label-expression of this offswitch resource request + // matches the node's label? + // If not match, jump to next priority. + if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) { + continue; + } + if (!this.reservationsContinueLooking) { if (!shouldAllocOrReserveNewContainer(application, priority, required)) { if (LOG.isDebugEnabled()) { @@ -825,7 +842,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } // Check user limit - if (!assignToUser(clusterResource, application.getUser(), userLimit, + if (!canAssignToUser(clusterResource, application.getUser(), userLimit, application, true, requestedNodeLabels)) { break; } @@ -1076,7 +1093,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, } @Private - protected synchronized boolean assignToUser(Resource clusterResource, + protected synchronized boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, boolean checkReservations, Set requestLabels) { User user = getUser(userName); @@ -1094,7 +1111,8 @@ protected synchronized boolean assignToUser(Resource clusterResource, limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && checkReservations) { + if (this.reservationsContinueLooking && checkReservations + && label.equals(CommonNodeLabelsManager.NO_LABEL)) { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, @@ -1305,7 +1323,7 @@ protected boolean checkLimitsToReserve(Resource clusterResource, } // Check user limit - if (!assignToUser(clusterResource, application.getUser(), userLimit, + if (!canAssignToUser(clusterResource, application.getUser(), userLimit, application, false, null)) { if (LOG.isDebugEnabled()) { LOG.debug("was going to reserve but hit user limit"); @@ -1622,7 +1640,8 @@ public void completedContainer(Resource clusterResource, node, rmContainer); } else { removed = - application.containerCompleted(rmContainer, containerStatus, event); + application.containerCompleted(rmContainer, containerStatus, + event, node.getPartition()); node.releaseContainer(container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 10f5c20fb1..e0413895d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -90,7 +90,8 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, } synchronized public boolean containerCompleted(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { + ContainerStatus containerStatus, RMContainerEventType event, + String partition) { // Remove from the list of containers if (null == liveContainers.remove(rmContainer.getContainerId())) { @@ -122,7 +123,7 @@ synchronized public boolean containerCompleted(RMContainer rmContainer, // Update usage metrics Resource containerResource = rmContainer.getContainer().getResource(); queue.getMetrics().releaseResources(getUser(), 1, containerResource); - Resources.subtractFrom(currentConsumption, containerResource); + attemptResourceUsage.decUsed(partition, containerResource); // Clear resource utilization metrics cache. lastMemoryAggregateAllocationUpdateTime = -1; @@ -156,7 +157,8 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( type, node, priority, request, container); - Resources.addTo(currentConsumption, container.getResource()); + attemptResourceUsage.incUsed(node.getPartition(), + container.getResource()); // Update resource requests related to "request" and store in RMContainer ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList); @@ -198,12 +200,13 @@ public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) resetReReservations(priority); Resource resource = reservedContainer.getContainer().getResource(); - Resources.subtractFrom(currentReservation, resource); + this.attemptResourceUsage.decReserved(node.getPartition(), resource); LOG.info("Application " + getApplicationId() + " unreserved " - + " on node " + node + ", currently has " + reservedContainers.size() - + " at priority " + priority + "; currentReservation " - + currentReservation); + + " on node " + node + ", currently has " + + reservedContainers.size() + " at priority " + priority + + "; currentReservation " + this.attemptResourceUsage.getReserved() + + " on node-label=" + node.getPartition()); return true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 67103d187b..dfde5abb7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -142,7 +142,7 @@ synchronized public void containerCompleted(RMContainer rmContainer, // Update usage metrics Resource containerResource = rmContainer.getContainer().getResource(); queue.getMetrics().releaseResources(getUser(), 1, containerResource); - Resources.subtractFrom(currentConsumption, containerResource); + this.attemptResourceUsage.decUsed(containerResource); // remove from preemption map if it is completed preemptionMap.remove(rmContainer); @@ -164,11 +164,12 @@ private synchronized void unreserveInternal( resetReReservations(priority); Resource resource = reservedContainer.getContainer().getResource(); - Resources.subtractFrom(currentReservation, resource); + this.attemptResourceUsage.decReserved(resource); LOG.info("Application " + getApplicationId() + " unreserved " + " on node " - + node + ", currently has " + reservedContainers.size() + " at priority " - + priority + "; currentReservation " + currentReservation); + + node + ", currently has " + reservedContainers.size() + + " at priority " + priority + "; currentReservation " + + this.attemptResourceUsage.getReserved()); } @Override @@ -339,7 +340,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && // Update consumption and track allocations List resourceRequestList = appSchedulingInfo.allocate( type, node, priority, request, container); - Resources.addTo(currentConsumption, container.getResource()); + this.attemptResourceUsage.incUsed(container.getResource()); // Update resource requests related to "request" and store in RMContainer ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 349464e1ef..1562bf6770 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -291,4 +291,12 @@ public String getDefaultNodeLabelExpression() { // TODO, add implementation for FS return null; } + + @Override + public void incPendingResource(String nodeLabel, Resource resourceToInc) { + } + + @Override + public void decPendingResource(String nodeLabel, Resource resourceToDec) { + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index beb3ab580b..b8c419cdec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -200,6 +201,14 @@ public String getDefaultNodeLabelExpression() { // TODO add implementation for FIFO scheduler return null; } + + @Override + public void incPendingResource(String nodeLabel, Resource resourceToInc) { + } + + @Override + public void decPendingResource(String nodeLabel, Resource resourceToDec) { + } }; public FifoScheduler() { @@ -870,7 +879,8 @@ protected synchronized void completedContainer(RMContainer rmContainer, } // Inform the application - application.containerCompleted(rmContainer, containerStatus, event); + application.containerCompleted(rmContainer, containerStatus, event, + RMNodeLabelsManager.NO_LABEL); // Inform the node node.releaseContainer(container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 494f5a412f..f62fdb3dce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -164,17 +164,19 @@ public List createReq(String[] hosts, int memory, int priority, public List createReq(String[] hosts, int memory, int priority, int containers, String labelExpression) throws Exception { List reqs = new ArrayList(); - for (String host : hosts) { - // only add host/rack request when asked host isn't ANY - if (!host.equals(ResourceRequest.ANY)) { - ResourceRequest hostReq = - createResourceReq(host, memory, priority, containers, - labelExpression); - reqs.add(hostReq); - ResourceRequest rackReq = - createResourceReq("/default-rack", memory, priority, containers, - labelExpression); - reqs.add(rackReq); + if (hosts != null) { + for (String host : hosts) { + // only add host/rack request when asked host isn't ANY + if (!host.equals(ResourceRequest.ANY)) { + ResourceRequest hostReq = + createResourceReq(host, memory, priority, containers, + labelExpression); + reqs.add(hostReq); + ResourceRequest rackReq = + createResourceReq("/default-rack", memory, priority, containers, + labelExpression); + reqs.add(rackReq); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e30f4413d9..aaa615dc42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -29,11 +29,13 @@ import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -111,6 +113,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -128,10 +131,13 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.ComparisonFailure; import org.junit.Test; import org.mockito.Mockito; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); @@ -2557,6 +2563,165 @@ public void testParentQueueMaxCapsAreRespected() throws Exception { Assert.fail("Shouldn't successfully allocate containers for am2, " + "queue-a's max capacity will be violated if container allocated"); } + + @SuppressWarnings("unchecked") + private Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + @Test + public void testQueueHierarchyPendingResourceUpdate() throws Exception { + Configuration conf = + TestUtils.getConfigurationWithQueueLabels(new Configuration(false)); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm = new MockRM(conf, memStore) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.start(); + MockNM nm1 = // label = x + new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = // label = "" + new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService()); + nm2.registerNode(); + + // Launch app1 in queue=a1 + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + // Launch app2 in queue=b1 + RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + // am1 asks for 8 * 1GB container for no label + am1.allocate(Arrays.asList(ResourceRequest.newInstance( + Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)), + null); + + checkPendingResource(rm, "a1", 8 * GB, null); + checkPendingResource(rm, "a", 8 * GB, null); + checkPendingResource(rm, "root", 8 * GB, null); + + // am2 asks for 8 * 1GB container for no label + am2.allocate(Arrays.asList(ResourceRequest.newInstance( + Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)), + null); + + checkPendingResource(rm, "a1", 8 * GB, null); + checkPendingResource(rm, "a", 8 * GB, null); + checkPendingResource(rm, "b1", 8 * GB, null); + checkPendingResource(rm, "b", 8 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 16 * GB, null); + + // am2 asks for 8 * 1GB container in another priority for no label + am2.allocate(Arrays.asList(ResourceRequest.newInstance( + Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)), + null); + + checkPendingResource(rm, "a1", 8 * GB, null); + checkPendingResource(rm, "a", 8 * GB, null); + checkPendingResource(rm, "b1", 16 * GB, null); + checkPendingResource(rm, "b", 16 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 24 * GB, null); + + // am1 asks 4 GB resource instead of 8 * GB for priority=1 + am1.allocate(Arrays.asList(ResourceRequest.newInstance( + Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)), + null); + + checkPendingResource(rm, "a1", 4 * GB, null); + checkPendingResource(rm, "a", 4 * GB, null); + checkPendingResource(rm, "b1", 16 * GB, null); + checkPendingResource(rm, "b", 16 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 20 * GB, null); + + // am1 asks 8 * GB resource which label=x + am1.allocate(Arrays.asList(ResourceRequest.newInstance( + Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1, + true, "x")), null); + + checkPendingResource(rm, "a1", 4 * GB, null); + checkPendingResource(rm, "a", 4 * GB, null); + checkPendingResource(rm, "a1", 8 * GB, "x"); + checkPendingResource(rm, "a", 8 * GB, "x"); + checkPendingResource(rm, "b1", 16 * GB, null); + checkPendingResource(rm, "b", 16 * GB, null); + // root = a + b + checkPendingResource(rm, "root", 20 * GB, null); + checkPendingResource(rm, "root", 8 * GB, "x"); + + // some containers allocated for am1, pending resource should decrease + ContainerId containerId = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + Assert.assertTrue(rm.waitForState(nm1, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + Assert.assertTrue(rm.waitForState(nm2, containerId, + RMContainerState.ALLOCATED, 10 * 1000)); + + checkPendingResource(rm, "a1", 0 * GB, null); + checkPendingResource(rm, "a", 0 * GB, null); + checkPendingResource(rm, "a1", 0 * GB, "x"); + checkPendingResource(rm, "a", 0 * GB, "x"); + // some containers could be allocated for am2 when we allocating containers + // for am1, just check if pending resource of b1/b/root > 0 + checkPendingResourceGreaterThanZero(rm, "b1", null); + checkPendingResourceGreaterThanZero(rm, "b", null); + // root = a + b + checkPendingResourceGreaterThanZero(rm, "root", null); + checkPendingResource(rm, "root", 0 * GB, "x"); + + // complete am2, pending resource should be 0 now + AppAttemptRemovedSchedulerEvent appRemovedEvent = + new AppAttemptRemovedSchedulerEvent( + am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false); + rm.getResourceScheduler().handle(appRemovedEvent); + + checkPendingResource(rm, "a1", 0 * GB, null); + checkPendingResource(rm, "a", 0 * GB, null); + checkPendingResource(rm, "a1", 0 * GB, "x"); + checkPendingResource(rm, "a", 0 * GB, "x"); + checkPendingResource(rm, "b1", 0 * GB, null); + checkPendingResource(rm, "b", 0 * GB, null); + checkPendingResource(rm, "root", 0 * GB, null); + checkPendingResource(rm, "root", 0 * GB, "x"); + } + + private void checkPendingResource(MockRM rm, String queueName, int memory, + String label) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = cs.getQueue(queueName); + Assert.assertEquals( + memory, + queue.getQueueResourceUsage() + .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) + .getMemory()); + } + + private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName, + String label) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = cs.getQueue(queueName); + Assert.assertTrue(queue.getQueueResourceUsage() + .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) + .getMemory() > 0); + } // Test verifies AM Used resource for LeafQueue when AM ResourceRequest is // lesser than minimumAllocation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 71dc523c9d..23b31faeb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -247,7 +247,8 @@ public void testSortedQueues() throws Exception { // Stub an App and its containerCompleted FiCaSchedulerApp app_0 = getMockApplication(0,user_0); doReturn(true).when(app_0).containerCompleted(any(RMContainer.class), - any(ContainerStatus.class),any(RMContainerEventType.class)); + any(ContainerStatus.class), any(RMContainerEventType.class), + any(String.class)); Priority priority = TestUtils.createMockPriority(1); ContainerAllocationExpirer expirer = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index fe61eabc97..03b8f5c1fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -328,19 +328,6 @@ protected RMSecretManagerService createRMSecretManagerService() { MockRM.launchAndRegisterAM(app1, rm1, nm1); } - private Configuration getConfigurationWithDefaultQueueLabels( - Configuration config) { - final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - - CapacitySchedulerConfiguration conf = - (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config); - new CapacitySchedulerConfiguration(config); - conf.setDefaultNodeLabelExpression(A, "x"); - conf.setDefaultNodeLabelExpression(B, "y"); - return conf; - } - private Configuration getConfigurationWithQueueLabels(Configuration config) { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(config); @@ -406,57 +393,6 @@ private Set toSet(E... elements) { return set; } - private Configuration getComplexConfigurationWithQueueLabels( - Configuration config) { - CapacitySchedulerConfiguration conf = - new CapacitySchedulerConfiguration(config); - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); - conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); - conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); - conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100); - - final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - conf.setCapacity(A, 10); - conf.setMaximumCapacity(A, 10); - conf.setAccessibleNodeLabels(A, toSet("x", "y")); - conf.setCapacityByLabel(A, "x", 100); - conf.setCapacityByLabel(A, "y", 50); - - final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - conf.setCapacity(B, 90); - conf.setMaximumCapacity(B, 100); - conf.setAccessibleNodeLabels(B, toSet("y", "z")); - conf.setCapacityByLabel(B, "y", 50); - conf.setCapacityByLabel(B, "z", 100); - - // Define 2nd-level queues - final String A1 = A + ".a1"; - conf.setQueues(A, new String[] {"a1"}); - conf.setCapacity(A1, 100); - conf.setMaximumCapacity(A1, 100); - conf.setAccessibleNodeLabels(A1, toSet("x", "y")); - conf.setDefaultNodeLabelExpression(A1, "x"); - conf.setCapacityByLabel(A1, "x", 100); - conf.setCapacityByLabel(A1, "y", 100); - - conf.setQueues(B, new String[] {"b1", "b2"}); - final String B1 = B + ".b1"; - conf.setCapacity(B1, 50); - conf.setMaximumCapacity(B1, 50); - conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET); - - final String B2 = B + ".b2"; - conf.setCapacity(B2, 50); - conf.setMaximumCapacity(B2, 50); - conf.setAccessibleNodeLabels(B2, toSet("y", "z")); - conf.setCapacityByLabel(B2, "y", 100); - conf.setCapacityByLabel(B2, "z", 100); - - return conf; - } - @Test (timeout = 300000) public void testContainerAllocationWithSingleUserLimits() throws Exception { final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); @@ -468,7 +404,7 @@ public void testContainerAllocationWithSingleUserLimits() throws Exception { NodeId.newInstance("h2", 0), toSet("y"))); // inject node label manager - MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) { + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -554,7 +490,7 @@ public void testContainerAllocateWithComplexLabels() throws Exception { RMNodeLabelsManager.EMPTY_STRING_SET)); // inject node label manager - MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) { + MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; @@ -711,7 +647,7 @@ public void testContainerAllocateWithDefaultQueueLabels() throws Exception { NodeId.newInstance("h2", 0), toSet("y"))); // inject node label manager - MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) { + MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index c5b7587a39..e8a8243203 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -1058,19 +1058,19 @@ public void testAssignToUser() throws Exception { // set limit so subtrace reservations it can continue Resource limit = Resources.createResource(12 * GB, 0); - boolean res = a.assignToUser(clusterResource, user_0, limit, app_0, + boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null); assertTrue(res); // tell it not to check for reservations and should fail as already over // limit - res = a.assignToUser(clusterResource, user_0, limit, app_0, false, null); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null); assertFalse(res); refreshQueuesTurnOffReservationsContLook(a, csConf); // should now return false since feature off - res = a.assignToUser(clusterResource, user_0, limit, app_0, true, null); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null); assertFalse(res); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 9e352a70cc..62135b91df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -60,6 +60,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.collect.Sets; + public class TestUtils { private static final Log LOG = LogFactory.getLog(TestUtils.class); @@ -216,4 +218,131 @@ public static Container getMockContainer( when(container.getPriority()).thenReturn(priority); return container; } + + @SuppressWarnings("unchecked") + private static Set toSet(E... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + /** + * Get a queue structure: + *
+   *             Root
+   *            /  |  \
+   *           a   b   c
+   *           |   |   |
+   *           a1  b1  c1
+   *          (x)  (y)
+   * 
+ */ + public static Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 15); + conf.setAccessibleNodeLabels(A, toSet("x")); + conf.setCapacityByLabel(A, "x", 100); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 20); + conf.setAccessibleNodeLabels(B, toSet("y")); + conf.setCapacityByLabel(B, "y", 100); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + conf.setCapacity(C, 70); + conf.setMaximumCapacity(C, 70); + conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + conf.setCapacityByLabel(A1, "x", 100); + + final String B1 = B + ".b1"; + conf.setQueues(B, new String[] {"b1"}); + conf.setCapacity(B1, 100); + conf.setMaximumCapacity(B1, 100); + conf.setCapacityByLabel(B1, "y", 100); + + final String C1 = C + ".c1"; + conf.setQueues(C, new String[] {"c1"}); + conf.setCapacity(C1, 100); + conf.setMaximumCapacity(C1, 100); + + return conf; + } + + public static Configuration getComplexConfigurationWithQueueLabels( + Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + conf.setCapacity(A, 10); + conf.setMaximumCapacity(A, 10); + conf.setAccessibleNodeLabels(A, toSet("x", "y")); + conf.setCapacityByLabel(A, "x", 100); + conf.setCapacityByLabel(A, "y", 50); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 90); + conf.setMaximumCapacity(B, 100); + conf.setAccessibleNodeLabels(B, toSet("y", "z")); + conf.setCapacityByLabel(B, "y", 50); + conf.setCapacityByLabel(B, "z", 100); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + conf.setQueues(A, new String[] {"a1"}); + conf.setCapacity(A1, 100); + conf.setMaximumCapacity(A1, 100); + conf.setAccessibleNodeLabels(A1, toSet("x", "y")); + conf.setDefaultNodeLabelExpression(A1, "x"); + conf.setCapacityByLabel(A1, "x", 100); + conf.setCapacityByLabel(A1, "y", 100); + + conf.setQueues(B, new String[] {"b1", "b2"}); + final String B1 = B + ".b1"; + conf.setCapacity(B1, 50); + conf.setMaximumCapacity(B1, 50); + conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET); + + final String B2 = B + ".b2"; + conf.setCapacity(B2, 50); + conf.setMaximumCapacity(B2, 50); + conf.setAccessibleNodeLabels(B2, toSet("y", "z")); + conf.setCapacityByLabel(B2, "y", 100); + conf.setCapacityByLabel(B2, "z", 100); + + return conf; + } + + public static Configuration getConfigurationWithDefaultQueueLabels( + Configuration config) { + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + + CapacitySchedulerConfiguration conf = + (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config); + new CapacitySchedulerConfiguration(config); + conf.setDefaultNodeLabelExpression(A, "x"); + conf.setDefaultNodeLabelExpression(B, "y"); + return conf; + } }