diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8adde9b027..887d1d44aa 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -868,6 +868,10 @@ Release 2.6.0 - UNRELEASED YARN-2805. Fixed ResourceManager to load HA configs correctly before kerberos login. (Wangda Tan via vinodkv) + YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and + FatalEventDispatcher try to transition RM to StandBy at the same time. + (Rohith Sharmaks via jianhe) + Release 2.5.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java index 0440f1d09d..0634cc379b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java @@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; -import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; -import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer; import org.junit.After; @@ -173,7 +171,6 @@ public void testExplicitFailover() verifyConnections(); } - @SuppressWarnings("unchecked") @Test public void testAutomaticFailover() throws YarnException, InterruptedException, IOException { @@ -196,10 +193,7 @@ public void testAutomaticFailover() // so it transitions to standby. ResourceManager rm = cluster.getResourceManager( cluster.getActiveRMIndex()); - RMFatalEvent event = - new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED, - "Fake RMFatalEvent"); - rm.getRMContext().getDispatcher().getEventHandler().handle(event); + rm.handleTransitionToStandBy(); int maxWaitingAttempts = 2000; while (maxWaitingAttempts-- > 0 ) { if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java index 0629c70571..789c018344 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java @@ -23,7 +23,6 @@ @InterfaceAudience.Private public enum RMFatalEventType { // Source <- Store - STATE_STORE_FENCED, STATE_STORE_OP_FAILED, // Source <- Embedded Elector diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6adc73a613..4051054fe5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -269,6 +269,7 @@ protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, @VisibleForTesting protected void setRMStateStore(RMStateStore rmStore) { rmStore.setRMDispatcher(rmDispatcher); + rmStore.setResourceManager(this); rmContext.setStateStore(rmStore); } @@ -397,11 +398,12 @@ public class RMActiveServices extends CompositeService { private EventHandler schedulerDispatcher; private ApplicationMasterLauncher applicationMasterLauncher; private ContainerAllocationExpirer containerAllocationExpirer; - + private ResourceManager rm; private boolean recoveryEnabled; - RMActiveServices() { + RMActiveServices(ResourceManager rm) { super("RMActiveServices"); + this.rm = rm; } @Override @@ -449,6 +451,7 @@ protected void serviceInit(Configuration configuration) throws Exception { try { rmStore.init(conf); rmStore.setRMDispatcher(rmDispatcher); + rmStore.setResourceManager(rm); } catch (Exception e) { // the Exception from stateStore.init() needs to be handled for // HA and we need to give up master status if we got fenced @@ -729,39 +732,31 @@ public void handle(SchedulerEvent event) { @Private public static class RMFatalEventDispatcher implements EventHandler { - private final RMContext rmContext; - private final ResourceManager rm; - - public RMFatalEventDispatcher( - RMContext rmContext, ResourceManager resourceManager) { - this.rmContext = rmContext; - this.rm = resourceManager; - } @Override public void handle(RMFatalEvent event) { LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " + event.getType().name() + ". Cause:\n" + event.getCause()); - if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) { - LOG.info("RMStateStore has been fenced"); - if (rmContext.isHAEnabled()) { - try { - // Transition to standby and reinit active services - LOG.info("Transitioning RM to Standby mode"); - rm.transitionToStandby(true); - rm.adminService.resetLeaderElection(); - return; - } catch (Exception e) { - LOG.fatal("Failed to transition RM to Standby mode."); - } - } - } - ExitUtil.terminate(1, event.getCause()); } } + public void handleTransitionToStandBy() { + if (rmContext.isHAEnabled()) { + try { + // Transition to standby and reinit active services + LOG.info("Transitioning RM to Standby mode"); + transitionToStandby(true); + adminService.resetLeaderElection(); + return; + } catch (Exception e) { + LOG.fatal("Failed to transition RM to Standby mode."); + ExitUtil.terminate(1, e); + } + } + } + @Private public static final class ApplicationEventDispatcher implements EventHandler { @@ -990,7 +985,7 @@ protected void startWepApp() { * @throws Exception */ protected void createAndInitActiveServices() throws Exception { - activeServices = new RMActiveServices(); + activeServices = new RMActiveServices(this); activeServices.init(conf); } @@ -1227,7 +1222,7 @@ public static void main(String argv[]) { private Dispatcher setupDispatcher() { Dispatcher dispatcher = createDispatcher(); dispatcher.register(RMFatalEventType.class, - new ResourceManager.RMFatalEventDispatcher(this.rmContext, this)); + new ResourceManager.RMFatalEventDispatcher()); return dispatcher; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 973fe54c73..beac4032e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -87,6 +88,7 @@ public abstract class RMStateStore extends AbstractService { "AMRMTokenSecretManagerRoot"; protected static final String VERSION_NODE = "RMVersionNode"; protected static final String EPOCH_NODE = "EpochNode"; + private ResourceManager resourceManager; public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -818,13 +820,15 @@ protected void handleStoreEvent(RMStateStoreEvent event) { * @param failureCause the exception due to which the operation failed */ protected void notifyStoreOperationFailed(Exception failureCause) { - RMFatalEventType type; if (failureCause instanceof StoreFencedException) { - type = RMFatalEventType.STATE_STORE_FENCED; + Thread standByTransitionThread = + new Thread(new StandByTransitionThread()); + standByTransitionThread.setName("StandByTransitionThread Handler"); + standByTransitionThread.start(); } else { - type = RMFatalEventType.STATE_STORE_OP_FAILED; + rmDispatcher.getEventHandler().handle( + new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause)); } - rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause)); } @SuppressWarnings("unchecked") @@ -866,4 +870,16 @@ public void handle(RMStateStoreEvent event) { * @throws Exception */ public abstract void deleteStore() throws Exception; + + public void setResourceManager(ResourceManager rm) { + this.resourceManager = rm; + } + + private class StandByTransitionThread implements Runnable { + @Override + public void run() { + LOG.info("RMStateStore has been fenced"); + resourceManager.handleTransitionToStandBy(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 8cef4c9329..c6d7d09bcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -451,6 +452,67 @@ public synchronized void startInternal() throws Exception { checkActiveRMFunctionality(); } + @Test(timeout = 90000) + public void testTransitionedToStandbyShouldNotHang() throws Exception { + configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + Configuration conf = new YarnConfiguration(configuration); + + MemoryRMStateStore memStore = new MemoryRMStateStore() { + @Override + public synchronized void updateApplicationState(ApplicationState appState) { + notifyStoreOperationFailed(new StoreFencedException()); + } + }; + memStore.init(conf); + rm = new MockRM(conf, memStore) { + @Override + void stopActiveServices() throws Exception { + Thread.sleep(10000); + super.stopActiveServices(); + } + }; + rm.init(conf); + final StateChangeRequestInfo requestInfo = + new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + assertEquals(STATE_ERR, HAServiceState.INITIALIZING, rm.adminService + .getServiceStatus().getState()); + assertFalse("RM is ready to become active before being started", + rm.adminService.getServiceStatus().isReadyToBecomeActive()); + checkMonitorHealth(); + + rm.start(); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + + // 2. Transition to Active. + rm.adminService.transitionToActive(requestInfo); + + // 3. Try Transition to standby + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + rm.transitionToStandby(true); + } catch (IOException e) { + e.printStackTrace(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }); + t.start(); + + rm.getRMContext().getStateStore().updateApplicationState(null); + t.join(); // wait for thread to finish + + rm.adminService.transitionToStandby(requestInfo); + checkStandbyRMFunctionality(); + rm.stop(); + } + public void innerTestHAWithRMHostName(boolean includeBindHost) { //this is run two times, with and without a bind host configured if (includeBindHost) {