diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0b5447b017..8e8d6278e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; @@ -220,6 +221,24 @@ public String moveApplication(ApplicationId appId, String newQueue) + " does not support moving apps between queues"); } + public void removeQueue(String queueName) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support removing queues"); + } + + @Override + public void addQueue(Queue newQueue) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support this operation"); + } + + @Override + public void setEntitlement(String queue, QueueEntitlement entitlement) + throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support this operation"); + } + private void killOrphanContainerOnNode(RMNode node, NMContainerStatus container) { if (!container.getContainerState().equals(ContainerState.COMPLETE)) { @@ -503,4 +522,10 @@ public synchronized void updateNodeResource(RMNode nm, public EnumSet getSchedulingResourceTypes() { return EnumSet.of(SchedulerResourceTypes.MEMORY); } + + @Override + public Set getPlanQueues() throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support reservations"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java new file mode 100644 index 0000000000..42dc36a01f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerDynamicEditException.java @@ -0,0 +1,13 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class SchedulerDynamicEditException extends YarnException { + + private static final long serialVersionUID = 7100374511387193257L; + + public SchedulerDynamicEditException(String string) { + super(string); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b6c1018c93..d1b5275a8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -223,6 +225,46 @@ public String moveApplication(ApplicationId appId, String newQueue) */ void killAllAppsInQueue(String queueName) throws YarnException; + /** + * Remove an existing queue. Implementations might limit when a queue could be + * removed (e.g., must have zero entitlement, and no applications running, or + * must be a leaf, etc..). + * + * @param queueName name of the queue to remove + * @throws YarnException + */ + void removeQueue(String queueName) throws YarnException; + + /** + * Add to the scheduler a new Queue. Implementations might limit what type of + * queues can be dynamically added (e.g., Queue must be a leaf, must be + * attached to existing parent, must have zero entitlement). + * + * @param newQueue the queue being added. + * @throws YarnException + */ + void addQueue(Queue newQueue) throws YarnException; + + /** + * This method increase the entitlement for current queue (must respect + * invariants, e.g., no overcommit of parents, non negative, etc.). + * Entitlement is a general term for weights in FairScheduler, capacity for + * the CapacityScheduler, etc. + * + * @param queue the queue for which we change entitlement + * @param entitlement the new entitlement for the queue (capacity, + * maxCapacity, etc..) + * @throws YarnException + */ + void setEntitlement(String queue, QueueEntitlement entitlement) + throws YarnException; + + /** + * Gets the list of names for queues managed by the Reservation System + * @return the list of queues which support reservations + */ + public Set getPlanQueues() throws YarnException; + /** * Return a collection of the resource types that are considered when * scheduling diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e0816b5bdb..ff6db3ab0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.HashSet; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -90,6 +92,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; + @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -473,9 +480,12 @@ private void reinitializeQueues(CapacitySchedulerConfiguration conf) private void validateExistingQueues( Map queues, Map newQueues) throws IOException { - for (String queue : queues.keySet()) { - if (!newQueues.containsKey(queue)) { - throw new IOException(queue + " cannot be found during refresh!"); + // check that all static queues are included in the newQueues list + for (Map.Entry e : queues.entrySet()) { + if (!(e.getValue() instanceof ReservationQueue)) { + if (!newQueues.containsKey(e.getKey())) { + throw new IOException(e.getKey() + " cannot be found during refresh!"); + } } } } @@ -507,26 +517,42 @@ static CSQueue parseQueue( Map oldQueues, QueueHook hook) throws IOException { CSQueue queue; + String fullQueueName = + (parent == null) ? queueName + : (parent.getQueuePath() + "." + queueName); String[] childQueueNames = - conf.getQueues((parent == null) ? - queueName : (parent.getQueuePath()+"."+queueName)); + conf.getQueues(fullQueueName); + boolean isReservableQueue = conf.isReservableQueue(fullQueueName); if (childQueueNames == null || childQueueNames.length == 0) { if (null == parent) { throw new IllegalStateException( "Queue configuration missing child queue names for " + queueName); } - queue = - new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName)); - - // Used only for unit tests - queue = hook.hook(queue); + // Check if the queue will be dynamically managed by the Reservation + // system + if (isReservableQueue) { + queue = + new PlanQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else { + queue = + new LeafQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + + // Used only for unit tests + queue = hook.hook(queue); + } } else { + if (isReservableQueue) { + throw new IllegalStateException( + "Only Leaf Queues can be reservable for " + queueName); + } ParentQueue parentQueue = new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(parentQueue); - + List childQueues = new ArrayList(); for (String childQueueName : childQueueNames) { CSQueue childQueue = @@ -548,7 +574,7 @@ static CSQueue parseQueue( return queue; } - synchronized CSQueue getQueue(String queueName) { + public synchronized CSQueue getQueue(String queueName) { if (queueName == null) { return null; } @@ -716,7 +742,7 @@ private synchronized void doneApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application Attempt " + applicationAttemptId + " is done." + - " finalState=" + rmAppAttemptFinalState); + " finalState=" + rmAppAttemptFinalState); FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); SchedulerApplication application = @@ -996,9 +1022,16 @@ public void handle(SchedulerEvent event) { case APP_ADDED: { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; - addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), - appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); + String queueName = + resolveReservationQueueName(appAddedEvent.getQueue(), + appAddedEvent.getApplicationId(), + appAddedEvent.getReservationID()); + if (queueName != null) { + addApplication(appAddedEvent.getApplicationId(), + queueName, + appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); + } } break; case APP_REMOVED: @@ -1231,6 +1264,123 @@ private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( } } + private synchronized String resolveReservationQueueName(String queueName, + ApplicationId applicationId, ReservationId reservationID) { + CSQueue queue = getQueue(queueName); + // Check if the queue is a plan queue + if ((queue == null) || !(queue instanceof PlanQueue)) { + return queueName; + } + if (reservationID != null) { + String resQName = reservationID.toString(); + queue = getQueue(resQName); + if (queue == null) { + String message = + "Application " + + applicationId + + " submitted to a reservation which is not yet currently active: " + + resQName; + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return null; + } + // use the reservation queue to run the app + queueName = resQName; + } else { + // use the default child queue of the plan for unreserved apps + queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + } + return queueName; + } + + @Override + public synchronized void removeQueue(String queueName) + throws SchedulerDynamicEditException { + LOG.info("Removing queue: " + queueName); + CSQueue q = this.getQueue(queueName); + if (!(q instanceof ReservationQueue)) { + throw new SchedulerDynamicEditException("The queue that we are asked " + + "to remove (" + queueName + ") is not a ReservationQueue"); + } + ReservationQueue disposableLeafQueue = (ReservationQueue) q; + // at this point we should have no more apps + if (disposableLeafQueue.getNumApplications() > 0) { + throw new SchedulerDynamicEditException("The queue " + queueName + + " is not empty " + disposableLeafQueue.getApplications().size() + + " active apps " + disposableLeafQueue.pendingApplications.size() + + " pending apps"); + } + + ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); + this.queues.remove(queueName); + LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); + } + + @Override + public synchronized void addQueue(Queue queue) + throws SchedulerDynamicEditException { + + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerDynamicEditException("Queue " + queue.getQueueName() + + " is not a ReservationQueue"); + } + + ReservationQueue newQueue = (ReservationQueue) queue; + + if (newQueue.getParent() == null + || !(newQueue.getParent() instanceof PlanQueue)) { + throw new SchedulerDynamicEditException("ParentQueue for " + + newQueue.getQueueName() + + " is not properly set (should be set and be a PlanQueue)"); + } + + PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); + String queuename = newQueue.getQueueName(); + parentPlan.addChildQueue(newQueue); + this.queues.put(queuename, newQueue); + LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); + } + + @Override + public synchronized void setEntitlement(String inQueue, + QueueEntitlement entitlement) throws SchedulerDynamicEditException, + YarnException { + LeafQueue queue = getAndCheckLeafQueue(inQueue); + ParentQueue parent = (ParentQueue) queue.getParent(); + + if (!(queue instanceof ReservationQueue)) { + throw new SchedulerDynamicEditException("Entitlement can not be" + + " modified dynamically since queue " + inQueue + + " is not a ReservationQueue"); + } + + if (!(parent instanceof PlanQueue)) { + throw new SchedulerDynamicEditException("The parent of ReservationQueue " + + inQueue + " must be an PlanQueue"); + } + + ReservationQueue newQueue = (ReservationQueue) queue; + + float sumChilds = ((PlanQueue) parent).sumOfChildCapacities(); + float newChildCap = sumChilds - queue.getCapacity() + entitlement.getCapacity(); + + if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) { + // note: epsilon checks here are not ok, as the epsilons might accumulate + // and become a problem in aggregate + if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0 + && Math.abs(entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) { + return; + } + newQueue.setEntitlement(entitlement); + } else { + throw new SchedulerDynamicEditException( + "Sum of child queues would exceed 100% for PlanQueue: " + + parent.getQueueName()); + } + LOG.info("Set entitlement for ReservationQueue " + inQueue + " to " + + queue.getCapacity() + " request was (" + entitlement.getCapacity() + ")"); + } + @Override public synchronized String moveApplication(ApplicationId appId, String targetQueueName) throws YarnException { @@ -1238,11 +1388,12 @@ public synchronized String moveApplication(ApplicationId appId, getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0)); String sourceQueueName = app.getQueue().getQueueName(); LeafQueue source = getAndCheckLeafQueue(sourceQueueName); - LeafQueue dest = getAndCheckLeafQueue(targetQueueName); + String destQueueName = handleMoveToPlanQueue(targetQueueName); + LeafQueue dest = getAndCheckLeafQueue(destQueueName); // Validation check - ACLs, submission limits for user & queue String user = app.getUser(); try { - dest.submitApplication(appId, user, targetQueueName); + dest.submitApplication(appId, user, destQueueName); } catch (AccessControlException e) { throw new YarnException(e); } @@ -1261,7 +1412,7 @@ public synchronized String moveApplication(ApplicationId appId, dest.submitApplicationAttempt(app, user); applications.get(appId).setQueue(dest); LOG.info("App: " + app.getApplicationId() + " successfully moved from " - + sourceQueueName + " to: " + targetQueueName); + + sourceQueueName + " to: " + destQueueName); return targetQueueName; } @@ -1296,4 +1447,24 @@ public EnumSet getSchedulingResourceTypes() { return EnumSet .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); } + + private String handleMoveToPlanQueue(String targetQueueName) { + CSQueue dest = getQueue(targetQueueName); + if (dest != null && dest instanceof PlanQueue) { + // use the default child reservation queue of the plan + targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; + } + return targetQueueName; + } + + @Override + public Set getPlanQueues() { + Set ret = new HashSet(); + for (Map.Entry l : queues.entrySet()) { + if (l.getValue() instanceof PlanQueue) { + ret.add(l.getKey()); + } + } + return ret; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index cdb6553c92..b9f5d5ff55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -86,8 +86,8 @@ public class LeafQueue implements CSQueue { private int userLimit; private float userLimitFactor; - private int maxApplications; - private int maxApplicationsPerUser; + protected int maxApplications; + protected int maxApplicationsPerUser; private float maxAMResourcePerQueuePercent; private int maxActiveApplications; // Based on absolute max capacity @@ -153,8 +153,7 @@ public LeafQueue(CapacitySchedulerContext cs, Resources.subtract(maximumAllocation, minimumAllocation), maximumAllocation); - float capacity = - (float)cs.getConfiguration().getCapacity(getQueuePath()) / 100; + float capacity = getCapacityFromConf(); float absoluteCapacity = parent.getAbsoluteCapacity() * capacity; float maximumCapacity = @@ -221,6 +220,11 @@ public LeafQueue(CapacitySchedulerContext cs, this.activeApplications = new TreeSet(applicationComparator); } + // externalizing in method, to allow overriding + protected float getCapacityFromConf() { + return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100; + } + private synchronized void setupQueueConfigs( Resource clusterResource, float capacity, float absoluteCapacity, @@ -483,7 +487,7 @@ synchronized void setUserLimit(int userLimit) { * Set user limit factor - used only for testing. * @param userLimitFactor new user limit factor */ - synchronized void setUserLimitFactor(int userLimitFactor) { + synchronized void setUserLimitFactor(float userLimitFactor) { this.userLimitFactor = userLimitFactor; } @@ -831,7 +835,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, getApplication(reservedContainer.getApplicationAttemptId()); synchronized (application) { return assignReservedContainer(application, node, reservedContainer, - clusterResource); + clusterResource); } } @@ -1880,4 +1884,16 @@ public void detachContainer(Resource clusterResource, getParent().detachContainer(clusterResource, application, rmContainer); } } + + public void setCapacity(float capacity) { + this.capacity = capacity; + } + + public void setAbsoluteCapacity(float absoluteCapacity) { + this.absoluteCapacity = absoluteCapacity; + } + + public void setMaxApplications(int maxApplications) { + this.maxApplications = maxApplications; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index aa74be10ac..011c99c50f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -75,7 +75,7 @@ public class ParentQueue implements CSQueue { private float usedCapacity = 0.0f; - private final Set childQueues; + protected final Set childQueues; private final Comparator queueComparator; private Resource usedResources = Resources.createResource(0, 0); @@ -159,7 +159,7 @@ public ParentQueue(CapacitySchedulerContext cs, ", fullname=" + getQueuePath()); } - private synchronized void setupQueueConfigs( + protected synchronized void setupQueueConfigs( Resource clusterResource, float capacity, float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, @@ -881,4 +881,8 @@ public void detachContainer(Resource clusterResource, } } } + + public Map getACLs() { + return acls; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java new file mode 100644 index 0000000000..4ada778ae2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -0,0 +1,193 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This represents a dynamic queue managed by the {@link ReservationSystem}. + * From the user perspective this is equivalent to a LeafQueue that respect + * reservations, but functionality wise is a sub-class of ParentQueue + * + */ +public class PlanQueue extends ParentQueue { + + public static final String DEFAULT_QUEUE_SUFFIX = "-default"; + + private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class); + + private int maxAppsForReservation; + private int maxAppsPerUserForReservation; + private int userLimit; + private float userLimitFactor; + protected CapacitySchedulerContext schedulerContext; + private boolean showReservationsAsQueues; + + public PlanQueue(CapacitySchedulerContext cs, String queueName, + CSQueue parent, CSQueue old) { + super(cs, queueName, parent, old); + + this.schedulerContext = cs; + // Set the reservation queue attributes for the Plan + CapacitySchedulerConfiguration conf = cs.getConfiguration(); + String queuePath = super.getQueuePath(); + int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath); + showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath); + if (maxAppsForReservation < 0) { + maxAppsForReservation = + (int) (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super + .getAbsoluteCapacity()); + } + int userLimit = conf.getUserLimit(queuePath); + float userLimitFactor = conf.getUserLimitFactor(queuePath); + int maxAppsPerUserForReservation = + (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); + updateQuotas(userLimit, userLimitFactor, maxAppsForReservation, + maxAppsPerUserForReservation); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Plan Queue: ").append(queueName) + .append("\nwith capacity: [").append(super.getCapacity()) + .append("]\nwith max capacity: [").append(super.getMaximumCapacity()) + .append("\nwith max reservation apps: [").append(maxAppsForReservation) + .append("]\nwith max reservation apps per user: [") + .append(maxAppsPerUserForReservation).append("]\nwith user limit: [") + .append(userLimit).append("]\nwith user limit factor: [") + .append(userLimitFactor).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public synchronized void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) + || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { + throw new IOException("Trying to reinitialize " + getQueuePath() + + " from " + newlyParsedQueue.getQueuePath()); + } + + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; + + if (newlyParsedParentQueue.getChildQueues().size() > 0) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration"); + } + + // Set new configs + setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(), + newlyParsedParentQueue.getAbsoluteCapacity(), + newlyParsedParentQueue.getMaximumCapacity(), + newlyParsedParentQueue.getAbsoluteMaximumCapacity(), + newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs()); + + updateQuotas(newlyParsedParentQueue.userLimit, + newlyParsedParentQueue.userLimitFactor, + newlyParsedParentQueue.maxAppsForReservation, + newlyParsedParentQueue.maxAppsPerUserForReservation); + + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue res : this.getChildQueues()) { + res.reinitialize(res, clusterResource); + } + showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues; + } + + synchronized void addChildQueue(CSQueue newQueue) + throws SchedulerDynamicEditException { + if (newQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException("Queue " + newQueue + + " being added has non zero capacity."); + } + boolean added = this.childQueues.add(newQueue); + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: add queue): " + added + " " + + getChildQueuesToPrint()); + } + } + + synchronized void removeChildQueue(CSQueue remQueue) + throws SchedulerDynamicEditException { + if (remQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException("Queue " + remQueue + + " being removed has non zero capacity."); + } + Iterator qiter = childQueues.iterator(); + while (qiter.hasNext()) { + CSQueue cs = qiter.next(); + if (cs.equals(remQueue)) { + qiter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed child queue: {}", cs.getQueueName()); + } + } + } + } + + protected synchronized float sumOfChildCapacities() { + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } + + private void updateQuotas(int userLimit, float userLimitFactor, + int maxAppsForReservation, int maxAppsPerUserForReservation) { + this.userLimit = userLimit; + this.userLimitFactor = userLimitFactor; + this.maxAppsForReservation = maxAppsForReservation; + this.maxAppsPerUserForReservation = maxAppsPerUserForReservation; + } + + /** + * Number of maximum applications for each of the reservations in this Plan. + * + * @return maxAppsForreservation + */ + public int getMaxApplicationsForReservations() { + return maxAppsForReservation; + } + + /** + * Number of maximum applications per user for each of the reservations in + * this Plan. + * + * @return maxAppsPerUserForreservation + */ + public int getMaxApplicationsPerUserForReservation() { + return maxAppsPerUserForReservation; + } + + /** + * User limit value for each of the reservations in this Plan. + * + * @return userLimit + */ + public int getUserLimitForReservation() { + return userLimit; + } + + /** + * User limit factor value for each of the reservations in this Plan. + * + * @return userLimitFactor + */ + public float getUserLimitFactor() { + return userLimitFactor; + } + + /** + * Determine whether to hide/show the ReservationQueues + */ + public boolean showReservationsAsQueues() { + return showReservationsAsQueues; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java new file mode 100644 index 0000000000..48733fce89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -0,0 +1,98 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This represents a dynamic {@link LeafQueue} managed by the + * {@link ReservationSystem} + * + */ +public class ReservationQueue extends LeafQueue { + + private static final Logger LOG = LoggerFactory + .getLogger(ReservationQueue.class); + + private PlanQueue parent; + + private int maxSystemApps; + + public ReservationQueue(CapacitySchedulerContext cs, String queueName, + PlanQueue parent) { + super(cs, queueName, parent, null); + maxSystemApps = cs.getConfiguration().getMaximumSystemApplications(); + // the following parameters are common to all reservation in the plan + updateQuotas(parent.getUserLimitForReservation(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForReservations(), + parent.getMaxApplicationsPerUserForReservation()); + this.parent = parent; + } + + @Override + public synchronized void reinitialize(CSQueue newlyParsedQueue, + Resource clusterResource) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof ReservationQueue) + || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { + throw new IOException("Trying to reinitialize " + getQueuePath() + + " from " + newlyParsedQueue.getQueuePath()); + } + CSQueueUtils.updateQueueStatistics( + parent.schedulerContext.getResourceCalculator(), newlyParsedQueue, + parent, parent.schedulerContext.getClusterResource(), + parent.schedulerContext.getMinimumResourceCapability()); + super.reinitialize(newlyParsedQueue, clusterResource); + updateQuotas(parent.getUserLimitForReservation(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForReservations(), + parent.getMaxApplicationsPerUserForReservation()); + } + + /** + * This methods to change capacity for a queue and adjusts its + * absoluteCapacity + * + * @param entitlement the new entitlement for the queue (capacity, + * maxCapacity, etc..) + * @throws SchedulerDynamicEditException + */ + public synchronized void setEntitlement(QueueEntitlement entitlement) + throws SchedulerDynamicEditException { + float capacity = entitlement.getCapacity(); + if (capacity < 0 || capacity > 1.0f) { + throw new SchedulerDynamicEditException( + "Capacity demand is not in the [0,1] range: " + capacity); + } + setCapacity(capacity); + setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); + setMaxApplications((int) (maxSystemApps * getAbsoluteCapacity())); + // note: we currently set maxCapacity to capacity + // this might be revised later + setMaxCapacity(entitlement.getMaxCapacity()); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully changed to " + capacity + " for queue " + + this.getQueueName()); + } + } + + private void updateQuotas(int userLimit, float userLimitFactor, + int maxAppsForReservation, int maxAppsPerUserForReservation) { + setUserLimit(userLimit); + setUserLimitFactor(userLimitFactor); + setMaxApplications(maxAppsForReservation); + maxApplicationsPerUser = maxAppsPerUserForReservation; + } + + // used by the super constructor, we initialize to zero + protected float getCapacityFromConf() { + return 0f; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java new file mode 100644 index 0000000000..a348e13fd9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +public class QueueEntitlement { + + private float capacity; + private float maxCapacity; + + public QueueEntitlement(float capacity, float maxCapacity){ + this.setCapacity(capacity); + this.maxCapacity = maxCapacity; + } + + public float getMaxCapacity() { + return maxCapacity; + } + + public void setMaxCapacity(float maxCapacity) { + this.maxCapacity = maxCapacity; + } + + public float getCapacity() { + return capacity; + } + + public void setCapacity(float capacity) { + this.capacity = capacity; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java index ac16ce0e63..0a8faadccb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerQueueInfo.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue; @XmlRootElement @XmlAccessorType(XmlAccessType.FIELD) @@ -48,6 +49,7 @@ public class CapacitySchedulerQueueInfo { protected QueueState state; protected CapacitySchedulerQueueInfoList queues; protected ResourceInfo resourcesUsed; + private boolean hideReservationQueues = true; CapacitySchedulerQueueInfo() { }; @@ -69,6 +71,10 @@ public class CapacitySchedulerQueueInfo { queueName = q.getQueueName(); state = q.getState(); resourcesUsed = new ResourceInfo(q.getUsedResources()); + if(q instanceof PlanQueue && + ((PlanQueue)q).showReservationsAsQueues()) { + hideReservationQueues = false; + } } public float getCapacity() { @@ -112,6 +118,9 @@ public String getQueuePath() { } public CapacitySchedulerQueueInfoList getQueues() { + if(hideReservationQueues) { + return new CapacitySchedulerQueueInfoList(); + } return this.queues; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e5f4df2959..f7c098c209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -410,7 +410,7 @@ null, new RMContainerTokenSecretManager(conf), cs.stop(); } - private void checkQueueCapacities(CapacityScheduler cs, + void checkQueueCapacities(CapacityScheduler cs, float capacityA, float capacityB) { CSQueue rootQueue = cs.getRootQueue(); CSQueue queueA = findQueue(rootQueue, A); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java new file mode 100644 index 0000000000..aecbfa8500 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestCapacitySchedulerDynamicBehavior { + private static final Log LOG = LogFactory + .getLog(TestCapacitySchedulerDynamicBehavior.class); + private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + private static final String B1 = B + ".b1"; + private static final String B2 = B + ".b2"; + private static final String B3 = B + ".b3"; + private static float A_CAPACITY = 10.5f; + private static float B_CAPACITY = 89.5f; + private static float A1_CAPACITY = 30; + private static float A2_CAPACITY = 70; + private static float B1_CAPACITY = 79.2f; + private static float B2_CAPACITY = 0.8f; + private static float B3_CAPACITY = 20; + + private final TestCapacityScheduler tcs = new TestCapacityScheduler(); + + private int GB = 1024; + + private MockRM rm; + + @Before + public void setUp() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupPlanQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_RESERVATIONS_ENABLE, false); + rm = new MockRM(conf); + rm.start(); + } + + @Test + public void testRefreshQueuesWithReservations() throws Exception { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Test add one reservation dynamically and manually modify capacity + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a1); + a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); + + // Test add another reservation queue and use setEntitlement to modify + // capacity + ReservationQueue a2 = + new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a2); + cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); + + // Verify all allocations match + tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + // Reinitialize and verify all dynamic queued survived + CapacitySchedulerConfiguration conf = cs.getConfiguration(); + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + cs.reinitialize(conf, rm.getRMContext()); + + tcs.checkQueueCapacities(cs, 80f, 20f); + } + + @Test + public void testAddQueueFailCases() throws Exception { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + try { + // Test invalid addition (adding non-zero size queue) + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); + cs.addQueue(a1); + fail(); + } catch (Exception e) { + // expected + } + + // Test add one reservation dynamically and manually modify capacity + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a1); + a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); + + // Test add another reservation queue and use setEntitlement to modify + // capacity + ReservationQueue a2 = + new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + + cs.addQueue(a2); + + try { + // Test invalid entitlement (sum of queues exceed 100%) + cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100 + 0.1f, + 1.0f)); + fail(); + } catch (Exception e) { + // expected + } + + cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); + + // Verify all allocations match + tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + cs.stop(); + } + + @Test + public void testRemoveQueue() throws Exception { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Test add one reservation dynamically and manually modify capacity + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + cs.addQueue(a1); + a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + // check preconditions + List appsInA1 = cs.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + try { + cs.removeQueue("a1"); + fail(); + } catch (SchedulerDynamicEditException s) { + // expected a1 contains applications + } + // clear queue by killling all apps + cs.killAllAppsInQueue("a1"); + // wait for events of move to propagate + rm.waitForState(app.getApplicationId(), RMAppState.KILLED); + + try { + cs.removeQueue("a1"); + fail(); + } catch (SchedulerDynamicEditException s) { + // expected a1 is not zero capacity + } + // set capacity to zero + cs.setEntitlement("a1", new QueueEntitlement(0f, 0f)); + cs.removeQueue("a1"); + + assertTrue(cs.getQueue("a1") == null); + + rm.stop(); + } + + @Test + public void testMoveAppToPlanQueue() throws Exception { + CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "b1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertEquals(1, appsInB.size()); + assertTrue(appsInB.contains(appAttemptId)); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + String queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // create the default reservation queue + String defQName = "a" + PlanQueue.DEFAULT_QUEUE_SUFFIX; + ReservationQueue defQ = + new ReservationQueue(scheduler, defQName, + (PlanQueue) scheduler.getQueue("a")); + scheduler.addQueue(defQ); + defQ.setEntitlement(new QueueEntitlement(1f, 1f)); + + List appsInDefQ = scheduler.getAppsInQueue(defQName); + assertTrue(appsInDefQ.isEmpty()); + + // now move the app to plan queue + scheduler.moveApplication(app.getApplicationId(), "a"); + + // check postconditions + appsInDefQ = scheduler.getAppsInQueue(defQName); + assertEquals(1, appsInDefQ.size()); + queue = + scheduler.getApplicationAttempt(appsInDefQ.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals(defQName)); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + private void setupPlanQueueConfiguration(CapacitySchedulerConfiguration conf) { + + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(B, new String[] { "b1", "b2", "b3" }); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + conf.setReservableQueue(A, true); + conf.setReservationWindow(A, 86400 * 1000); + conf.setAverageCapacity(A, 1.0f); + + LOG.info("Setup a as a plan queue"); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java new file mode 100644 index 0000000000..c53b7a978d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservationQueue.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; + +public class TestReservationQueue { + + CapacitySchedulerConfiguration csConf; + CapacitySchedulerContext csContext; + final static int GB = 1024; + private final ResourceCalculator resourceCalculator = + new DefaultResourceCalculator(); + ReservationQueue reservationQueue; + + @Before + public void setup() { + + // setup a context / conf + csConf = new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(); + csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()).thenReturn( + Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()).thenReturn( + Resources.createResource(16 * GB, 32)); + when(csContext.getClusterResource()).thenReturn( + Resources.createResource(100 * 16 * GB, 100 * 32)); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + + // create a queue + PlanQueue pq = new PlanQueue(csContext, "root", null, null); + reservationQueue = new ReservationQueue(csContext, "a", pq); + + } + + @Test + public void testAddSubtractCapacity() throws Exception { + + // verify that setting, adding, subtracting capacity works + reservationQueue.setCapacity(1.0F); + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + reservationQueue.setEntitlement(new QueueEntitlement(0.9f, 1f)); + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 0.9 < CSQueueUtils.EPSILON); + reservationQueue.setEntitlement(new QueueEntitlement(1f, 1f)); + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + reservationQueue.setEntitlement(new QueueEntitlement(0f, 1f)); + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() < CSQueueUtils.EPSILON); + + try { + reservationQueue.setEntitlement(new QueueEntitlement(1.1f, 1f)); + fail(); + } catch (SchedulerDynamicEditException iae) { + // expected + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + } + + try { + reservationQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f)); + fail(); + } catch (SchedulerDynamicEditException iae) { + // expected + assertTrue(" actual capacity: " + reservationQueue.getCapacity(), + reservationQueue.getCapacity() - 1 < CSQueueUtils.EPSILON); + } + + } +}