YARN-6215. FairScheduler preemption and update should not run concurrently. (Tao Jie via kasha)

This commit is contained in:
Karthik Kambatla 2017-02-26 20:16:36 -08:00
parent 05391c1845
commit 815d53506f
2 changed files with 16 additions and 1 deletions

View File

@ -32,6 +32,7 @@
import java.util.List; import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
/** /**
* Thread that handles FairScheduler preemption. * Thread that handles FairScheduler preemption.
@ -43,6 +44,7 @@ class FSPreemptionThread extends Thread {
private final long warnTimeBeforeKill; private final long warnTimeBeforeKill;
private final long delayBeforeNextStarvationCheck; private final long delayBeforeNextStarvationCheck;
private final Timer preemptionTimer; private final Timer preemptionTimer;
private final Lock schedulerReadLock;
FSPreemptionThread(FairScheduler scheduler) { FSPreemptionThread(FairScheduler scheduler) {
setDaemon(true); setDaemon(true);
@ -61,6 +63,7 @@ class FSPreemptionThread extends Thread {
: 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay + delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
fsConf.getWaitTimeBeforeNextStarvationCheck(); fsConf.getWaitTimeBeforeNextStarvationCheck();
schedulerReadLock = scheduler.getSchedulerReadLock();
} }
public void run() { public void run() {
@ -68,7 +71,14 @@ public void run() {
FSAppAttempt starvedApp; FSAppAttempt starvedApp;
try{ try{
starvedApp = context.getStarvedApps().take(); starvedApp = context.getStarvedApps().take();
preemptContainers(identifyContainersToPreempt(starvedApp)); // Hold the scheduler readlock so this is not concurrent with the
// update thread.
schedulerReadLock.lock();
try {
preemptContainers(identifyContainersToPreempt(starvedApp));
} finally {
schedulerReadLock.unlock();
}
starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Preemption thread interrupted! Exiting."); LOG.info("Preemption thread interrupted! Exiting.");

View File

@ -104,6 +104,7 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
/** /**
* A scheduler that schedules resources between a set of queues. The scheduler * A scheduler that schedules resources between a set of queues. The scheduler
@ -1782,4 +1783,8 @@ public float getReservableNodesRatio() {
long getNMHeartbeatInterval() { long getNMHeartbeatInterval() {
return nmHeartbeatInterval; return nmHeartbeatInterval;
} }
/**
 * Exposes the read side of this scheduler's lock to package-level callers.
 *
 * @return the {@code readLock} held by this scheduler instance
 */
ReadLock getSchedulerReadLock() {
  return readLock;
}
} }