YARN-7394. Merge code paths for Reservation/Plan queues and Auto Created queues. (Suma Shivaprasad via wangda)

This commit is contained in:
Wangda Tan 2017-11-06 21:38:24 -08:00
parent 8f214dc4f8
commit 13fa2d4e3e
10 changed files with 401 additions and 278 deletions

View File

@ -28,10 +28,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ReservationQueue;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -92,8 +92,8 @@ protected void addReservationQueue(
String planQueueName, Queue queue, String currResId) { String planQueueName, Queue queue, String currResId) {
PlanQueue planQueue = (PlanQueue)queue; PlanQueue planQueue = (PlanQueue)queue;
try { try {
ReservationQueue resQueue = AutoCreatedLeafQueue resQueue =
new ReservationQueue(cs, currResId, planQueue); new AutoCreatedLeafQueue(cs, currResId, planQueue);
cs.addQueue(resQueue); cs.addQueue(resQueue);
} catch (SchedulerDynamicEditException e) { } catch (SchedulerDynamicEditException e) {
LOG.warn( LOG.warn(
@ -112,8 +112,8 @@ protected void createDefaultReservationQueue(
PlanQueue planQueue = (PlanQueue)queue; PlanQueue planQueue = (PlanQueue)queue;
if (cs.getQueue(defReservationId) == null) { if (cs.getQueue(defReservationId) == null) {
try { try {
ReservationQueue defQueue = AutoCreatedLeafQueue defQueue =
new ReservationQueue(cs, defReservationId, planQueue); new AutoCreatedLeafQueue(cs, defReservationId, planQueue);
cs.addQueue(defQueue); cs.addQueue(defQueue);
} catch (SchedulerDynamicEditException e) { } catch (SchedulerDynamicEditException e) {
LOG.warn( LOG.warn(

View File

@ -0,0 +1,232 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
/**
* A container class for automatically created child leaf queues.
* From the user perspective this is equivalent to a LeafQueue,
* but functionality wise is a sub-class of ParentQueue
*/
public abstract class AbstractManagedParentQueue extends ParentQueue {
private static final Logger LOG = LoggerFactory.getLogger(
AbstractManagedParentQueue.class);
private int maxAppsForAutoCreatedQueues;
private int maxAppsPerUserForAutoCreatedQueues;
private int userLimit;
private float userLimitFactor;
public AbstractManagedParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
super.setupQueueConfigs(csContext.getClusterResource());
initializeLeafQueueConfigs();
StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Managed Parent Queue: ").append(queueName)
.append("\nof type : [" + getClass())
.append("]\nwith capacity: [")
.append(super.getCapacity()).append("]\nwith max capacity: [")
.append(super.getMaximumCapacity()).append("\nwith max apps: [")
.append(getMaxApplicationsForAutoCreatedQueues())
.append("]\nwith max apps per user: [")
.append(getMaxApplicationsPerUserForAutoCreatedQueues())
.append("]\nwith user limit: [").append(getUserLimit())
.append("]\nwith user limit factor: [")
.append(getUserLimitFactor()).append("].");
LOG.info(queueInfo.toString());
}
@Override
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
try {
writeLock.lock();
// Set new configs
setupQueueConfigs(clusterResource);
initializeLeafQueueConfigs();
// run reinitialize on each existing queue, to trigger absolute cap
// recomputations
for (CSQueue res : this.getChildQueues()) {
res.reinitialize(res, clusterResource);
}
} finally {
writeLock.unlock();
}
}
/**
* Initialize leaf queue configs from template configurations specified on
* parent queue.
*/
protected void initializeLeafQueueConfigs() {
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
final String queuePath = super.getQueuePath();
int maxApps = conf.getMaximumApplicationsPerQueue(queuePath);
if (maxApps < 0) {
maxApps = (int) (
CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
* getAbsoluteCapacity());
}
userLimit = conf.getUserLimit(queuePath);
userLimitFactor = conf.getUserLimitFactor(queuePath);
maxAppsForAutoCreatedQueues = maxApps;
maxAppsPerUserForAutoCreatedQueues =
(int) (maxApps * (userLimit / 100.0f) * userLimitFactor);
}
/**
* Number of maximum applications for each of the auto created leaf queues.
*
* @return maxAppsForAutoCreatedQueues
*/
public int getMaxApplicationsForAutoCreatedQueues() {
return maxAppsForAutoCreatedQueues;
}
/**
* Number of maximum applications per user for each of the auto created
* leaf queues.
*
* @return maxAppsPerUserForAutoCreatedQueues
*/
public int getMaxApplicationsPerUserForAutoCreatedQueues() {
return maxAppsPerUserForAutoCreatedQueues;
}
/**
* User limit value for each of the auto created leaf queues.
*
* @return userLimit
*/
public int getUserLimitForAutoCreatedQueues() {
return userLimit;
}
/**
* User limit factor value for each of the auto created leaf queues.
*
* @return userLimitFactor
*/
public float getUserLimitFactor() {
return userLimitFactor;
}
public int getMaxAppsForAutoCreatedQueues() {
return maxAppsForAutoCreatedQueues;
}
public int getMaxAppsPerUserForAutoCreatedQueues() {
return maxAppsPerUserForAutoCreatedQueues;
}
public int getUserLimit() {
return userLimit;
}
/**
* Add the specified child queue.
* @param childQueue reference to the child queue to be added
* @throws SchedulerDynamicEditException
*/
public void addChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
if (childQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException(
"Queue " + childQueue + " being added has non zero capacity.");
}
boolean added = this.childQueues.add(childQueue);
if (LOG.isDebugEnabled()) {
LOG.debug("updateChildQueues (action: add queue): " + added + " "
+ getChildQueuesToPrint());
}
} finally {
writeLock.unlock();
}
}
/**
* Remove the specified child queue.
* @param childQueue reference to the child queue to be removed
* @throws SchedulerDynamicEditException
*/
public void removeChildQueue(CSQueue childQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
if (childQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException(
"Queue " + childQueue + " being removed has non zero capacity.");
}
Iterator<CSQueue> qiter = childQueues.iterator();
while (qiter.hasNext()) {
CSQueue cs = qiter.next();
if (cs.equals(childQueue)) {
qiter.remove();
if (LOG.isDebugEnabled()) {
LOG.debug("Removed child queue: {}" + cs.getQueueName());
}
}
}
} finally {
writeLock.unlock();
}
}
/**
* Remove the specified child queue.
* @param childQueueName name of the child queue to be removed
* @throws SchedulerDynamicEditException
*/
public CSQueue removeChildQueue(String childQueueName)
throws SchedulerDynamicEditException {
CSQueue childQueue;
try {
writeLock.lock();
childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue(
childQueueName);
if (childQueue != null) {
removeChildQueue(childQueue);
} else {
throw new SchedulerDynamicEditException("Cannot find queue to delete "
+ ": " + childQueueName);
}
} finally {
writeLock.unlock();
}
return childQueue;
}
}

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,35 +18,35 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
/** /**
* This represents a dynamic {@link LeafQueue} managed by the * Leaf queues which are auto created by an underkying implementation of
* {@link ReservationSystem} * AbstractManagedParentQueue. Eg: PlanQueue for reservations or
* * ManagedParentQueue for auto created dynamic queues
*/ */
public class ReservationQueue extends LeafQueue { public class AutoCreatedLeafQueue extends LeafQueue {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(ReservationQueue.class); .getLogger(AutoCreatedLeafQueue.class);
private PlanQueue parent; private AbstractManagedParentQueue parent;
public ReservationQueue(CapacitySchedulerContext cs, String queueName, public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName,
PlanQueue parent) throws IOException { AbstractManagedParentQueue parent) throws IOException {
super(cs, queueName, parent, null); super(cs, queueName, parent, null);
// the following parameters are common to all reservation in the plan
updateQuotas(parent.getUserLimitForReservation(), updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
parent.getUserLimitFactor(), parent.getUserLimitFactor(),
parent.getMaxApplicationsForReservations(), parent.getMaxApplicationsForAutoCreatedQueues(),
parent.getMaxApplicationsPerUserForReservation()); parent.getMaxApplicationsPerUserForAutoCreatedQueues());
this.parent = parent; this.parent = parent;
} }
@ -55,21 +55,18 @@ public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException { Resource clusterResource) throws IOException {
try { try {
writeLock.lock(); writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue validate(newlyParsedQueue);
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
super.reinitialize(newlyParsedQueue, clusterResource); super.reinitialize(newlyParsedQueue, clusterResource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null); this, labelManager, null);
updateQuotas(parent.getUserLimitForReservation(), updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
parent.getUserLimitFactor(), parent.getUserLimitFactor(),
parent.getMaxApplicationsForReservations(), parent.getMaxApplicationsForAutoCreatedQueues(),
parent.getMaxApplicationsPerUserForReservation()); parent.getMaxApplicationsPerUserForAutoCreatedQueues());
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -77,10 +74,10 @@ public void reinitialize(CSQueue newlyParsedQueue,
/** /**
* This methods to change capacity for a queue and adjusts its * This methods to change capacity for a queue and adjusts its
* absoluteCapacity * absoluteCapacity.
* *
* @param entitlement the new entitlement for the queue (capacity, * @param entitlement the new entitlement for the queue (capacity,
* maxCapacity, etc..) * maxCapacity)
* @throws SchedulerDynamicEditException * @throws SchedulerDynamicEditException
*/ */
public void setEntitlement(QueueEntitlement entitlement) public void setEntitlement(QueueEntitlement entitlement)
@ -94,8 +91,6 @@ public void setEntitlement(QueueEntitlement entitlement)
} }
setCapacity(capacity); setCapacity(capacity);
setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
// note: we currently set maxCapacity to capacity
// this might be revised later
setMaxCapacity(entitlement.getMaxCapacity()); setMaxCapacity(entitlement.getMaxCapacity());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("successfully changed to " + capacity + " for queue " + this LOG.debug("successfully changed to " + capacity + " for queue " + this
@ -106,12 +101,14 @@ public void setEntitlement(QueueEntitlement entitlement)
} }
} }
private void updateQuotas(int userLimit, float userLimitFactor, private void validate(final CSQueue newlyParsedQueue) throws IOException {
int maxAppsForReservation, int maxAppsPerUserForReservation) { if (!(newlyParsedQueue instanceof AutoCreatedLeafQueue) || !newlyParsedQueue
setUserLimit(userLimit); .getQueuePath().equals(getQueuePath())) {
setUserLimitFactor(userLimitFactor); throw new IOException(
setMaxApplications(maxAppsForReservation); "Error trying to reinitialize " + getQueuePath() + " from "
maxApplicationsPerUser = maxAppsPerUserForReservation; + newlyParsedQueue.getQueuePath());
}
} }
@Override @Override
@ -119,4 +116,14 @@ protected void setupConfigurableCapacities() {
CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(), CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(),
queueCapacities, parent == null ? null : parent.getQueueCapacities()); queueCapacities, parent == null ? null : parent.getQueueCapacities());
} }
private void updateApplicationAndUserLimits(int userLimit,
float userLimitFactor,
int maxAppsForAutoCreatedQueues,
int maxAppsPerUserForAutoCreatedQueues) {
setUserLimit(userLimit);
setUserLimitFactor(userLimitFactor);
setMaxApplications(maxAppsForAutoCreatedQueues);
setMaxApplicationsPerUser(maxAppsPerUserForAutoCreatedQueues);
}
} }

