From edb9cd0f7aa1ecaf34afaa120e3d79583e0ec689 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 3 Jun 2015 13:47:24 -0700 Subject: [PATCH] YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha) --- hadoop-yarn-project/CHANGES.txt | 2 + .../scheduler/fair/FSParentQueue.java | 219 +++++++++++++----- .../scheduler/fair/QueueManager.java | 3 +- 3 files changed, 164 insertions(+), 60 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 64f6abbd07..794863777e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -482,6 +482,8 @@ Release 2.8.0 - UNRELEASED YARN-3751. Fixed AppInfo to check if used resources are null. (Sunil G via zjshen) + YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index f74106a7da..7d2e5b8f95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -23,6 +23,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,36 +47,64 @@ public class FSParentQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSParentQueue.class.getName()); - private final List childQueues = - new ArrayList(); + private final List childQueues = new ArrayList<>(); private Resource demand = Resources.createResource(0); private int runnableApps; - + + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private Lock readLock = rwLock.readLock(); + private Lock writeLock = rwLock.writeLock(); + public FSParentQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); } public void addChildQueue(FSQueue child) { - childQueues.add(child); + writeLock.lock(); + try { + childQueues.add(child); + } finally { + writeLock.unlock(); + } + } + + public void removeChildQueue(FSQueue child) { + writeLock.lock(); + try { + childQueues.remove(child); + } finally { + writeLock.unlock(); + } } @Override public void recomputeShares() { - policy.computeShares(childQueues, getFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics().setFairShare(childQueue.getFairShare()); - childQueue.recomputeShares(); + readLock.lock(); + try { + policy.computeShares(childQueues, getFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics().setFairShare(childQueue.getFairShare()); + childQueue.recomputeShares(); + } + } finally { + readLock.unlock(); } } public void recomputeSteadyShares() { - policy.computeSteadyShares(childQueues, getSteadyFairShare()); - for (FSQueue childQueue : childQueues) { - childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare()); - if (childQueue instanceof FSParentQueue) { - ((FSParentQueue) childQueue).recomputeSteadyShares(); + readLock.lock(); + try { + policy.computeSteadyShares(childQueues, getSteadyFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics() + .setSteadyFairShare(childQueue.getSteadyFairShare()); + if (childQueue instanceof FSParentQueue) { + ((FSParentQueue) childQueue).recomputeSteadyShares(); + } } + } finally { + readLock.unlock(); } } @@ -81,21 +112,37 @@ public void recomputeSteadyShares() { public void updatePreemptionVariables() { super.updatePreemptionVariables(); // For child queues - for (FSQueue childQueue : childQueues) { - childQueue.updatePreemptionVariables(); + + readLock.lock(); + try { + for (FSQueue childQueue : childQueues) { + childQueue.updatePreemptionVariables(); + } + } finally { + readLock.unlock(); } } @Override public Resource getDemand() { - return demand; + readLock.lock(); + try { + return Resource.newInstance(demand.getMemory(), demand.getVirtualCores()); + } finally { + readLock.unlock(); + } } @Override public Resource getResourceUsage() { Resource usage = Resources.createResource(0); - for (FSQueue child : childQueues) { - Resources.addTo(usage, child.getResourceUsage()); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + Resources.addTo(usage, child.getResourceUsage()); + } + } finally { + readLock.unlock(); } return usage; } @@ -106,20 +153,25 @@ public void updateDemand() { // Limit demand to maxResources Resource maxRes = scheduler.getAllocationConfiguration() .getMaxResources(getName()); - demand = Resources.createResource(0); - for (FSQueue childQueue : childQueues) { - childQueue.updateDemand(); - Resource toAdd = childQueue.getDemand(); - if (LOG.isDebugEnabled()) { - LOG.debug("Counting resource from " + childQueue.getName() + " " + - toAdd + "; Total resource consumption for " + getName() + - " now " + demand); - } - demand = Resources.add(demand, toAdd); - demand = Resources.componentwiseMin(demand, maxRes); - if (Resources.equals(demand, maxRes)) { - break; + writeLock.lock(); + try { + demand = Resources.createResource(0); + for (FSQueue childQueue : childQueues) { + childQueue.updateDemand(); + Resource toAdd = childQueue.getDemand(); + if (LOG.isDebugEnabled()) { + LOG.debug("Counting resource from " + childQueue.getName() + " " + + toAdd + "; Total resource consumption for " + getName() + + " now " + demand); + } + demand = Resources.add(demand, toAdd); + demand = Resources.componentwiseMin(demand, maxRes); + if (Resources.equals(demand, maxRes)) { + break; + } } + } finally { + writeLock.unlock(); } if (LOG.isDebugEnabled()) { LOG.debug("The updated demand for " + getName() + " is " + demand + @@ -127,33 +179,31 @@ public void updateDemand() { } } - private synchronized QueueUserACLInfo getUserAclInfo( - UserGroupInformation user) { - QueueUserACLInfo userAclInfo = - recordFactory.newRecordInstance(QueueUserACLInfo.class); - List operations = new ArrayList(); + private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) { + List operations = new ArrayList<>(); for (QueueACL operation : QueueACL.values()) { if (hasAccess(operation, user)) { operations.add(operation); } } - - userAclInfo.setQueueName(getQueueName()); - userAclInfo.setUserAcls(operations); - return userAclInfo; + return QueueUserACLInfo.newInstance(getQueueName(), operations); } @Override - public synchronized List getQueueUserAclInfo( - UserGroupInformation user) { + public List getQueueUserAclInfo(UserGroupInformation user) { List userAcls = new ArrayList(); // Add queue acls userAcls.add(getUserAclInfo(user)); // Add children queue acls - for (FSQueue child : childQueues) { - userAcls.addAll(child.getQueueUserAclInfo(user)); + readLock.lock(); + try { + for (FSQueue child : childQueues) { + userAcls.addAll(child.getQueueUserAclInfo(user)); + } + } finally { + readLock.unlock(); } return userAcls; @@ -168,12 +218,32 @@ public Resource assignContainer(FSSchedulerNode node) { return assigned; } - Collections.sort(childQueues, policy.getComparator()); - for (FSQueue child : childQueues) { - assigned = child.assignContainer(node); - if (!Resources.equals(assigned, Resources.none())) { - break; + // Hold the write lock when sorting childQueues + writeLock.lock(); + try { + Collections.sort(childQueues, policy.getComparator()); + } finally { + writeLock.unlock(); + } + + /* + * We are releasing the lock between the sort and iteration of the + * "sorted" list. There could be changes to the list here: + * 1. Add a child queue to the end of the list, this doesn't affect + * container assignment. + * 2. Remove a child queue, this is probably good to take care of so we + * don't assign to a queue that is going to be removed shortly. + */ + readLock.lock(); + try { + for (FSQueue child : childQueues) { + assigned = child.assignContainer(node); + if (!Resources.equals(assigned, Resources.none())) { + break; + } } + } finally { + readLock.unlock(); } return assigned; } @@ -185,11 +255,17 @@ public RMContainer preemptContainer() { // Find the childQueue which is most over fair share FSQueue candidateQueue = null; Comparator comparator = policy.getComparator(); - for (FSQueue queue : childQueues) { - if (candidateQueue == null || - comparator.compare(queue, candidateQueue) > 0) { - candidateQueue = queue; + + readLock.lock(); + try { + for (FSQueue queue : childQueues) { + if (candidateQueue == null || + comparator.compare(queue, candidateQueue) > 0) { + candidateQueue = queue; + } } + } finally { + readLock.unlock(); } // Let the selected queue choose which of its container to preempt @@ -201,7 +277,12 @@ public RMContainer preemptContainer() { @Override public List getChildQueues() { - return childQueues; + readLock.lock(); + try { + return Collections.unmodifiableList(childQueues); + } finally { + readLock.unlock(); + } } @Override @@ -218,23 +299,43 @@ public void setPolicy(SchedulingPolicy policy) } public void incrementRunnableApps() { - runnableApps++; + writeLock.lock(); + try { + runnableApps++; + } finally { + writeLock.unlock(); + } } public void decrementRunnableApps() { - runnableApps--; + writeLock.lock(); + try { + runnableApps--; + } finally { + writeLock.unlock(); + } } @Override public int getNumRunnableApps() { - return runnableApps; + readLock.lock(); + try { + return runnableApps; + } finally { + readLock.unlock(); + } } @Override public void collectSchedulerApplications( Collection apps) { - for (FSQueue childQueue : childQueues) { - childQueue.collectSchedulerApplications(apps); + readLock.lock(); + try { + for (FSQueue childQueue : childQueues) { + childQueue.collectSchedulerApplications(apps); + } + } finally { + readLock.unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 64442abf2c..6556717ba4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -304,7 +304,8 @@ private void removeQueue(FSQueue queue) { } } queues.remove(queue.getName()); - queue.getParent().getChildQueues().remove(queue); + FSParentQueue parent = queue.getParent(); + parent.removeChildQueue(queue); } /**