YARN-5761. Separate QueueManager from Scheduler. (Xuan Gong via gtcarrera9)

This commit is contained in:
Li Lu 2016-11-30 13:38:42 -08:00
parent 3fd844b99f
commit 69fb70c31a
10 changed files with 536 additions and 303 deletions

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
/**
*
* Context of the Queues in Scheduler.
*
*/
@Private
@Unstable
public interface SchedulerQueueManager<T extends Queue,
E extends ReservationSchedulerConfiguration> {
/**
* Get the root queue.
* @return root queue
*/
T getRootQueue();
/**
* Get all the queues.
* @return a map contains all the queues as well as related queue names
*/
Map<String, T> getQueues();
/**
* Remove the queue from the existing queue.
* @param queueName the queue name
*/
void removeQueue(String queueName);
/**
* Add a new queue to the existing queues.
* @param queueName the queue name
* @param queue the queue object
*/
void addQueue(String queueName, T queue);
/**
* Get a queue matching the specified queue name.
* @param queueName the queue name
* @return a queue object
*/
T getQueue(String queueName);
/**
* Reinitialize the queues.
* @param newConf the configuration
* @throws IOException if fails to re-initialize queues
*/
void reinitializeQueues(E newConf) throws IOException;
}

View File

@ -25,7 +25,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -68,8 +67,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
@ -156,9 +153,9 @@ public class CapacityScheduler extends
ResourceAllocationCommitter {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
private YarnAuthorizationProvider authorizer;
private CSQueue root;
private CapacitySchedulerQueueManager queueManager;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@ -168,22 +165,6 @@ public class CapacityScheduler extends
private int offswitchPerHeartbeatLimit;
static final Comparator<CSQueue> nonPartitionedQueueComparator =
new Comparator<CSQueue>() {
@Override
public int compare(CSQueue q1, CSQueue q2) {
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
return -1;
} else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
return 1;
}
return q1.getQueuePath().compareTo(q2.getQueuePath());
}
};
static final PartitionedQueueComparator partitionedQueueComparator =
new PartitionedQueueComparator();
@Override
public void setConf(Configuration conf) {
@ -236,8 +217,6 @@ public Configuration getConf() {
private CapacitySchedulerConfiguration conf;
private Configuration yarnConf;
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@ -261,11 +240,11 @@ public CapacityScheduler() {
@Override
public QueueMetrics getRootQueueMetrics() {
return root.getMetrics();
return getRootQueue().getMetrics();
}
public CSQueue getRootQueue() {
return root;
return queueManager.getRootQueue();
}
@Override
@ -290,12 +269,12 @@ public void setResourceCalculator(ResourceCalculator rc) {
@Override
public Comparator<CSQueue> getNonPartitionedQueueComparator() {
return nonPartitionedQueueComparator;
return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR;
}
@Override
public PartitionedQueueComparator getPartitionedQueueComparator() {
return partitionedQueueComparator;
return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR;
}
@Override
@ -326,7 +305,10 @@ void initScheduler(Configuration configuration) throws
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
this.labelManager);
this.queueManager.setCapacitySchedulerContext(this);
this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
initializeQueues(this.conf);
@ -554,13 +536,6 @@ public int getPendingBacklogs() {
}
}
static class QueueHook {
public CSQueue hook(CSQueue queue) {
return queue;
}
}
private static final QueueHook noop = new QueueHook();
@VisibleForTesting
public UserGroupMappingPlacementRule
getUserGroupMappingPlacementRule() throws IOException {
@ -578,7 +553,7 @@ public CSQueue hook(CSQueue queue) {
if (!mappingQueue.equals(
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
CSQueue queue = queues.get(mappingQueue);
CSQueue queue = getQueue(mappingQueue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mappingQueue);
@ -616,184 +591,29 @@ private void updatePlacementRules() throws IOException {
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
labelManager.reinitializeQueueLabels(getQueueToLabels());
LOG.info("Initialized root queue " + root);
this.queueManager.initializeQueues(conf);
updatePlacementRules();
setQueueAcls(authorizer, queues);
// Notify Preemption Manager
preemptionManager.refreshQueues(null, root);
preemptionManager.refreshQueues(null, this.getRootQueue());
}
@Lock(CapacityScheduler.class)
private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
throws IOException {
// Parse new queues
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
newQueues, queues, noop);
// Ensure all existing queues are still present
validateExistingQueues(queues, newQueues);
// Add new queues
addNewQueues(queues, newQueues);
// Re-configure queues
root.reinitialize(newRoot, getClusterResource());
this.queueManager.reinitializeQueues(newConf);
updatePlacementRules();
// Re-calculate headroom for active applications
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
labelManager.reinitializeQueueLabels(getQueueToLabels());
setQueueAcls(authorizer, queues);
// Notify Preemption Manager
preemptionManager.refreshQueues(null, root);
}
@VisibleForTesting
public static void setQueueAcls(YarnAuthorizationProvider authorizer,
Map<String, CSQueue> queues) throws IOException {
List<Permission> permissions = new ArrayList<>();
for (CSQueue queue : queues.values()) {
AbstractCSQueue csQueue = (AbstractCSQueue) queue;
permissions.add(
new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
}
authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser());
}
private Map<String, Set<String>> getQueueToLabels() {
Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
for (CSQueue queue : queues.values()) {
queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
}
return queueToLabels;
}
/**
* Ensure all existing queues are present. Queues cannot be deleted
* @param queues existing queues
* @param newQueues new queues
*/
@Lock(CapacityScheduler.class)
private void validateExistingQueues(
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
throws IOException {
// check that all static queues are included in the newQueues list
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
if (!(e.getValue() instanceof ReservationQueue)) {
String queueName = e.getKey();
CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName);
if (null == newQueue) {
throw new IOException(queueName + " cannot be found during refresh!");
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
throw new IOException(queueName + " is moved from:"
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
+ " after refresh, which is not allowed.");
}
}
}
}
/**
* Add the new queues (only) to our list of queues...
* ... be careful, do not overwrite existing queues.
* @param queues
* @param newQueues
*/
@Lock(CapacityScheduler.class)
private void addNewQueues(
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
{
for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
String queueName = e.getKey();
CSQueue queue = e.getValue();
if (!queues.containsKey(queueName)) {
queues.put(queueName, queue);
}
}
}
@Lock(CapacityScheduler.class)
static CSQueue parseQueue(
CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration conf,
CSQueue parent, String queueName, Map<String, CSQueue> queues,
Map<String, CSQueue> oldQueues,
QueueHook hook) throws IOException {
CSQueue queue;
String fullQueueName =
(parent == null) ? queueName
: (parent.getQueuePath() + "." + queueName);
String[] childQueueNames =
conf.getQueues(fullQueueName);
boolean isReservableQueue = conf.isReservable(fullQueueName);
if (childQueueNames == null || childQueueNames.length == 0) {
if (null == parent) {
throw new IllegalStateException(
"Queue configuration missing child queue names for " + queueName);
}
// Check if the queue will be dynamically managed by the Reservation
// system
if (isReservableQueue) {
queue =
new PlanQueue(csContext, queueName, parent,
oldQueues.get(queueName));
} else {
queue =
new LeafQueue(csContext, queueName, parent,
oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(queue);
}
} else {
if (isReservableQueue) {
throw new IllegalStateException(
"Only Leaf Queues can be reservable for " + queueName);
}
ParentQueue parentQueue =
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(parentQueue);
List<CSQueue> childQueues = new ArrayList<CSQueue>();
for (String childQueueName : childQueueNames) {
CSQueue childQueue =
parseQueue(csContext, conf, queue, childQueueName,
queues, oldQueues, hook);
childQueues.add(childQueue);
}
parentQueue.setChildQueues(childQueues);
}
if (queue instanceof LeafQueue && queues.containsKey(queueName)
&& queues.get(queueName) instanceof LeafQueue) {
throw new IOException("Two leaf queues were named " + queueName
+ ". Leaf queue names must be distinct");
}
queues.put(queueName, queue);
LOG.info("Initialized queue: " + queue);
return queue;
preemptionManager.refreshQueues(null, this.getRootQueue());
}
public CSQueue getQueue(String queueName) {
if (queueName == null) {
return null;
}
return queues.get(queueName);
return this.queueManager.getQueue(queueName);
}
private void addApplicationOnRecovery(
@ -1047,7 +867,7 @@ private void doneApplicationAttempt(
// Inform the queue
String queueName = attempt.getQueue().getQueueName();
CSQueue queue = queues.get(queueName);
CSQueue queue = this.getQueue(queueName);
if (!(queue instanceof LeafQueue)) {
LOG.error(
"Cannot finish application " + "from non-leaf queue: " + queueName);
@ -1174,7 +994,7 @@ public QueueInfo getQueueInfo(String queueName,
boolean includeChildQueues, boolean recursive)
throws IOException {
CSQueue queue = null;
queue = this.queues.get(queueName);
queue = this.getQueue(queueName);
if (queue == null) {
throw new IOException("Unknown queue: " + queueName);
}
@ -1192,7 +1012,7 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
return new ArrayList<QueueUserACLInfo>();
}
return root.getQueueUserAclInfo(user);
return getRootQueue().getQueueUserAclInfo(user);
}
@Override
@ -1235,7 +1055,7 @@ private void updateNodeAndQueueResource(RMNode nm,
writeLock.lock();
updateNodeResource(nm, resourceOption);
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource,
getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
} finally {
writeLock.unlock();
@ -1471,8 +1291,8 @@ private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNod
private CSAssignment allocateOrReserveNewContainers(
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
CSAssignment assignment = root.assignContainers(getClusterResource(), ps,
new ResourceLimits(labelManager
CSAssignment assignment = getRootQueue().assignContainers(
getClusterResource(), ps, new ResourceLimits(labelManager
.getResourceByLabel(ps.getPartition(), getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@ -1506,7 +1326,7 @@ private CSAssignment allocateOrReserveNewContainers(
}
// Try to use NON_EXCLUSIVE
assignment = root.assignContainers(getClusterResource(), ps,
assignment = getRootQueue().assignContainers(getClusterResource(), ps,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager
@ -1526,8 +1346,8 @@ private CSAssignment allocateContainersOnMultiNodes(
PlacementSet<FiCaSchedulerNode> ps) {
// When this time look at multiple nodes, try schedule if the
// partition has any available resource or killable resource
if (root.getQueueCapacities().getUsedCapacity(ps.getPartition()) >= 1.0f
&& preemptionManager.getKillableResource(
if (getRootQueue().getQueueCapacities().getUsedCapacity(
ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
.none()) {
if (LOG.isDebugEnabled()) {
@ -1710,7 +1530,7 @@ private void updateNodeLabelsAndQueueResource(
updateLabelsOnNode(id, labels);
}
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource,
getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
} finally {
writeLock.unlock();
@ -1731,7 +1551,7 @@ private void addNode(RMNode nodeManager) {
}
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource,
getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
LOG.info(
@ -1782,7 +1602,7 @@ private void removeNode(RMNode nodeInfo) {
nodeTracker.removeNode(nodeId);
Resource clusterResource = getClusterResource();
root.updateClusterResource(clusterResource,
getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
int numNodes = nodeTracker.nodeCount();
@ -2020,7 +1840,7 @@ public boolean checkAccess(UserGroupInformation callerUGI,
@Override
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
CSQueue queue = queues.get(queueName);
CSQueue queue = getQueue(queueName);
if (queue == null) {
return null;
}
@ -2030,7 +1850,8 @@ public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
}
public boolean isSystemAppsLimitReached() {
if (root.getNumApplications() < conf.getMaximumSystemApplications()) {
if (getRootQueue().getNumApplications() < conf
.getMaximumSystemApplications()) {
return false;
}
return true;
@ -2131,7 +1952,7 @@ public void removeQueue(String queueName)
}
((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
this.queues.remove(queueName);
this.queueManager.removeQueue(queueName);
LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
} finally {
writeLock.unlock();
@ -2160,7 +1981,7 @@ public void addQueue(Queue queue)
PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
String queuename = newQueue.getQueueName();
parentPlan.addChildQueue(newQueue);
this.queues.put(queuename, newQueue);
this.queueManager.addQueue(queuename, newQueue);
LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
} finally {
writeLock.unlock();
@ -2172,7 +1993,7 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement)
throws YarnException {
try {
writeLock.lock();
LeafQueue queue = getAndCheckLeafQueue(inQueue);
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
ParentQueue parent = (ParentQueue) queue.getParent();
if (!(queue instanceof ReservationQueue)) {
@ -2224,9 +2045,10 @@ public String moveApplication(ApplicationId appId,
FiCaSchedulerApp app = getApplicationAttempt(
ApplicationAttemptId.newInstance(appId, 0));
String sourceQueueName = app.getQueue().getQueueName();
LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
LeafQueue source = this.queueManager.getAndCheckLeafQueue(
sourceQueueName);
String destQueueName = handleMoveToPlanQueue(targetQueueName);
LeafQueue dest = getAndCheckLeafQueue(destQueueName);
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
// Validation check - ACLs, submission limits for user & queue
String user = app.getUser();
checkQueuePartition(app, dest);
@ -2290,27 +2112,6 @@ private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest)
}
}
/**
* Check that the String provided in input is the name of an existing,
* LeafQueue, if successful returns the queue.
*
* @param queue
* @return the LeafQueue
* @throws YarnException
*/
private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
CSQueue ret = this.getQueue(queue);
if (ret == null) {
throw new YarnException("The specified Queue: " + queue
+ " doesn't exist");
}
if (!(ret instanceof LeafQueue)) {
throw new YarnException("The specified Queue: " + queue
+ " is not a Leaf Queue. Move is supported only for Leaf Queues.");
}
return (LeafQueue) ret;
}
/** {@inheritDoc} */
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
@ -2347,7 +2148,7 @@ private String handleMoveToPlanQueue(String targetQueueName) {
@Override
public Set<String> getPlanQueues() {
Set<String> ret = new HashSet<String>();
for (Map.Entry<String, CSQueue> l : queues.entrySet()) {
for (Map.Entry<String, CSQueue> l : queueManager.getQueues().entrySet()) {
if (l.getValue() instanceof PlanQueue) {
ret.add(l.getKey());
}
@ -2367,7 +2168,8 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
if (null == priorityFromContext) {
// Get the default priority for the Queue. If Queue is non-existent, then
// use default priority
priorityFromContext = getDefaultPriorityForQueue(queueName);
priorityFromContext = this.queueManager.getDefaultPriorityForQueue(
queueName);
LOG.info("Application '" + applicationId
+ "' is submitted without priority "
@ -2391,18 +2193,6 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
return appPriority;
}
private Priority getDefaultPriorityForQueue(String queueName) {
Queue queue = getQueue(queueName);
if (null == queue || null == queue.getDefaultApplicationPriority()) {
// Return with default application priority
return Priority.newInstance(CapacitySchedulerConfiguration
.DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
}
return Priority.newInstance(queue.getDefaultApplicationPriority()
.getPriority());
}
@Override
public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId, SettableFuture<Object> future)
@ -2456,7 +2246,7 @@ public PreemptionManager getPreemptionManager() {
@Override
public ResourceUsage getClusterResourceUsage() {
return root.getQueueResourceUsage();
return getRootQueue().getQueueResourceUsage();
}
private SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> getSchedulerContainer(

View File

@ -0,0 +1,361 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
/**
*
* Context of the Queues in Capacity Scheduler.
*
*/
@Private
@Unstable
public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
CSQueue, CapacitySchedulerConfiguration>{
private static final Log LOG = LogFactory.getLog(
CapacitySchedulerQueueManager.class);
static final Comparator<CSQueue> NON_PARTITIONED_QUEUE_COMPARATOR =
new Comparator<CSQueue>() {
@Override
public int compare(CSQueue q1, CSQueue q2) {
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
return -1;
} else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
return 1;
}
return q1.getQueuePath().compareTo(q2.getQueuePath());
}
};
static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR =
new PartitionedQueueComparator();
static class QueueHook {
public CSQueue hook(CSQueue queue) {
return queue;
}
}
private static final QueueHook NOOP = new QueueHook();
private CapacitySchedulerContext csContext;
private final YarnAuthorizationProvider authorizer;
private final Map<String, CSQueue> queues = new ConcurrentHashMap<>();
private CSQueue root;
private final RMNodeLabelsManager labelManager;
/**
* Construct the service.
* @param conf the configuration
* @param labelManager the labelManager
*/
public CapacitySchedulerQueueManager(Configuration conf,
RMNodeLabelsManager labelManager) {
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.labelManager = labelManager;
}
@Override
public CSQueue getRootQueue() {
return this.root;
}
@Override
public Map<String, CSQueue> getQueues() {
return queues;
}
@Override
public void removeQueue(String queueName) {
this.queues.remove(queueName);
}
@Override
public void addQueue(String queueName, CSQueue queue) {
this.queues.put(queueName, queue);
}
@Override
public CSQueue getQueue(String queueName) {
return queues.get(queueName);
}
/**
* Set the CapacitySchedulerContext.
* @param capacitySchedulerContext the CapacitySchedulerContext
*/
public void setCapacitySchedulerContext(
CapacitySchedulerContext capacitySchedulerContext) {
this.csContext = capacitySchedulerContext;
}
/**
* Initialized the queues.
* @param conf the CapacitySchedulerConfiguration
* @throws IOException if fails to initialize queues
*/
public void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
root = parseQueue(this.csContext, conf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
LOG.info("Initialized root queue " + root);
}
@Override
public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
throws IOException {
// Parse new queues
Map<String, CSQueue> newQueues = new HashMap<>();
CSQueue newRoot = parseQueue(this.csContext, newConf, null,
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
// Ensure all existing queues are still present
validateExistingQueues(queues, newQueues);
// Add new queues
addNewQueues(queues, newQueues);
// Re-configure queues
root.reinitialize(newRoot, this.csContext.getClusterResource());
setQueueAcls(authorizer, queues);
// Re-calculate headroom for active applications
Resource clusterResource = this.csContext.getClusterResource();
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
labelManager.reinitializeQueueLabels(getQueueToLabels());
}
/**
* Parse the queue from the configuration.
* @param csContext the CapacitySchedulerContext
* @param conf the CapacitySchedulerConfiguration
* @param parent the parent queue
* @param queueName the queue name
* @param queues all the queues
* @param oldQueues the old queues
* @param hook the queue hook
* @return the CSQueue
* @throws IOException
*/
static CSQueue parseQueue(
CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration conf,
CSQueue parent, String queueName, Map<String, CSQueue> queues,
Map<String, CSQueue> oldQueues,
QueueHook hook) throws IOException {
CSQueue queue;
String fullQueueName =
(parent == null) ? queueName
: (parent.getQueuePath() + "." + queueName);
String[] childQueueNames = conf.getQueues(fullQueueName);
boolean isReservableQueue = conf.isReservable(fullQueueName);
if (childQueueNames == null || childQueueNames.length == 0) {
if (null == parent) {
throw new IllegalStateException(
"Queue configuration missing child queue names for " + queueName);
}
// Check if the queue will be dynamically managed by the Reservation
// system
if (isReservableQueue) {
queue =
new PlanQueue(csContext, queueName, parent,
oldQueues.get(queueName));
} else {
queue =
new LeafQueue(csContext, queueName, parent,
oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(queue);
}
} else {
if (isReservableQueue) {
throw new IllegalStateException(
"Only Leaf Queues can be reservable for " + queueName);
}
ParentQueue parentQueue =
new ParentQueue(csContext, queueName, parent,
oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(parentQueue);
List<CSQueue> childQueues = new ArrayList<>();
for (String childQueueName : childQueueNames) {
CSQueue childQueue =
parseQueue(csContext, conf, queue, childQueueName,
queues, oldQueues, hook);
childQueues.add(childQueue);
}
parentQueue.setChildQueues(childQueues);
}
if (queue instanceof LeafQueue && queues.containsKey(queueName)
&& queues.get(queueName) instanceof LeafQueue) {
throw new IOException("Two leaf queues were named " + queueName
+ ". Leaf queue names must be distinct");
}
queues.put(queueName, queue);
LOG.info("Initialized queue: " + queue);
return queue;
}
/**
* Ensure all existing queues are present. Queues cannot be deleted
* @param queues existing queues
* @param newQueues new queues
*/
private void validateExistingQueues(
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
throws IOException {
// check that all static queues are included in the newQueues list
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
if (!(e.getValue() instanceof ReservationQueue)) {
String queueName = e.getKey();
CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName);
if (null == newQueue) {
throw new IOException(queueName + " cannot be found during refresh!");
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
throw new IOException(queueName + " is moved from:"
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
+ " after refresh, which is not allowed.");
}
}
}
}
/**
* Add the new queues (only) to our list of queues...
* ... be careful, do not overwrite existing queues.
* @param queues the existing queues
* @param newQueues the new queues
*/
private void addNewQueues(
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) {
for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
String queueName = e.getKey();
CSQueue queue = e.getValue();
if (!queues.containsKey(queueName)) {
queues.put(queueName, queue);
}
}
}
@VisibleForTesting
/**
* Set the acls for the queues.
* @param authorizer the yarnAuthorizationProvider
* @param queues the queues
* @throws IOException if fails to set queue acls
*/
public static void setQueueAcls(YarnAuthorizationProvider authorizer,
Map<String, CSQueue> queues) throws IOException {
List<Permission> permissions = new ArrayList<>();
for (CSQueue queue : queues.values()) {
AbstractCSQueue csQueue = (AbstractCSQueue) queue;
permissions.add(
new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
}
authorizer.setPermission(permissions,
UserGroupInformation.getCurrentUser());
}
/**
* Check that the String provided in input is the name of an existing,
* LeafQueue, if successful returns the queue.
*
* @param queue the queue name
* @return the LeafQueue
* @throws YarnException if the queue does not exist or the queue
* is not the type of LeafQueue.
*/
public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
CSQueue ret = this.getQueue(queue);
if (ret == null) {
throw new YarnException("The specified Queue: " + queue
+ " doesn't exist");
}
if (!(ret instanceof LeafQueue)) {
throw new YarnException("The specified Queue: " + queue
+ " is not a Leaf Queue.");
}
return (LeafQueue) ret;
}
/**
* Get the default priority of the queue.
* @param queueName the queue name
* @return the default priority of the queue
*/
public Priority getDefaultPriorityForQueue(String queueName) {
Queue queue = getQueue(queueName);
if (null == queue || null == queue.getDefaultApplicationPriority()) {
// Return with default application priority
return Priority.newInstance(CapacitySchedulerConfiguration
.DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
}
return Priority.newInstance(queue.getDefaultApplicationPriority()
.getPriority());
}
/**
* Get a map of queueToLabels.
* @return the map of queueToLabels
*/
private Map<String, Set<String>> getQueueToLabels() {
Map<String, Set<String>> queueToLabels = new HashMap<>();
for (CSQueue queue : getQueues().values()) {
queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
}
return queueToLabels;
}
}

View File

@ -111,7 +111,8 @@ public void setUp() throws IOException {
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@ -123,9 +124,9 @@ public void setUp() throws IOException {
containerTokenSecretManager);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues,
CSQueue root = CapacitySchedulerQueueManager
.parseQueue(csContext, csConf, null, "root",
queues, queues,
TestUtils.spyHook);
@ -276,7 +277,8 @@ public void testLimitsComputation() throws Exception {
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 16));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@ -288,8 +290,8 @@ public void testLimitsComputation() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
"root", queues, queues, TestUtils.spyHook);
LeafQueue queue = (LeafQueue)queues.get(A);
@ -356,9 +358,9 @@ public void testLimitsComputation() throws Exception {
+ ".maximum-am-resource-percent", 0.5f);
// Re-create queues to get new configs.
queues = new HashMap<String, CSQueue>();
root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
root = CapacitySchedulerQueueManager.parseQueue(
csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
clusterResource = Resources.createResource(100 * 16 * GB);
queue = (LeafQueue)queues.get(A);
@ -378,9 +380,9 @@ public void testLimitsComputation() throws Exception {
9999);
// Re-create queues to get new configs.
queues = new HashMap<String, CSQueue>();
root =
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
root = CapacitySchedulerQueueManager.parseQueue(
csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
queue = (LeafQueue)queues.get(A);
assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
@ -580,7 +582,8 @@ public void testHeadroom() throws Exception {
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@ -589,8 +592,8 @@ public void testHeadroom() throws Exception {
when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
"root", queues, queues, TestUtils.spyHook);
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())

View File

@ -595,7 +595,8 @@ public void testHeadroom() throws Exception {
when(csContext.getMaximumResourceCapability())
.thenReturn(Resources.createResource(16 * GB));
when(csContext.getNonPartitionedQueueComparator())
.thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
.thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
RMContext rmContext = TestUtils.getMockRMContext();
RMContext spyRMContext = spy(rmContext);
@ -614,8 +615,8 @@ public void testHeadroom() throws Exception {
when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
"root", queues, queues, TestUtils.spyHook);
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())

View File

@ -95,11 +95,12 @@ public void setUp() throws Exception {
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
}
@ -222,7 +223,7 @@ public void testSortedQueues() throws Exception {
setupSortedQueues(csConf);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);

View File

@ -175,7 +175,8 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@ -188,7 +189,7 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
containerTokenSecretManager);
root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
queues, queues,
TestUtils.spyHook);
@ -2380,7 +2381,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
@ -2405,7 +2406,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception {
.NODE_LOCALITY_DELAY, 60);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);

View File

@ -97,10 +97,11 @@ public void setUp() throws Exception {
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
thenReturn(
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
}
@ -231,7 +232,7 @@ public void testSingleLevelQueues() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -346,7 +347,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
boolean exceptionOccured = false;
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@ -360,7 +361,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
exceptionOccured = false;
queues.clear();
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@ -374,7 +375,7 @@ public void testSingleLevelQueuesPrecision() throws Exception {
exceptionOccured = false;
queues.clear();
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@ -467,7 +468,7 @@ public void testMultiLevelQueues() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -623,8 +624,8 @@ public void testQueueCapacitySettingChildZero() throws Exception {
csConf.setCapacity(Q_B + "." + B3, 0);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@ -640,8 +641,8 @@ public void testQueueCapacitySettingParentZero() throws Exception {
csConf.setCapacity(Q_A, 60);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@ -662,8 +663,8 @@ public void testQueueCapacityZero() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
try {
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException e) {
fail("Failed to create queues with 0 capacity: " + e);
@ -678,7 +679,7 @@ public void testOffSwitchScheduling() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -754,8 +755,8 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
//B3
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
// Setup some nodes
@ -850,12 +851,12 @@ public void testQueueAcl() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
YarnAuthorizationProvider authorizer =
YarnAuthorizationProvider.getInstance(conf);
CapacityScheduler.setQueueAcls(authorizer, queues);
CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
// Setup queue configs

View File

@ -134,7 +134,7 @@ private void setup(CapacitySchedulerConfiguration csConf,
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 12));
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacityScheduler.nonPartitionedQueueComparator);
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
@ -144,7 +144,7 @@ private void setup(CapacitySchedulerConfiguration csConf,
when(csContext.getContainerTokenSecretManager()).thenReturn(
containerTokenSecretManager);
root = CapacityScheduler.parseQueue(csContext, csConf, null,
root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
ResourceUsage queueResUsage = root.getQueueResourceUsage();
@ -1180,8 +1180,8 @@ public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
csConf.setBoolean(
CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, newQueues, queues,
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());

View File

@ -141,7 +141,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
/**
* Hook to spy on queues.
*/
static class SpyHook extends CapacityScheduler.QueueHook {
static class SpyHook extends CapacitySchedulerQueueManager.QueueHook {
@Override
public CSQueue hook(CSQueue queue) {
return spy(queue);