YARN-6246. Identifying starved apps does not need the scheduler writelock
(Contributed by Karthik Kambatla via Daniel Templeton)
This commit is contained in:
parent
4369690ce6
commit
d5b71e4175
@ -198,13 +198,10 @@ public void collectSchedulerApplications(
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateInternal(boolean checkStarvation) {
|
void updateInternal() {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
policy.computeShares(runnableApps, getFairShare());
|
policy.computeShares(runnableApps, getFairShare());
|
||||||
if (checkStarvation) {
|
|
||||||
updateStarvedApps();
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
@ -283,8 +280,10 @@ private void updateStarvedAppsMinshare(
|
|||||||
* If this queue is starving due to fairshare, there must be at least
|
* If this queue is starving due to fairshare, there must be at least
|
||||||
* one application that is starved. And, even if the queue is not
|
* one application that is starved. And, even if the queue is not
|
||||||
* starved due to fairshare, there might still be starved applications.
|
* starved due to fairshare, there might still be starved applications.
|
||||||
|
*
|
||||||
|
* Caller does not need read/write lock on the leaf queue.
|
||||||
*/
|
*/
|
||||||
private void updateStarvedApps() {
|
void updateStarvedApps() {
|
||||||
// Fetch apps with pending demand
|
// Fetch apps with pending demand
|
||||||
TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
|
TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
|
||||||
|
|
||||||
|
@ -79,13 +79,13 @@ void removeChildQueue(FSQueue child) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateInternal(boolean checkStarvation) {
|
void updateInternal() {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
policy.computeShares(childQueues, getFairShare());
|
policy.computeShares(childQueues, getFairShare());
|
||||||
for (FSQueue childQueue : childQueues) {
|
for (FSQueue childQueue : childQueues) {
|
||||||
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
|
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
|
||||||
childQueue.updateInternal(checkStarvation);
|
childQueue.updateInternal();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
|
@ -326,16 +326,23 @@ public boolean isPreemptable() {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Recomputes the shares for all child queues and applications based on this
|
* Recomputes the shares for all child queues and applications based on this
|
||||||
* queue's current share, and checks for starvation.
|
* queue's current share.
|
||||||
*
|
*
|
||||||
* @param checkStarvation whether to check for fairshare or minshare
|
* To be called holding the scheduler writelock.
|
||||||
* starvation on update
|
|
||||||
*/
|
*/
|
||||||
abstract void updateInternal(boolean checkStarvation);
|
abstract void updateInternal();
|
||||||
|
|
||||||
public void update(Resource fairShare, boolean checkStarvation) {
|
/**
|
||||||
|
* Set the queue's fairshare and update the demand/fairshare of child
|
||||||
|
* queues/applications.
|
||||||
|
*
|
||||||
|
* To be called holding the scheduler writelock.
|
||||||
|
*
|
||||||
|
* @param fairShare
|
||||||
|
*/
|
||||||
|
public void update(Resource fairShare) {
|
||||||
setFairShare(fairShare);
|
setFairShare(fairShare);
|
||||||
updateInternal(checkStarvation);
|
updateInternal();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -366,20 +366,31 @@ private void dumpSchedulerState() {
|
|||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void update() {
|
public void update() {
|
||||||
|
FSQueue rootQueue = queueMgr.getRootQueue();
|
||||||
|
|
||||||
|
// Update demands and fairshares
|
||||||
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
|
||||||
|
|
||||||
FSQueue rootQueue = queueMgr.getRootQueue();
|
|
||||||
|
|
||||||
// Recursively update demands for all queues
|
// Recursively update demands for all queues
|
||||||
rootQueue.updateDemand();
|
rootQueue.updateDemand();
|
||||||
|
rootQueue.update(getClusterResource());
|
||||||
Resource clusterResource = getClusterResource();
|
|
||||||
rootQueue.update(clusterResource, shouldAttemptPreemption());
|
|
||||||
|
|
||||||
// Update metrics
|
// Update metrics
|
||||||
updateRootQueueMetrics();
|
updateRootQueueMetrics();
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
readLock.lock();
|
||||||
|
try {
|
||||||
|
// Update starvation stats and identify starved applications
|
||||||
|
if (shouldAttemptPreemption()) {
|
||||||
|
for (FSLeafQueue queue : queueMgr.getLeafQueues()) {
|
||||||
|
queue.updateStarvedApps();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log debug information
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
if (--updatesToSkipForDebug < 0) {
|
if (--updatesToSkipForDebug < 0) {
|
||||||
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
||||||
@ -387,7 +398,7 @@ public void update() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user