View File

@ -1921,12 +1921,12 @@ public void removeQueue(String queueName)
writeLock.lock(); writeLock.lock();
LOG.info("Removing queue: " + queueName); LOG.info("Removing queue: " + queueName);
CSQueue q = this.getQueue(queueName); CSQueue q = this.getQueue(queueName);
if (!(q instanceof ReservationQueue)) { if (!(q instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
"The queue that we are asked " + "to remove (" + queueName "The queue that we are asked " + "to remove (" + queueName
+ ") is not a ReservationQueue"); + ") is not a AutoCreatedLeafQueue");
} }
ReservationQueue disposableLeafQueue = (ReservationQueue) q; AutoCreatedLeafQueue disposableLeafQueue = (AutoCreatedLeafQueue) q;
// at this point we should have no more apps // at this point we should have no more apps
if (disposableLeafQueue.getNumApplications() > 0) { if (disposableLeafQueue.getNumApplications() > 0) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
@ -1936,9 +1936,11 @@ public void removeQueue(String queueName)
+ " pending apps"); + " pending apps");
} }
((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); ((AbstractManagedParentQueue) disposableLeafQueue.getParent())
.removeChildQueue(q);
this.queueManager.removeQueue(queueName); this.queueManager.removeQueue(queueName);
LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); LOG.info("Removal of AutoCreatedLeafQueue "
+ queueName + " has succeeded");
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -1949,25 +1951,28 @@ public void addQueue(Queue queue)
throws SchedulerDynamicEditException { throws SchedulerDynamicEditException {
try { try {
writeLock.lock(); writeLock.lock();
if (!(queue instanceof ReservationQueue)) { if (!(queue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
"Queue " + queue.getQueueName() + " is not a ReservationQueue"); "Queue " + queue.getQueueName() + " is not a AutoCreatedLeafQueue");
} }
ReservationQueue newQueue = (ReservationQueue) queue; AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue;
if (newQueue.getParent() == null || !(newQueue if (newQueue.getParent() == null
.getParent() instanceof PlanQueue)) { || !(AbstractManagedParentQueue.class.
isAssignableFrom(newQueue.getParent().getClass()))) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
"ParentQueue for " + newQueue.getQueueName() "ParentQueue for " + newQueue.getQueueName()
+ " is not properly set (should be set and be a PlanQueue)"); + " is not properly set"
+ " (should be set and be a PlanQueue or ManagedParentQueue)");
} }
PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); AbstractManagedParentQueue parentPlan =
(AbstractManagedParentQueue) newQueue.getParent();
String queuename = newQueue.getQueueName(); String queuename = newQueue.getQueueName();
parentPlan.addChildQueue(newQueue); parentPlan.addChildQueue(newQueue);
this.queueManager.addQueue(queuename, newQueue); this.queueManager.addQueue(queuename, newQueue);
LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -1981,21 +1986,22 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement)
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
ParentQueue parent = (ParentQueue) queue.getParent(); ParentQueue parent = (ParentQueue) queue.getParent();
if (!(queue instanceof ReservationQueue)) { if (!(queue instanceof AutoCreatedLeafQueue)) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
"Entitlement can not be" + " modified dynamically since queue " "Entitlement can not be" + " modified dynamically since queue "
+ inQueue + " is not a ReservationQueue"); + inQueue + " is not a AutoCreatedLeafQueue");
} }
if (!(parent instanceof PlanQueue)) { if (parent == null
|| !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) {
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
"The parent of ReservationQueue " + inQueue "The parent of AutoCreatedLeafQueue " + inQueue
+ " must be an PlanQueue"); + " must be a PlanQueue/ManagedParentQueue");
} }
ReservationQueue newQueue = (ReservationQueue) queue; AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue;
float sumChilds = ((PlanQueue) parent).sumOfChildCapacities(); float sumChilds = parent.sumOfChildCapacities();
float newChildCap = float newChildCap =
sumChilds - queue.getCapacity() + entitlement.getCapacity(); sumChilds - queue.getCapacity() + entitlement.getCapacity();
@ -2010,12 +2016,13 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement)
newQueue.setEntitlement(entitlement); newQueue.setEntitlement(entitlement);
} else{ } else{
throw new SchedulerDynamicEditException( throw new SchedulerDynamicEditException(
"Sum of child queues would exceed 100% for PlanQueue: " + parent "Sum of child queues should exceed 100% for auto creating parent "
.getQueueName()); + "queue : " + parent.getQueueName());
} }
LOG.info( LOG.info(
"Set entitlement for ReservationQueue " + inQueue + " to " + queue "Set entitlement for AutoCreatedLeafQueue " + inQueue
.getCapacity() + " request was (" + entitlement.getCapacity() + " to " + queue.getCapacity() +
" request was (" + entitlement.getCapacity()
+ ")"); + ")");
} finally { } finally {
writeLock.unlock(); writeLock.unlock();

View File

@ -238,7 +238,7 @@ static CSQueue parseQueue(
queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
List<CSQueue> childQueues = new ArrayList<>(); List<CSQueue> childQueues = new ArrayList<>();
ReservationQueue resQueue = new ReservationQueue(csContext, AutoCreatedLeafQueue resQueue = new AutoCreatedLeafQueue(csContext,
defReservationId, (PlanQueue) queue); defReservationId, (PlanQueue) queue);
try { try {
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f)); resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
@ -303,7 +303,7 @@ private void validateQueueHierarchy(Map<String, CSQueue> queues,
Map<String, CSQueue> newQueues) throws IOException { Map<String, CSQueue> newQueues) throws IOException {
// check that all static queues are included in the newQueues list // check that all static queues are included in the newQueues list
for (Map.Entry<String, CSQueue> e : queues.entrySet()) { for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
if (!(e.getValue() instanceof ReservationQueue)) { if (!(e.getValue() instanceof AutoCreatedLeafQueue)) {
String queueName = e.getKey(); String queueName = e.getKey();
CSQueue oldQueue = e.getValue(); CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName); CSQueue newQueue = newQueues.get(queueName);

View File

@ -1997,6 +1997,10 @@ public void setAbsoluteCapacity(float absoluteCapacity) {
queueCapacities.setAbsoluteCapacity(absoluteCapacity); queueCapacities.setAbsoluteCapacity(absoluteCapacity);
} }
public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
this.maxApplicationsPerUser = maxApplicationsPerUser;
}
public void setMaxApplications(int maxApplications) { public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications; this.maxApplications = maxApplications;
} }

View File

@ -1080,4 +1080,17 @@ public void stopQueue() {
public QueueOrderingPolicy getQueueOrderingPolicy() { public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy; return queueOrderingPolicy;
} }
protected float sumOfChildCapacities() {
try {
writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getCapacity();
}
return ret;
} finally {
writeLock.unlock();
}
}
} }

