YARN-10918. Simplify method: CapacitySchedulerQueueManager#parseQueue. Contributed by Andras Gyori

This commit is contained in:
Szilard Nemeth 2022-03-09 19:36:09 +01:00
parent 2ece95064b
commit db8ae4b654
4 changed files with 73 additions and 73 deletions

View File

@ -95,6 +95,13 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
*/ */
QueuePath getQueuePathObject(); QueuePath getQueuePathObject();
/**
* Checks whether the queue is a dynamic queue (created dynamically in the fashion of auto queue
* creation v2).
* @return true, if it is a dynamic queue, false otherwise
*/
boolean isDynamicQueue();
public PrivilegedEntity getPrivilegedEntity(); public PrivilegedEntity getPrivilegedEntity();
Resource getMaximumAllocation(); Resource getMaximumAllocation();

View File

@ -43,13 +43,11 @@
import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
@ -231,99 +229,62 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
* @throws IOException * @throws IOException
*/ */
static CSQueue parseQueue( static CSQueue parseQueue(
CapacitySchedulerQueueContext queueContext, CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
CapacitySchedulerConfiguration conf, CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
CSQueue parent, String queueName,
CSQueueStore newQueues,
CSQueueStore oldQueues,
QueueHook hook) throws IOException { QueueHook hook) throws IOException {
CSQueue queue; CSQueue queue;
String fullQueueName = (parent == null) ? String fullQueueName = (parent == null) ? queueName :
queueName : (QueuePath.createFromQueues(parent.getQueuePath(), queueName).getFullPath());
(parent.getQueuePath() + "." + queueName);
String[] staticChildQueueNames = conf.getQueues(fullQueueName); String[] staticChildQueueNames = conf.getQueues(fullQueueName);
List<String> childQueueNames = staticChildQueueNames != null ? List<String> childQueueNames = staticChildQueueNames != null ?
Arrays.asList(staticChildQueueNames) : Collections.emptyList(); Arrays.asList(staticChildQueueNames) : Collections.emptyList();
CSQueue oldQueue = oldQueues.get(fullQueueName);
boolean isReservableQueue = conf.isReservable(fullQueueName); boolean isReservableQueue = conf.isReservable(fullQueueName);
boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled( boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(fullQueueName);
fullQueueName); // if a queue is eligible for auto queue creation v2 it must be a ParentQueue
// if a queue is eligible for auto queue creation v2 // (even if it is empty)
// it must be a ParentQueue (even if it is empty) final boolean isDynamicParent = oldQueue instanceof ParentQueue && oldQueue.isDynamicQueue();
boolean isAutoQueueCreationV2Enabled = conf.isAutoQueueCreationV2Enabled( boolean isAutoQueueCreationEnabledParent = isDynamicParent || conf.isAutoQueueCreationV2Enabled(
fullQueueName); fullQueueName) || isAutoCreateEnabled;
boolean isDynamicParent = false;
// Auto created parent queues might not have static children, but they if (childQueueNames.size() == 0 && !isAutoQueueCreationEnabledParent) {
// must be kept as a ParentQueue validateParent(parent, queueName);
CSQueue oldQueue = oldQueues.get(fullQueueName); // Check if the queue will be dynamically managed by the Reservation system
if (oldQueue instanceof ParentQueue) {
isDynamicParent = ((ParentQueue) oldQueue).isDynamicQueue();
}
if (childQueueNames.size() == 0 && !isDynamicParent &&
!isAutoQueueCreationV2Enabled) {
if (null == parent) {
throw new IllegalStateException(
"Queue configuration missing child queue names for " + queueName);
}
// Check if the queue will be dynamically managed by the Reservation
// system
if (isReservableQueue) { if (isReservableQueue) {
queue = new PlanQueue(queueContext, queueName, parent, queue = new PlanQueue(queueContext, queueName, parent, oldQueues.get(fullQueueName));
oldQueues.get(fullQueueName)); ReservationQueue defaultResQueue = ((PlanQueue) queue).initializeDefaultInternalQueue();
newQueues.add(defaultResQueue);
//initializing the "internal" default queue, for SLS compatibility } else {
String defReservationId = queue = new LeafQueue(queueContext, queueName, parent, oldQueues.get(fullQueueName));
queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
List<CSQueue> childQueues = new ArrayList<>();
ReservationQueue resQueue = new ReservationQueue(queueContext,
defReservationId, (PlanQueue) queue);
try {
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
} catch (SchedulerDynamicEditException e) {
throw new IllegalStateException(e);
}
childQueues.add(resQueue);
((PlanQueue) queue).setChildQueues(childQueues);
newQueues.add(resQueue);
} else if (isAutoCreateEnabled) {
queue = new ManagedParentQueue(queueContext, queueName, parent,
oldQueues.get(fullQueueName));
} else{
queue = new LeafQueue(queueContext, queueName, parent,
oldQueues.get(fullQueueName));
// Used only for unit tests
queue = hook.hook(queue);
} }
} else{
queue = hook.hook(queue);
} else {
if (isReservableQueue) { if (isReservableQueue) {
throw new IllegalStateException( throw new IllegalStateException("Only Leaf Queues can be reservable for " + fullQueueName);
"Only Leaf Queues can be reservable for " + fullQueueName);
} }
ParentQueue parentQueue; ParentQueue parentQueue;
if (isAutoCreateEnabled) { if (isAutoCreateEnabled) {
parentQueue = new ManagedParentQueue(queueContext, queueName, parent, parentQueue = new ManagedParentQueue(queueContext, queueName, parent, oldQueues.get(
oldQueues.get(fullQueueName)); fullQueueName));
} else{ } else {
parentQueue = new ParentQueue(queueContext, queueName, parent, parentQueue = new ParentQueue(queueContext, queueName, parent, oldQueues.get(
oldQueues.get(fullQueueName)); fullQueueName));
} }
// Used only for unit tests
queue = hook.hook(parentQueue); queue = hook.hook(parentQueue);
List<CSQueue> childQueues = new ArrayList<>(); List<CSQueue> childQueues = new ArrayList<>();
for (String childQueueName : childQueueNames) { for (String childQueueName : childQueueNames) {
CSQueue childQueue = parseQueue(queueContext, conf, queue, childQueueName, CSQueue childQueue = parseQueue(queueContext, conf, queue, childQueueName, newQueues,
newQueues, oldQueues, hook); oldQueues, hook);
childQueues.add(childQueue); childQueues.add(childQueue);
} }
parentQueue.setChildQueues(childQueues);
if (!childQueues.isEmpty()) {
parentQueue.setChildQueues(childQueues);
}
} }
@ -721,4 +682,11 @@ private boolean isDanglingDynamicQueue(
// that existingQueues contain valid dynamic queues. // that existingQueues contain valid dynamic queues.
return !isDynamicQueue(parent); return !isDynamicQueue(parent);
} }
private static void validateParent(CSQueue parent, String queueName) {
if (parent == null) {
throw new IllegalStateException("Queue configuration missing child queue names for "
+ queueName);
}
}
} }

