YARN-1121. Changed ResourceManager's state-store to drain all events on shut-down. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540232 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-11-09 00:55:21 +00:00
parent a4bc1a6218
commit e28015ed1b
5 changed files with 119 additions and 10 deletions

View File

@ -88,6 +88,9 @@ Release 2.3.0 - UNRELEASED
YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil
(Karthik Kambatla via Sandy Ryza) (Karthik Kambatla via Sandy Ryza)
YARN-1121. Changed ResourceManager's state-store to drain all events on
shut-down. (Jian He via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -49,6 +49,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private final BlockingQueue<Event> eventQueue; private final BlockingQueue<Event> eventQueue;
private volatile boolean stopped = false; private volatile boolean stopped = false;
// Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality.
private volatile boolean drainEventsOnStop = false;
// Indicates all the remaining dispatcher's events on stop have been drained
// and processed.
private volatile boolean drained = true;
// For drainEventsOnStop enabled only, block newly coming events into the
// queue while stopping.
private volatile boolean blockNewEvents = false;
private EventHandler handlerInstance = null;
private Thread eventHandlingThread; private Thread eventHandlingThread;
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
private boolean exitOnDispatchException; private boolean exitOnDispatchException;
@ -68,6 +81,7 @@ Runnable createThread() {
@Override @Override
public void run() { public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
Event event; Event event;
try { try {
event = eventQueue.take(); event = eventQueue.take();
@ -102,8 +116,19 @@ protected void serviceStart() throws Exception {
eventHandlingThread.start(); eventHandlingThread.start();
} }
public void setDrainEventsOnStop() {
drainEventsOnStop = true;
}
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
while(!drained) {
Thread.yield();
}
}
stopped = true; stopped = true;
if (eventHandlingThread != null) { if (eventHandlingThread != null) {
eventHandlingThread.interrupt(); eventHandlingThread.interrupt();
@ -173,11 +198,19 @@ public void register(Class<? extends Enum> eventType,
@Override @Override
public EventHandler getEventHandler() { public EventHandler getEventHandler() {
return new GenericEventHandler(); if (handlerInstance == null) {
handlerInstance = new GenericEventHandler();
}
return handlerInstance;
} }
class GenericEventHandler implements EventHandler<Event> { class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) { public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */ /* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size(); int qSize = eventQueue.size();
if (qSize !=0 && qSize %1000 == 0) { if (qSize !=0 && qSize %1000 == 0) {

View File

@ -261,17 +261,20 @@ public void setRMDispatcher(Dispatcher dispatcher) {
} }
AsyncDispatcher dispatcher; AsyncDispatcher dispatcher;
public synchronized void serviceInit(Configuration conf) throws Exception{ @Override
protected void serviceInit(Configuration conf) throws Exception{
// create async handler // create async handler
dispatcher = new AsyncDispatcher(); dispatcher = new AsyncDispatcher();
dispatcher.init(conf); dispatcher.init(conf);
dispatcher.register(RMStateStoreEventType.class, dispatcher.register(RMStateStoreEventType.class,
new ForwardingEventHandler()); new ForwardingEventHandler());
dispatcher.setDrainEventsOnStop();
initInternal(conf); initInternal(conf);
} }
protected synchronized void serviceStart() throws Exception { @Override
protected void serviceStart() throws Exception {
dispatcher.start(); dispatcher.start();
startInternal(); startInternal();
} }
@ -288,11 +291,12 @@ protected synchronized void serviceStart() throws Exception {
*/ */
protected abstract void startInternal() throws Exception; protected abstract void startInternal() throws Exception;
public synchronized void serviceStop() throws Exception { @Override
protected void serviceStop() throws Exception {
closeInternal(); closeInternal();
dispatcher.stop(); dispatcher.stop();
} }
/** /**
* Derived classes close themselves using this method. * Derived classes close themselves using this method.
* The base class will be closed and the event dispatcher will be shutdown * The base class will be closed and the event dispatcher will be shutdown
@ -509,8 +513,7 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
} }
// Dispatcher related code // Dispatcher related code
protected void handleStoreEvent(RMStateStoreEvent event) {
private synchronized void handleStoreEvent(RMStateStoreEvent event) {
if (event.getType().equals(RMStateStoreEventType.STORE_APP) if (event.getType().equals(RMStateStoreEventType.STORE_APP)
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
ApplicationState appState = null; ApplicationState appState = null;

View File

@ -163,6 +163,14 @@ public RMApp submitApp(int masterMemory, String name, String user,
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue, Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType) throws Exception { int maxAppAttempts, Credentials ts, String appType) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, true);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted) throws Exception {
ApplicationClientProtocol client = getClientRMService(); ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class)); .newRecord(GetNewApplicationRequest.class));
@ -222,7 +230,9 @@ PrivilegedAction<SubmitApplicationResponse> setClientReq(
}.setClientReq(client, req); }.setClientReq(client, req);
fakeUser.doAs(action); fakeUser.doAs(action);
// make sure app is immediately available after submit // make sure app is immediately available after submit
waitForState(appId, RMAppState.ACCEPTED); if (waitForAccepted) {
waitForState(appId, RMAppState.ACCEPTED);
}
return getRMContext().getRMApps().get(appId); return getRMContext().getRMApps().get(appId);
} }

View File

@ -70,6 +70,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -1062,6 +1063,65 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
rm2.stop(); rm2.stop();
} }
@Test
public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
volatile boolean wait = true;
@Override
public void serviceStop() throws Exception {
// Unblock app saving request.
wait = false;
super.serviceStop();
}
@Override
protected void handleStoreEvent(RMStateStoreEvent event) {
// Block app saving request.
while (wait);
super.handleStoreEvent(event);
}
};
memStore.init(conf);
// start RM
final MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
// create apps.
final ArrayList<RMApp> appList = new ArrayList<RMApp>();
final int NUM_APPS = 5;
for (int i = 0; i < NUM_APPS; i++) {
RMApp app = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false,
"default", -1, null, "MAPREDUCE", false);
appList.add(app);
rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING);
}
// all apps's saving request are now enqueued to RMStateStore's dispatcher
// queue, and will be processed once rm.stop() is called.
// Nothing exist in state store before stop is called.
Map<ApplicationId, ApplicationState> rmAppState =
memStore.getState().getApplicationState();
Assert.assertTrue(rmAppState.size() == 0);
// stop rm
rm1.stop();
// Assert app info is still saved even if stop is called with pending saving
// request on dispatcher.
for (RMApp app : appList) {
ApplicationState appState = rmAppState.get(app.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), app.getApplicationSubmissionContext()
.getApplicationId());
}
Assert.assertTrue(rmAppState.size() == NUM_APPS);
}
public static class TestSecurityMockRM extends MockRM { public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) { public TestSecurityMockRM(Configuration conf, RMStateStore store) {