View File

@ -19,11 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -33,191 +31,50 @@
* reservations, but functionality wise is a sub-class of ParentQueue * reservations, but functionality wise is a sub-class of ParentQueue
* *
*/ */
public class PlanQueue extends ParentQueue { public class PlanQueue extends AbstractManagedParentQueue {
private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class); private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
private int maxAppsForReservation;
private int maxAppsPerUserForReservation;
private int userLimit;
private float userLimitFactor;
protected CapacitySchedulerContext schedulerContext;
private boolean showReservationsAsQueues; private boolean showReservationsAsQueues;
public PlanQueue(CapacitySchedulerContext cs, String queueName, public PlanQueue(CapacitySchedulerContext cs, String queueName,
CSQueue parent, CSQueue old) throws IOException { CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old); super(cs, queueName, parent, old);
this.schedulerContext = cs;
// Set the reservation queue attributes for the Plan
CapacitySchedulerConfiguration conf = cs.getConfiguration();
String queuePath = super.getQueuePath();
int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
if (maxAppsForReservation < 0) {
maxAppsForReservation =
(int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
.getAbsoluteCapacity());
}
int userLimit = conf.getUserLimit(queuePath);
float userLimitFactor = conf.getUserLimitFactor(queuePath);
int maxAppsPerUserForReservation =
(int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
maxAppsPerUserForReservation);
StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Plan Queue: ").append(queueName)
.append("\nwith capacity: [").append(super.getCapacity())
.append("]\nwith max capacity: [").append(super.getMaximumCapacity())
.append("\nwith max reservation apps: [").append(maxAppsForReservation)
.append("]\nwith max reservation apps per user: [")
.append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
.append(userLimit).append("]\nwith user limit factor: [")
.append(userLimitFactor).append("].");
LOG.info(queueInfo.toString());
} }
@Override @Override
public void reinitialize(CSQueue newlyParsedQueue, public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
Resource clusterResource) throws IOException { throws IOException {
try { validate(newlyParsedQueue);
writeLock.lock(); super.reinitialize(newlyParsedQueue, clusterResource);
// Sanity check }
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; @Override
protected void initializeLeafQueueConfigs() {
String queuePath = super.getQueuePath();
showReservationsAsQueues = csContext.getConfiguration()
.getShowReservationAsQueues(queuePath);
super.initializeLeafQueueConfigs();
}
if (newlyParsedParentQueue.getChildQueues().size() != 1) { private void validate(final CSQueue newlyParsedQueue) throws IOException {
throw new IOException( // Sanity check
"Reservable Queue should not have sub-queues in the" if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
+ "configuration expect the default reservation queue"); .getQueuePath().equals(getQueuePath())) {
} throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
// Set new configs + newlyParsedQueue.getQueuePath());
setupQueueConfigs(clusterResource);
updateQuotas(newlyParsedParentQueue.userLimit,
newlyParsedParentQueue.userLimitFactor,
newlyParsedParentQueue.maxAppsForReservation,
newlyParsedParentQueue.maxAppsPerUserForReservation);
// run reinitialize on each existing queue, to trigger absolute cap
// recomputations
for (CSQueue res : this.getChildQueues()) {
res.reinitialize(res, clusterResource);
}
showReservationsAsQueues =
newlyParsedParentQueue.showReservationsAsQueues;
} finally {
writeLock.unlock();
} }
}
void addChildQueue(CSQueue newQueue) PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
throws SchedulerDynamicEditException {
try { if (newlyParsedParentQueue.getChildQueues().size() != 1) {
writeLock.lock(); throw new IOException(
if (newQueue.getCapacity() > 0) { "Reservable Queue should not have sub-queues in the"
throw new SchedulerDynamicEditException( + "configuration expect the default reservation queue");
"Queue " + newQueue + " being added has non zero capacity.");
}
boolean added = this.childQueues.add(newQueue);
if (LOG.isDebugEnabled()) {
LOG.debug("updateChildQueues (action: add queue): " + added + " "
+ getChildQueuesToPrint());
}
} finally {
writeLock.unlock();
} }
} }
void removeChildQueue(CSQueue remQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
if (remQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException(
"Queue " + remQueue + " being removed has non zero capacity.");
}
Iterator<CSQueue> qiter = childQueues.iterator();
while (qiter.hasNext()) {
CSQueue cs = qiter.next();
if (cs.equals(remQueue)) {
qiter.remove();
if (LOG.isDebugEnabled()) {
LOG.debug("Removed child queue: {}", cs.getQueueName());
}
}
}
} finally {
writeLock.unlock();
}
}
protected float sumOfChildCapacities() {
try {
writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getCapacity();
}
return ret;
} finally {
writeLock.unlock();
}
}
private void updateQuotas(int userLimit, float userLimitFactor,
int maxAppsForReservation, int maxAppsPerUserForReservation) {
this.userLimit = userLimit;
this.userLimitFactor = userLimitFactor;
this.maxAppsForReservation = maxAppsForReservation;
this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
}
/**
* Number of maximum applications for each of the reservations in this Plan.
*
* @return maxAppsForreservation
*/
public int getMaxApplicationsForReservations() {
return maxAppsForReservation;
}
/**
* Number of maximum applications per user for each of the reservations in
* this Plan.
*
* @return maxAppsPerUserForreservation
*/
public int getMaxApplicationsPerUserForReservation() {
return maxAppsPerUserForReservation;
}
/**
* User limit value for each of the reservations in this Plan.
*
* @return userLimit
*/
public int getUserLimitForReservation() {
return userLimit;
}
/**
* User limit factor value for each of the reservations in this Plan.
*
* @return userLimitFactor
*/
public float getUserLimitFactor() {
return userLimitFactor;
}
/** /**
* Determine whether to hide/show the ReservationQueues * Determine whether to hide/show the ReservationQueues
*/ */