View File

@ -21,8 +21,10 @@
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -123,6 +125,23 @@ public void reinitialize(CSQueue newlyParsedQueue,
} }
} }
public ReservationQueue initializeDefaultInternalQueue() throws IOException {
//initializing the "internal" default queue, for SLS compatibility
String defReservationId =
getQueueName() + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
ReservationQueue resQueue = new ReservationQueue(queueContext,
defReservationId, this);
try {
resQueue.initializeEntitlements();
} catch (SchedulerDynamicEditException e) {
throw new IllegalStateException(e);
}
childQueues.add(resQueue);
return resQueue;
}
private void updateQuotas(float newUserLimit, float newUserLimitFactor, private void updateQuotas(float newUserLimit, float newUserLimitFactor,
int newMaxAppsForReservation, int newMaxAppsPerUserForReservation) { int newMaxAppsForReservation, int newMaxAppsPerUserForReservation) {
this.userLimit = newUserLimit; this.userLimit = newUserLimit;

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -74,6 +76,10 @@ public void reinitialize(CSQueue newlyParsedQueue,
} }
} }
public void initializeEntitlements() throws SchedulerDynamicEditException {
setEntitlement(new QueueEntitlement(1.0f, 1.0f));
}
private void updateQuotas(float userLimit, float userLimitFactor, private void updateQuotas(float userLimit, float userLimitFactor,
int maxAppsForReservation, int maxAppsPerUserForReservation) { int maxAppsForReservation, int maxAppsPerUserForReservation) {
setUserLimit(userLimit); setUserLimit(userLimit);