YARN-2328. FairScheduler: Verify update and continuous scheduling threads are stopped when the scheduler is stopped. (kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1614432 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-07-29 17:41:52 +00:00
parent 260fd25568
commit c0b49ff107
3 changed files with 65 additions and 25 deletions

View File

@ -71,6 +71,9 @@ Release 2.6.0 - UNRELEASED
YARN-2211. Persist AMRMToken master key in RMStateStore for RM recovery. YARN-2211. Persist AMRMToken master key in RMStateStore for RM recovery.
(Xuan Gong via jianhe) (Xuan Gong via jianhe)
YARN-2328. FairScheduler: Verify update and continuous scheduling threads are
stopped when the scheduler is stopped. (kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -139,8 +139,11 @@ public class FairScheduler extends
private final int UPDATE_DEBUG_FREQUENCY = 5; private final int UPDATE_DEBUG_FREQUENCY = 5;
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
private Thread updateThread; @VisibleForTesting
private Thread schedulingThread; Thread updateThread;
@VisibleForTesting
Thread schedulingThread;
// timeout to join when we stop this service // timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000; protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@ -243,16 +246,21 @@ public QueueManager getQueueManager() {
} }
/** /**
* A runnable which calls {@link FairScheduler#update()} every * Thread which calls {@link FairScheduler#update()} every
* <code>updateInterval</code> milliseconds. * <code>updateInterval</code> milliseconds.
*/ */
private class UpdateThread implements Runnable { private class UpdateThread extends Thread {
@Override
public void run() { public void run() {
while (true) { while (!Thread.currentThread().isInterrupted()) {
try { try {
Thread.sleep(updateInterval); Thread.sleep(updateInterval);
update(); update();
preemptTasksIfNecessary(); preemptTasksIfNecessary();
} catch (InterruptedException ie) {
LOG.warn("Update thread interrupted. Exiting.");
return;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Exception in fair scheduler UpdateThread", e); LOG.error("Exception in fair scheduler UpdateThread", e);
} }
@ -260,6 +268,26 @@ public void run() {
} }
} }
/**
* Thread which attempts scheduling resources continuously,
* asynchronous to the node heartbeats.
*/
private class ContinuousSchedulingThread extends Thread {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
continuousSchedulingAttempt();
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
return;
}
}
}
}
/** /**
* Recompute the internal variables used by the scheduler - per-job weights, * Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and amount of used and * fair shares, deficits, minimum slot allocations, and amount of used and
@ -970,7 +998,7 @@ private synchronized void nodeUpdate(RMNode nm) {
} }
} }
void continuousSchedulingAttempt() { void continuousSchedulingAttempt() throws InterruptedException {
List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet()); List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
// Sort the nodes by space available on them, so that we offer // Sort the nodes by space available on them, so that we offer
// containers on emptier nodes first, facilitating an even spread. This // containers on emptier nodes first, facilitating an even spread. This
@ -1229,30 +1257,14 @@ private synchronized void initScheduler(Configuration conf)
throw new IOException("Failed to start FairScheduler", e); throw new IOException("Failed to start FairScheduler", e);
} }
updateThread = new Thread(new UpdateThread()); updateThread = new UpdateThread();
updateThread.setName("FairSchedulerUpdateThread"); updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true); updateThread.setDaemon(true);
if (continuousSchedulingEnabled) { if (continuousSchedulingEnabled) {
// start continuous scheduling thread // start continuous scheduling thread
schedulingThread = new Thread( schedulingThread = new ContinuousSchedulingThread();
new Runnable() { schedulingThread.setName("FairSchedulerContinuousScheduling");
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
continuousSchedulingAttempt();
Thread.sleep(getContinuousSchedulingSleepMs());
} catch (InterruptedException e) {
LOG.error("Continuous scheduling thread interrupted. Exiting. ",
e);
return;
}
}
}
}
);
schedulingThread.setName("ContinuousScheduling");
schedulingThread.setDaemon(true); schedulingThread.setDaemon(true);
} }

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
@ -3341,4 +3342,28 @@ public void testLowestCommonAncestorDeeperHierarchy() throws Exception {
scheduler.findLowestCommonAncestorQueue(a1Queue, b1Queue); scheduler.findLowestCommonAncestorQueue(a1Queue, b1Queue);
assertEquals(ancestorQueue, queue1); assertEquals(ancestorQueue, queue1);
} }
@Test
public void testThreadLifeCycle() throws InterruptedException {
conf.setBoolean(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
scheduler.init(conf);
scheduler.start();
Thread updateThread = scheduler.updateThread;
Thread schedulingThread = scheduler.schedulingThread;
assertTrue(updateThread.isAlive());
assertTrue(schedulingThread.isAlive());
scheduler.stop();
int numRetries = 100;
while (numRetries-- > 0 &&
(updateThread.isAlive() || schedulingThread.isAlive())) {
Thread.sleep(50);
}
assertNotEquals("One of the threads is still alive", 0, numRetries);
}
} }