View File

@ -36,15 +36,19 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestReservationQueue { /**
* Test class for dynamic auto created leaf queues.
* @see AutoCreatedLeafQueue
*/
public class TestAutoCreatedLeafQueue {
CapacitySchedulerConfiguration csConf; private CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext; private CapacitySchedulerContext csContext;
final static int DEF_MAX_APPS = 10000; final static int DEF_MAX_APPS = 10000;
final static int GB = 1024; final static int GB = 1024;
private final ResourceCalculator resourceCalculator = private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator(); new DefaultResourceCalculator();
ReservationQueue reservationQueue; private AutoCreatedLeafQueue autoCreatedLeafQueue;
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
@ -61,49 +65,48 @@ public void setup() throws IOException {
when(csContext.getClusterResource()).thenReturn( when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 32)); Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
RMContext mockRMContext = TestUtils.getMockRMContext(); RMContext mockRMContext = TestUtils.getMockRMContext();
when(csContext.getRMContext()).thenReturn(mockRMContext); when(csContext.getRMContext()).thenReturn(mockRMContext);
// create a queue // create a queue
PlanQueue pq = new PlanQueue(csContext, "root", null, null); PlanQueue pq = new PlanQueue(csContext, "root", null, null);
reservationQueue = new ReservationQueue(csContext, "a", pq); autoCreatedLeafQueue = new AutoCreatedLeafQueue(csContext, "a", pq);
} }
private void validateReservationQueue(double capacity) { private void validateAutoCreatedLeafQueue(double capacity) {
assertTrue(" actual capacity: " + reservationQueue.getCapacity(), assertTrue(" actual capacity: " + autoCreatedLeafQueue.getCapacity(),
reservationQueue.getCapacity() - capacity < CSQueueUtils.EPSILON); autoCreatedLeafQueue.getCapacity() - capacity < CSQueueUtils.EPSILON);
assertEquals(reservationQueue.maxApplications, DEF_MAX_APPS); assertEquals(autoCreatedLeafQueue.maxApplications, DEF_MAX_APPS);
assertEquals(reservationQueue.maxApplicationsPerUser, DEF_MAX_APPS); assertEquals(autoCreatedLeafQueue.maxApplicationsPerUser, DEF_MAX_APPS);
} }
@Test @Test
public void testAddSubtractCapacity() throws Exception { public void testAddSubtractCapacity() throws Exception {
// verify that setting, adding, subtracting capacity works // verify that setting, adding, subtracting capacity works
reservationQueue.setCapacity(1.0F); autoCreatedLeafQueue.setCapacity(1.0F);
validateReservationQueue(1); validateAutoCreatedLeafQueue(1);
reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
validateReservationQueue(0.9); validateAutoCreatedLeafQueue(0.9);
reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f)); autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1f, 1f));
validateReservationQueue(1); validateAutoCreatedLeafQueue(1);
reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f)); autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0f, 1f));
validateReservationQueue(0); validateAutoCreatedLeafQueue(0);
try { try {
reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f)); autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1.1f, 1f));
fail(); fail();
} catch (SchedulerDynamicEditException iae) { } catch (SchedulerDynamicEditException iae) {
// expected // expected
validateReservationQueue(1); validateAutoCreatedLeafQueue(1);
} }
try { try {
reservationQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f)); autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f));
fail(); fail();
} catch (SchedulerDynamicEditException iae) { } catch (SchedulerDynamicEditException iae) {
// expected // expected
validateReservationQueue(1); validateAutoCreatedLeafQueue(1);
} }
} }

