YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
This commit is contained in:
parent
107da29ff9
commit
edb9cd0f7a
@ -482,6 +482,8 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3751. Fixed AppInfo to check if used resources are null. (Sunil G via
|
||||
zjshen)
|
||||
|
||||
YARN-3762. FairScheduler: CME on FSParentQueue#getQueueUserAclInfo. (kasha)
|
||||
|
||||
Release 2.7.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -23,6 +23,9 @@
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -44,36 +47,64 @@ public class FSParentQueue extends FSQueue {
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
FSParentQueue.class.getName());
|
||||
|
||||
private final List<FSQueue> childQueues =
|
||||
new ArrayList<FSQueue>();
|
||||
private final List<FSQueue> childQueues = new ArrayList<>();
|
||||
private Resource demand = Resources.createResource(0);
|
||||
private int runnableApps;
|
||||
|
||||
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
|
||||
private Lock readLock = rwLock.readLock();
|
||||
private Lock writeLock = rwLock.writeLock();
|
||||
|
||||
public FSParentQueue(String name, FairScheduler scheduler,
|
||||
FSParentQueue parent) {
|
||||
super(name, scheduler, parent);
|
||||
}
|
||||
|
||||
public void addChildQueue(FSQueue child) {
|
||||
childQueues.add(child);
|
||||
writeLock.lock();
|
||||
try {
|
||||
childQueues.add(child);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void removeChildQueue(FSQueue child) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
childQueues.remove(child);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recomputeShares() {
|
||||
policy.computeShares(childQueues, getFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
|
||||
childQueue.recomputeShares();
|
||||
readLock.lock();
|
||||
try {
|
||||
policy.computeShares(childQueues, getFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
|
||||
childQueue.recomputeShares();
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void recomputeSteadyShares() {
|
||||
policy.computeSteadyShares(childQueues, getSteadyFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare());
|
||||
if (childQueue instanceof FSParentQueue) {
|
||||
((FSParentQueue) childQueue).recomputeSteadyShares();
|
||||
readLock.lock();
|
||||
try {
|
||||
policy.computeSteadyShares(childQueues, getSteadyFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics()
|
||||
.setSteadyFairShare(childQueue.getSteadyFairShare());
|
||||
if (childQueue instanceof FSParentQueue) {
|
||||
((FSParentQueue) childQueue).recomputeSteadyShares();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,21 +112,37 @@ public void recomputeSteadyShares() {
|
||||
public void updatePreemptionVariables() {
|
||||
super.updatePreemptionVariables();
|
||||
// For child queues
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updatePreemptionVariables();
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updatePreemptionVariables();
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getDemand() {
|
||||
return demand;
|
||||
readLock.lock();
|
||||
try {
|
||||
return Resource.newInstance(demand.getMemory(), demand.getVirtualCores());
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResourceUsage() {
|
||||
Resource usage = Resources.createResource(0);
|
||||
for (FSQueue child : childQueues) {
|
||||
Resources.addTo(usage, child.getResourceUsage());
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue child : childQueues) {
|
||||
Resources.addTo(usage, child.getResourceUsage());
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return usage;
|
||||
}
|
||||
@ -106,20 +153,25 @@ public void updateDemand() {
|
||||
// Limit demand to maxResources
|
||||
Resource maxRes = scheduler.getAllocationConfiguration()
|
||||
.getMaxResources(getName());
|
||||
demand = Resources.createResource(0);
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updateDemand();
|
||||
Resource toAdd = childQueue.getDemand();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Counting resource from " + childQueue.getName() + " " +
|
||||
toAdd + "; Total resource consumption for " + getName() +
|
||||
" now " + demand);
|
||||
}
|
||||
demand = Resources.add(demand, toAdd);
|
||||
demand = Resources.componentwiseMin(demand, maxRes);
|
||||
if (Resources.equals(demand, maxRes)) {
|
||||
break;
|
||||
writeLock.lock();
|
||||
try {
|
||||
demand = Resources.createResource(0);
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.updateDemand();
|
||||
Resource toAdd = childQueue.getDemand();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Counting resource from " + childQueue.getName() + " " +
|
||||
toAdd + "; Total resource consumption for " + getName() +
|
||||
" now " + demand);
|
||||
}
|
||||
demand = Resources.add(demand, toAdd);
|
||||
demand = Resources.componentwiseMin(demand, maxRes);
|
||||
if (Resources.equals(demand, maxRes)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The updated demand for " + getName() + " is " + demand +
|
||||
@ -127,33 +179,31 @@ public void updateDemand() {
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized QueueUserACLInfo getUserAclInfo(
|
||||
UserGroupInformation user) {
|
||||
QueueUserACLInfo userAclInfo =
|
||||
recordFactory.newRecordInstance(QueueUserACLInfo.class);
|
||||
List<QueueACL> operations = new ArrayList<QueueACL>();
|
||||
private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
|
||||
List<QueueACL> operations = new ArrayList<>();
|
||||
for (QueueACL operation : QueueACL.values()) {
|
||||
if (hasAccess(operation, user)) {
|
||||
operations.add(operation);
|
||||
}
|
||||
}
|
||||
|
||||
userAclInfo.setQueueName(getQueueName());
|
||||
userAclInfo.setUserAcls(operations);
|
||||
return userAclInfo;
|
||||
return QueueUserACLInfo.newInstance(getQueueName(), operations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
|
||||
UserGroupInformation user) {
|
||||
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
|
||||
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
|
||||
|
||||
// Add queue acls
|
||||
userAcls.add(getUserAclInfo(user));
|
||||
|
||||
// Add children queue acls
|
||||
for (FSQueue child : childQueues) {
|
||||
userAcls.addAll(child.getQueueUserAclInfo(user));
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue child : childQueues) {
|
||||
userAcls.addAll(child.getQueueUserAclInfo(user));
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
return userAcls;
|
||||
@ -168,12 +218,32 @@ public Resource assignContainer(FSSchedulerNode node) {
|
||||
return assigned;
|
||||
}
|
||||
|
||||
Collections.sort(childQueues, policy.getComparator());
|
||||
for (FSQueue child : childQueues) {
|
||||
assigned = child.assignContainer(node);
|
||||
if (!Resources.equals(assigned, Resources.none())) {
|
||||
break;
|
||||
// Hold the write lock when sorting childQueues
|
||||
writeLock.lock();
|
||||
try {
|
||||
Collections.sort(childQueues, policy.getComparator());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
||||
/*
|
||||
* We are releasing the lock between the sort and iteration of the
|
||||
* "sorted" list. There could be changes to the list here:
|
||||
* 1. Add a child queue to the end of the list, this doesn't affect
|
||||
* container assignment.
|
||||
* 2. Remove a child queue, this is probably good to take care of so we
|
||||
* don't assign to a queue that is going to be removed shortly.
|
||||
*/
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue child : childQueues) {
|
||||
assigned = child.assignContainer(node);
|
||||
if (!Resources.equals(assigned, Resources.none())) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return assigned;
|
||||
}
|
||||
@ -185,11 +255,17 @@ public RMContainer preemptContainer() {
|
||||
// Find the childQueue which is most over fair share
|
||||
FSQueue candidateQueue = null;
|
||||
Comparator<Schedulable> comparator = policy.getComparator();
|
||||
for (FSQueue queue : childQueues) {
|
||||
if (candidateQueue == null ||
|
||||
comparator.compare(queue, candidateQueue) > 0) {
|
||||
candidateQueue = queue;
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue queue : childQueues) {
|
||||
if (candidateQueue == null ||
|
||||
comparator.compare(queue, candidateQueue) > 0) {
|
||||
candidateQueue = queue;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
// Let the selected queue choose which of its container to preempt
|
||||
@ -201,7 +277,12 @@ public RMContainer preemptContainer() {
|
||||
|
||||
@Override
|
||||
public List<FSQueue> getChildQueues() {
|
||||
return childQueues;
|
||||
readLock.lock();
|
||||
try {
|
||||
return Collections.unmodifiableList(childQueues);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -218,23 +299,43 @@ public void setPolicy(SchedulingPolicy policy)
|
||||
}
|
||||
|
||||
public void incrementRunnableApps() {
|
||||
runnableApps++;
|
||||
writeLock.lock();
|
||||
try {
|
||||
runnableApps++;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void decrementRunnableApps() {
|
||||
runnableApps--;
|
||||
writeLock.lock();
|
||||
try {
|
||||
runnableApps--;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumRunnableApps() {
|
||||
return runnableApps;
|
||||
readLock.lock();
|
||||
try {
|
||||
return runnableApps;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectSchedulerApplications(
|
||||
Collection<ApplicationAttemptId> apps) {
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.collectSchedulerApplications(apps);
|
||||
readLock.lock();
|
||||
try {
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.collectSchedulerApplications(apps);
|
||||
}
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -304,7 +304,8 @@ private void removeQueue(FSQueue queue) {
|
||||
}
|
||||
}
|
||||
queues.remove(queue.getName());
|
||||
queue.getParent().getChildQueues().remove(queue);
|
||||
FSParentQueue parent = queue.getParent();
|
||||
parent.removeChildQueue(queue);
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user