diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 21a2e94b60..6c5d808517 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -88,6 +88,9 @@ Release 2.3.0 - UNRELEASED YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil (Karthik Kambatla via Sandy Ryza) + YARN-1121. Changed ResourceManager's state-store to drain all events on + shut-down. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 9301bba081..bf5058a9d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -49,6 +49,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private final BlockingQueue eventQueue; private volatile boolean stopped = false; + // Configuration flag for enabling/disabling draining dispatcher's events on + // stop functionality. + private volatile boolean drainEventsOnStop = false; + + // Indicates all the remaining dispatcher's events on stop have been drained + // and processed. + private volatile boolean drained = true; + + // For drainEventsOnStop enabled only, block newly coming events into the + // queue while stopping. + private volatile boolean blockNewEvents = false; + private EventHandler handlerInstance = null; + private Thread eventHandlingThread; protected final Map, EventHandler> eventDispatchers; private boolean exitOnDispatchException; @@ -68,6 +81,7 @@ Runnable createThread() { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); Event event; try { event = eventQueue.take(); @@ -102,8 +116,19 @@ protected void serviceStart() throws Exception { eventHandlingThread.start(); } + public void setDrainEventsOnStop() { + drainEventsOnStop = true; + } + @Override protected void serviceStop() throws Exception { + if (drainEventsOnStop) { + blockNewEvents = true; + LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); + while(!drained) { + Thread.yield(); + } + } stopped = true; if (eventHandlingThread != null) { eventHandlingThread.interrupt(); @@ -173,11 +198,19 @@ public void register(Class eventType, @Override public EventHandler getEventHandler() { - return new GenericEventHandler(); + if (handlerInstance == null) { + handlerInstance = new GenericEventHandler(); + } + return handlerInstance; } class GenericEventHandler implements EventHandler { public void handle(Event event) { + if (blockNewEvents) { + return; + } + drained = false; + /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); if (qSize !=0 && qSize %1000 == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5a7c7dcbb1..911107daab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -261,17 +261,20 @@ public void setRMDispatcher(Dispatcher dispatcher) { } AsyncDispatcher dispatcher; - - public synchronized void serviceInit(Configuration conf) throws Exception{ + + @Override + protected void serviceInit(Configuration conf) throws Exception{ // create async handler dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.register(RMStateStoreEventType.class, new ForwardingEventHandler()); + dispatcher.setDrainEventsOnStop(); initInternal(conf); } - - protected synchronized void serviceStart() throws Exception { + + @Override + protected void serviceStart() throws Exception { dispatcher.start(); startInternal(); } @@ -288,11 +291,12 @@ protected synchronized void serviceStart() throws Exception { */ protected abstract void startInternal() throws Exception; - public synchronized void serviceStop() throws Exception { + @Override + protected void serviceStop() throws Exception { closeInternal(); dispatcher.stop(); } - + /** * Derived classes close themselves using this method. * The base class will be closed and the event dispatcher will be shutdown @@ -509,8 +513,7 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { } // Dispatcher related code - - private synchronized void handleStoreEvent(RMStateStoreEvent event) { + protected void handleStoreEvent(RMStateStoreEvent event) { if (event.getType().equals(RMStateStoreEventType.STORE_APP) || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { ApplicationState appState = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index aba334ad94..cb41ca4e45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -163,6 +163,14 @@ public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType) throws Exception { + return submitApp(masterMemory, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, true); + } + + public RMApp submitApp(int masterMemory, String name, String user, + Map acls, boolean unmanaged, String queue, + int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted) throws Exception { ApplicationClientProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -222,7 +230,9 @@ PrivilegedAction setClientReq( }.setClientReq(client, req); fakeUser.doAs(action); // make sure app is immediately available after submit - waitForState(appId, RMAppState.ACCEPTED); + if (waitForAccepted) { + waitForState(appId, RMAppState.ACCEPTED); + } return getRMContext().getRMApps().get(appId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 97f51a2791..f87f6894bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -1062,6 +1063,65 @@ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception { rm2.stop(); } + @Test + public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore() { + volatile boolean wait = true; + @Override + public void serviceStop() throws Exception { + // Unblock app saving request. + wait = false; + super.serviceStop(); + } + + @Override + protected void handleStoreEvent(RMStateStoreEvent event) { + // Block app saving request. + while (wait); + super.handleStoreEvent(event); + } + }; + memStore.init(conf); + + // start RM + final MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + + // create apps. + final ArrayList appList = new ArrayList(); + final int NUM_APPS = 5; + + for (int i = 0; i < NUM_APPS; i++) { + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap(), false, + "default", -1, null, "MAPREDUCE", false); + appList.add(app); + rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING); + } + // all apps's saving request are now enqueued to RMStateStore's dispatcher + // queue, and will be processed once rm.stop() is called. + + // Nothing exist in state store before stop is called. + Map rmAppState = + memStore.getState().getApplicationState(); + Assert.assertTrue(rmAppState.size() == 0); + + // stop rm + rm1.stop(); + + // Assert app info is still saved even if stop is called with pending saving + // request on dispatcher. + for (RMApp app : appList) { + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + Assert.assertEquals(0, appState.getAttemptCount()); + Assert.assertEquals(appState.getApplicationSubmissionContext() + .getApplicationId(), app.getApplicationSubmissionContext() + .getApplicationId()); + } + Assert.assertTrue(rmAppState.size() == NUM_APPS); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) {