YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan
This commit is contained in:
@ -65,6 +65,9 @@ Release 2.8.0 - UNRELEASED
YARN-3357. Move TestFifoScheduler to FIFO package. (Rohith Sharmaks
via devaraj)
YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to
track used-resources-by-label. (Wangda Tan via jianhe)
YARN-3339. TestDockerContainerExecutor should pull a single image and not
@ -358,14 +358,15 @@ public synchronized void recoverContainersOnNode(
// recover scheduler node
SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
// recover queue: update headroom etc.
Queue queue = schedulerAttempt.getQueue();
queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
// recover scheduler attempt
schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
// set master container for the current running AMContainer for this
// attempt.
@ -20,8 +20,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -191,6 +189,16 @@ synchronized public void updateResourceRequests(
metrics.decrPendingResources(user, lastRequestContainers,
// update queue:
if (lastRequest != null) {
Resources.multiply(lastRequestCapability, lastRequestContainers));
@ -376,6 +384,9 @@ synchronized private void decrementOutstanding(
if (numOffSwitchContainers == 0) {
synchronized private void checkForDeactivation() {
@ -404,6 +415,12 @@ synchronized public void move(Queue newQueue) {
newMetrics.incrPendingResources(user, request.getNumContainers(),
Resource delta = Resources.multiply(request.getCapability(),
// Update Queue
queue.decPendingResource(request.getNodeLabelExpression(), delta);
newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
@ -423,6 +440,12 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
// Update Queue
metrics.finishAppAttempt(applicationId, pending, user);
@ -90,4 +90,24 @@ public void recoverContainer(Resource clusterResource,
* @return default label expression
public String getDefaultNodeLabelExpression();
* When new outstanding resource is asked, calling this will increase pending
* resource in a queue.
* @param nodeLabel asked by application
* @param resourceToInc new resource asked
public void incPendingResource(String nodeLabel, Resource resourceToInc);
* When an outstanding resource is fulfilled or canceled, calling this will
* decrease pending resource in a queue.
* @param nodeLabel
* asked by application
* @param resourceToDec
* new resource asked
public void decPendingResource(String nodeLabel, Resource resourceToDec);
@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -75,14 +76,17 @@ public UsageByLabel(String label) {
public Resource getUsed() {
return resArr[ResourceType.USED.idx];
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{used=" + resArr[0] + "%, ");
sb.append("pending=" + resArr[1] + "%, ");
sb.append("am_used=" + resArr[2] + "%, ");
sb.append("reserved=" + resArr[3] + "%, ");
sb.append("headroom=" + resArr[4] + "%}");
sb.append("reserved=" + resArr[3] + "%}");
return sb.toString();
@ -117,6 +121,17 @@ public void decUsed(String label, Resource res) {
public void setUsed(Resource res) {
setUsed(NL, res);
public void copyAllUsed(ResourceUsage other) {
try {
for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
} finally {
public void setUsed(String label, Resource res) {
_set(label, ResourceType.USED, res);
@ -87,13 +87,12 @@ public class SchedulerApplicationAttempt {
private final Multiset<Priority> reReservations = HashMultiset.create();
protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
private Resource amResource = Resources.none();
private boolean unmanagedAM = true;
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@ -217,11 +216,11 @@ public String getQueueName() {
public Resource getAMResource() {
return amResource;
return attemptResourceUsage.getAMUsed();
public void setAMResource(Resource amResource) {
this.amResource = amResource;
public boolean isAmRunning() {
@ -260,7 +259,7 @@ public synchronized int getReReservations(Priority priority) {
public synchronized Resource getCurrentReservation() {
return currentReservation;
return attemptResourceUsage.getReserved();
public Queue getQueue() {
@ -311,8 +310,8 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
Resources.addTo(currentReservation, container.getResource());
// Reset the re-reservation count
@ -336,7 +335,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+ " reserved container " + rmContainer + " on node " + node
+ ". This attempt currently has " + reservedContainers.size()
+ " reserved containers at priority " + priority
+ "; currentReservation " + currentReservation.getMemory());
+ "; currentReservation " + container.getResource());
return rmContainer;
@ -402,9 +401,9 @@ public synchronized void showRequests() {
for (Priority priority : getPriorities()) {
Map<String, ResourceRequest> requests = getResourceRequests(priority);
if (requests != null) {
LOG.debug("showRequests:" + " application=" + getApplicationId() +
" headRoom=" + getHeadroom() +
" currentConsumption=" + currentConsumption.getMemory());
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " headRoom=" + getHeadroom() + " currentConsumption="
+ attemptResourceUsage.getUsed().getMemory());
for (ResourceRequest request : requests.values()) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " request=" + request);
@ -415,7 +414,7 @@ public synchronized void showRequests() {
public Resource getCurrentConsumption() {
return currentConsumption;
return attemptResourceUsage.getUsed();
public static class ContainersAndNMTokensAllocation {
@ -548,12 +547,17 @@ synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
AggregateAppResourceUsage runningResourceUsage =
Resource usedResourceClone =
Resource reservedResourceClone =
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
reservedContainers.size(), Resources.clone(currentConsumption),
Resources.add(currentConsumption, currentReservation),
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
reservedContainers.size(), usedResourceClone, reservedResourceClone,
Resources.add(usedResourceClone, reservedResourceClone),
public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
@ -572,7 +576,7 @@ public synchronized void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
this.liveContainers = appAttempt.getLiveContainersMap();
// this.reReservations = appAttempt.reReservations;
this.currentConsumption = appAttempt.getCurrentConsumption();
this.resourceLimit = appAttempt.getResourceLimit();
// this.currentReservation = appAttempt.currentReservation;
// this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
@ -603,7 +607,8 @@ public synchronized void move(Queue newQueue) {
this.queue = newQueue;
public synchronized void recoverContainer(RMContainer rmContainer) {
public synchronized void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
// recover app scheduling info
@ -613,8 +618,9 @@ public synchronized void recoverContainer(RMContainer rmContainer) {
LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ " is recovering container " + rmContainer.getContainerId());
liveContainers.put(rmContainer.getContainerId(), rmContainer);
Resources.addTo(currentConsumption, rmContainer.getContainer()
attemptResourceUsage.incUsed(node.getPartition(), rmContainer
// resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
// is called.
// newlyAllocatedContainers.add(rmContainer);
@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -294,4 +295,17 @@ public Set<String> getLabels() {
public void updateLabels(Set<String> labels) {
this.labels = labels;
* Get partition of which the node belongs to, if node-labels of this node is
* empty or null, it belongs to NO_LABEL partition. And since we only support
* one partition for each node (YARN-2694), first label will be its partition.
public String getPartition() {
if (this.labels == null || this.labels.isEmpty()) {
return RMNodeLabelsManager.NO_LABEL;
} else {
return this.labels.iterator().next();
@ -509,4 +509,28 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
// non-empty
return false;
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
if (nodeLabel == null) {
nodeLabel = RMNodeLabelsManager.NO_LABEL;
// ResourceUsage has its own lock, no addition lock needs here.
queueUsage.incPending(nodeLabel, resourceToInc);
if (null != parent) {
parent.incPendingResource(nodeLabel, resourceToInc);
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
if (nodeLabel == null) {
nodeLabel = RMNodeLabelsManager.NO_LABEL;
// ResourceUsage has its own lock, no addition lock needs here.
queueUsage.decPending(nodeLabel, resourceToDec);
if (null != parent) {
parent.decPendingResource(nodeLabel, resourceToDec);
@ -739,6 +739,15 @@ private static Set<String> getRequestLabelSetByExpression(
return labels;
private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest,
FiCaSchedulerNode node) {
String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression();
if (null == askedNodeLabel) {
askedNodeLabel = RMNodeLabelsManager.NO_LABEL;
return askedNodeLabel.equals(node.getPartition());
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
@ -796,6 +805,14 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
if (application.getTotalRequiredResources(priority) <= 0) {
// Is the node-label-expression of this offswitch resource request
// matches the node's label?
// If not match, jump to next priority.
if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) {
if (!this.reservationsContinueLooking) {
if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
if (LOG.isDebugEnabled()) {
@ -825,7 +842,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, true, requestedNodeLabels)) {
@ -1076,7 +1093,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
protected synchronized boolean assignToUser(Resource clusterResource,
protected synchronized boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
boolean checkReservations, Set<String> requestLabels) {
User user = getUser(userName);
@ -1094,7 +1111,8 @@ protected synchronized boolean assignToUser(Resource clusterResource,
limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations) {
if (this.reservationsContinueLooking && checkReservations
&& label.equals(CommonNodeLabelsManager.NO_LABEL)) {
if (Resources.lessThanOrEqual(
@ -1305,7 +1323,7 @@ protected boolean checkLimitsToReserve(Resource clusterResource,
// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, false, null)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit user limit");
@ -1622,7 +1640,8 @@ public void completedContainer(Resource clusterResource,
node, rmContainer);
} else {
removed =
application.containerCompleted(rmContainer, containerStatus, event);
application.containerCompleted(rmContainer, containerStatus,
event, node.getPartition());
@ -90,7 +90,8 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
synchronized public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
// Remove from the list of containers
if (null == liveContainers.remove(rmContainer.getContainerId())) {
@ -122,7 +123,7 @@ synchronized public boolean containerCompleted(RMContainer rmContainer,
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
attemptResourceUsage.decUsed(partition, containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
@ -156,7 +157,8 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
// Update resource requests related to "request" and store in RMContainer
@ -198,12 +200,13 @@ public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority)
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);
this.attemptResourceUsage.decReserved(node.getPartition(), resource);
LOG.info("Application " + getApplicationId() + " unreserved "
+ " on node " + node + ", currently has " + reservedContainers.size()
+ " at priority " + priority + "; currentReservation "
+ currentReservation);
+ " on node " + node + ", currently has "
+ reservedContainers.size() + " at priority " + priority
+ "; currentReservation " + this.attemptResourceUsage.getReserved()
+ " on node-label=" + node.getPartition());
return true;
@ -142,7 +142,7 @@ synchronized public void containerCompleted(RMContainer rmContainer,
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
// remove from preemption map if it is completed
@ -164,11 +164,12 @@ private synchronized void unreserveInternal(
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size() + " at priority "
+ priority + "; currentReservation " + currentReservation);
+ node + ", currently has " + reservedContainers.size()
+ " at priority " + priority + "; currentReservation "
+ this.attemptResourceUsage.getReserved());
@ -339,7 +340,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) &&
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
@ -291,4 +291,12 @@ public String getDefaultNodeLabelExpression() {
// TODO, add implementation for FS
return null;
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -200,6 +201,14 @@ public String getDefaultNodeLabelExpression() {
// TODO add implementation for FIFO scheduler
return null;
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
public FifoScheduler() {
@ -870,7 +879,8 @@ protected synchronized void completedContainer(RMContainer rmContainer,
// Inform the application
application.containerCompleted(rmContainer, containerStatus, event);
application.containerCompleted(rmContainer, containerStatus, event,
// Inform the node
@ -164,17 +164,19 @@ public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
int containers, String labelExpression) throws Exception {
List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
for (String host : hosts) {
// only add host/rack request when asked host isn't ANY
if (!host.equals(ResourceRequest.ANY)) {
ResourceRequest hostReq =
createResourceReq(host, memory, priority, containers,
ResourceRequest rackReq =
createResourceReq("/default-rack", memory, priority, containers,
if (hosts != null) {
for (String host : hosts) {
// only add host/rack request when asked host isn't ANY
if (!host.equals(ResourceRequest.ANY)) {
ResourceRequest hostReq =
createResourceReq(host, memory, priority, containers,
ResourceRequest rackReq =
createResourceReq("/default-rack", memory, priority, containers,
@ -29,11 +29,13 @@
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
@ -111,6 +113,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -128,10 +131,13 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ComparisonFailure;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
public class TestCapacityScheduler {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@ -2557,6 +2563,165 @@ public void testParentQueueMaxCapsAreRespected() throws Exception {
Assert.fail("Shouldn't successfully allocate containers for am2, "
+ "queue-a's max capacity will be violated if container allocated");
private <E> Set<E> toSet(E... elements) {
Set<E> set = Sets.newHashSet(elements);
return set;
public void testQueueHierarchyPendingResourceUpdate() throws Exception {
Configuration conf =
TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
MemoryRMStateStore memStore = new MemoryRMStateStore();
MockRM rm = new MockRM(conf, memStore) {
protected RMNodeLabelsManager createNodeLabelManager() {
return mgr;
MockNM nm1 = // label = x
new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
MockNM nm2 = // label = ""
new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
// Launch app1 in queue=a1
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
// Launch app2 in queue=b1
RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// am1 asks for 8 * 1GB container for no label
Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
checkPendingResource(rm, "a1", 8 * GB, null);
checkPendingResource(rm, "a", 8 * GB, null);
checkPendingResource(rm, "root", 8 * GB, null);
// am2 asks for 8 * 1GB container for no label
Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
checkPendingResource(rm, "a1", 8 * GB, null);
checkPendingResource(rm, "a", 8 * GB, null);
checkPendingResource(rm, "b1", 8 * GB, null);
checkPendingResource(rm, "b", 8 * GB, null);
// root = a + b
checkPendingResource(rm, "root", 16 * GB, null);
// am2 asks for 8 * 1GB container in another priority for no label
Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)),
checkPendingResource(rm, "a1", 8 * GB, null);
checkPendingResource(rm, "a", 8 * GB, null);
checkPendingResource(rm, "b1", 16 * GB, null);
checkPendingResource(rm, "b", 16 * GB, null);
// root = a + b
checkPendingResource(rm, "root", 24 * GB, null);
// am1 asks 4 GB resource instead of 8 * GB for priority=1
Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)),
checkPendingResource(rm, "a1", 4 * GB, null);
checkPendingResource(rm, "a", 4 * GB, null);
checkPendingResource(rm, "b1", 16 * GB, null);
checkPendingResource(rm, "b", 16 * GB, null);
// root = a + b
checkPendingResource(rm, "root", 20 * GB, null);
// am1 asks 8 * GB resource which label=x
Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1,
true, "x")), null);
checkPendingResource(rm, "a1", 4 * GB, null);
checkPendingResource(rm, "a", 4 * GB, null);
checkPendingResource(rm, "a1", 8 * GB, "x");
checkPendingResource(rm, "a", 8 * GB, "x");
checkPendingResource(rm, "b1", 16 * GB, null);
checkPendingResource(rm, "b", 16 * GB, null);
// root = a + b
checkPendingResource(rm, "root", 20 * GB, null);
checkPendingResource(rm, "root", 8 * GB, "x");
// some containers allocated for am1, pending resource should decrease
ContainerId containerId =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
Assert.assertTrue(rm.waitForState(nm1, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
Assert.assertTrue(rm.waitForState(nm2, containerId,
RMContainerState.ALLOCATED, 10 * 1000));
checkPendingResource(rm, "a1", 0 * GB, null);
checkPendingResource(rm, "a", 0 * GB, null);
checkPendingResource(rm, "a1", 0 * GB, "x");
checkPendingResource(rm, "a", 0 * GB, "x");
// some containers could be allocated for am2 when we allocating containers
// for am1, just check if pending resource of b1/b/root > 0
checkPendingResourceGreaterThanZero(rm, "b1", null);
checkPendingResourceGreaterThanZero(rm, "b", null);
// root = a + b
checkPendingResourceGreaterThanZero(rm, "root", null);
checkPendingResource(rm, "root", 0 * GB, "x");
// complete am2, pending resource should be 0 now
AppAttemptRemovedSchedulerEvent appRemovedEvent =
new AppAttemptRemovedSchedulerEvent(
am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
checkPendingResource(rm, "a1", 0 * GB, null);
checkPendingResource(rm, "a", 0 * GB, null);
checkPendingResource(rm, "a1", 0 * GB, "x");
checkPendingResource(rm, "a", 0 * GB, "x");
checkPendingResource(rm, "b1", 0 * GB, null);
checkPendingResource(rm, "b", 0 * GB, null);
checkPendingResource(rm, "root", 0 * GB, null);
checkPendingResource(rm, "root", 0 * GB, "x");
private void checkPendingResource(MockRM rm, String queueName, int memory,
String label) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = cs.getQueue(queueName);
.getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
String label) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = cs.getQueue(queueName);
.getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
.getMemory() > 0);
// Test verifies AM Used resource for LeafQueue when AM ResourceRequest is
// lesser than minimumAllocation
@ -247,7 +247,8 @@ public void testSortedQueues() throws Exception {
// Stub an App and its containerCompleted
FiCaSchedulerApp app_0 = getMockApplication(0,user_0);
any(ContainerStatus.class), any(RMContainerEventType.class),
Priority priority = TestUtils.createMockPriority(1);
ContainerAllocationExpirer expirer =
@ -328,19 +328,6 @@ protected RMSecretManagerService createRMSecretManagerService() {
MockRM.launchAndRegisterAM(app1, rm1, nm1);
private Configuration getConfigurationWithDefaultQueueLabels(
Configuration config) {
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
CapacitySchedulerConfiguration conf =
(CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
new CapacitySchedulerConfiguration(config);
conf.setDefaultNodeLabelExpression(A, "x");
conf.setDefaultNodeLabelExpression(B, "y");
return conf;
private Configuration getConfigurationWithQueueLabels(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
@ -406,57 +393,6 @@ private <E> Set<E> toSet(E... elements) {
return set;
private Configuration getComplexConfigurationWithQueueLabels(
Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 10);
conf.setAccessibleNodeLabels(A, toSet("x", "y"));
conf.setCapacityByLabel(A, "x", 100);
conf.setCapacityByLabel(A, "y", 50);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 90);
conf.setMaximumCapacity(B, 100);
conf.setAccessibleNodeLabels(B, toSet("y", "z"));
conf.setCapacityByLabel(B, "y", 50);
conf.setCapacityByLabel(B, "z", 100);
// Define 2nd-level queues
final String A1 = A + ".a1";
conf.setQueues(A, new String[] {"a1"});
conf.setCapacity(A1, 100);
conf.setMaximumCapacity(A1, 100);
conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
conf.setDefaultNodeLabelExpression(A1, "x");
conf.setCapacityByLabel(A1, "x", 100);
conf.setCapacityByLabel(A1, "y", 100);
conf.setQueues(B, new String[] {"b1", "b2"});
final String B1 = B + ".b1";
conf.setCapacity(B1, 50);
conf.setMaximumCapacity(B1, 50);
conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
final String B2 = B + ".b2";
conf.setCapacity(B2, 50);
conf.setMaximumCapacity(B2, 50);
conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
conf.setCapacityByLabel(B2, "y", 100);
conf.setCapacityByLabel(B2, "z", 100);
return conf;
@Test (timeout = 300000)
public void testContainerAllocationWithSingleUserLimits() throws Exception {
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
@ -468,7 +404,7 @@ public void testContainerAllocationWithSingleUserLimits() throws Exception {
NodeId.newInstance("h2", 0), toSet("y")));
// inject node label manager
MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
@ -554,7 +490,7 @@ public void testContainerAllocateWithComplexLabels() throws Exception {
// inject node label manager
MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) {
MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
@ -711,7 +647,7 @@ public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
NodeId.newInstance("h2", 0), toSet("y")));
// inject node label manager
MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
@ -1058,19 +1058,19 @@ public void testAssignToUser() throws Exception {
// set limit so subtrace reservations it can continue
Resource limit = Resources.createResource(12 * GB, 0);
boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
true, null);
// tell it not to check for reservations and should fail as already over
// limit
res = a.assignToUser(clusterResource, user_0, limit, app_0, false, null);
res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
refreshQueuesTurnOffReservationsContLook(a, csConf);
// should now return false since feature off
res = a.assignToUser(clusterResource, user_0, limit, app_0, true, null);
res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
@ -60,6 +60,8 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Sets;
public class TestUtils {
private static final Log LOG = LogFactory.getLog(TestUtils.class);
@ -216,4 +218,131 @@ public static Container getMockContainer(
return container;
private static <E> Set<E> toSet(E... elements) {
Set<E> set = Sets.newHashSet(elements);
return set;
* Get a queue structure:
* <pre>
* Root
* / | \
* a b c
* | | |
* a1 b1 c1
* (x) (y)
* </pre>
public static Configuration getConfigurationWithQueueLabels(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 15);
conf.setAccessibleNodeLabels(A, toSet("x"));
conf.setCapacityByLabel(A, "x", 100);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 20);
conf.setAccessibleNodeLabels(B, toSet("y"));
conf.setCapacityByLabel(B, "y", 100);
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
conf.setCapacity(C, 70);
conf.setMaximumCapacity(C, 70);
conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
// Define 2nd-level queues
final String A1 = A + ".a1";
conf.setQueues(A, new String[] {"a1"});
conf.setCapacity(A1, 100);
conf.setMaximumCapacity(A1, 100);
conf.setCapacityByLabel(A1, "x", 100);
final String B1 = B + ".b1";
conf.setQueues(B, new String[] {"b1"});
conf.setCapacity(B1, 100);
conf.setMaximumCapacity(B1, 100);
conf.setCapacityByLabel(B1, "y", 100);
final String C1 = C + ".c1";
conf.setQueues(C, new String[] {"c1"});
conf.setCapacity(C1, 100);
conf.setMaximumCapacity(C1, 100);
return conf;
public static Configuration getComplexConfigurationWithQueueLabels(
Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
conf.setMaximumCapacity(A, 10);
conf.setAccessibleNodeLabels(A, toSet("x", "y"));
conf.setCapacityByLabel(A, "x", 100);
conf.setCapacityByLabel(A, "y", 50);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 90);
conf.setMaximumCapacity(B, 100);
conf.setAccessibleNodeLabels(B, toSet("y", "z"));
conf.setCapacityByLabel(B, "y", 50);
conf.setCapacityByLabel(B, "z", 100);
// Define 2nd-level queues
final String A1 = A + ".a1";
conf.setQueues(A, new String[] {"a1"});
conf.setCapacity(A1, 100);
conf.setMaximumCapacity(A1, 100);
conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
conf.setDefaultNodeLabelExpression(A1, "x");
conf.setCapacityByLabel(A1, "x", 100);
conf.setCapacityByLabel(A1, "y", 100);
conf.setQueues(B, new String[] {"b1", "b2"});
final String B1 = B + ".b1";
conf.setCapacity(B1, 50);
conf.setMaximumCapacity(B1, 50);
conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
final String B2 = B + ".b2";
conf.setCapacity(B2, 50);
conf.setMaximumCapacity(B2, 50);
conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
conf.setCapacityByLabel(B2, "y", 100);
conf.setCapacityByLabel(B2, "z", 100);
return conf;
public static Configuration getConfigurationWithDefaultQueueLabels(
Configuration config) {
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
CapacitySchedulerConfiguration conf =
(CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
new CapacitySchedulerConfiguration(config);
conf.setDefaultNodeLabelExpression(A, "x");
conf.setDefaultNodeLabelExpression(B, "y");
return conf;
Reference in New Issue
Block a user