View File

@ -77,21 +77,21 @@ public void testRefreshQueuesWithReservations() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
//set default queue capacity to zero //set default queue capacity to zero
((ReservationQueue) cs ((AutoCreatedLeafQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement( .setEntitlement(
new QueueEntitlement(0f, 1f)); new QueueEntitlement(0f, 1f));
// Test add one reservation dynamically and manually modify capacity // Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 = AutoCreatedLeafQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1); cs.addQueue(a1);
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
// Test add another reservation queue and use setEntitlement to modify // Test add another reservation queue and use setEntitlement to modify
// capacity // capacity
ReservationQueue a2 = AutoCreatedLeafQueue a2 =
new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a2); cs.addQueue(a2);
cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
@ -113,8 +113,8 @@ public void testAddQueueFailCases() throws Exception {
try { try {
// Test invalid addition (adding non-zero size queue) // Test invalid addition (adding non-zero size queue)
ReservationQueue a1 = AutoCreatedLeafQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
cs.addQueue(a1); cs.addQueue(a1);
fail(); fail();
@ -123,11 +123,11 @@ public void testAddQueueFailCases() throws Exception {
} }
// Test add one reservation dynamically and manually modify capacity // Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 = AutoCreatedLeafQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1); cs.addQueue(a1);
//set default queue capacity to zero //set default queue capacity to zero
((ReservationQueue) cs ((AutoCreatedLeafQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement( .setEntitlement(
new QueueEntitlement(0f, 1f)); new QueueEntitlement(0f, 1f));
@ -135,8 +135,8 @@ public void testAddQueueFailCases() throws Exception {
// Test add another reservation queue and use setEntitlement to modify // Test add another reservation queue and use setEntitlement to modify
// capacity // capacity
ReservationQueue a2 = AutoCreatedLeafQueue a2 =
new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a2); cs.addQueue(a2);
@ -162,8 +162,8 @@ public void testRemoveQueue() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Test add one reservation dynamically and manually modify capacity // Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 = AutoCreatedLeafQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1); cs.addQueue(a1);
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
@ -230,8 +230,8 @@ public void testMoveAppToPlanQueue() throws Exception {
// create the default reservation queue // create the default reservation queue
String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX; String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
ReservationQueue defQ = AutoCreatedLeafQueue defQ =
new ReservationQueue(scheduler, defQName, new AutoCreatedLeafQueue(scheduler, defQName,
(PlanQueue) scheduler.getQueue("a")); (PlanQueue) scheduler.getQueue("a"));
scheduler.addQueue(defQ); scheduler.addQueue(defQ);
defQ.setEntitlement(new QueueEntitlement(1f, 1f)); defQ.setEntitlement(new QueueEntitlement(1f, 1f));