From 89022f8d4bac0e9d0b848fd91e9c4d700fe1cdbe Mon Sep 17 00:00:00 2001 From: Xuan Date: Thu, 7 Jan 2016 14:33:06 -0800 Subject: [PATCH] YARN-4438. Implement RM leader election with curator. Contributed by Jian He --- hadoop-yarn-project/CHANGES.txt | 2 + .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../conf/TestYarnConfigurationFields.java | 1 + .../server/resourcemanager/AdminService.java | 49 +++- .../resourcemanager/LeaderElectorService.java | 144 ++++++++++ .../server/resourcemanager/RMContext.java | 4 + .../server/resourcemanager/RMContextImpl.java | 11 + .../resourcemanager/ResourceManager.java | 24 +- .../TestLeaderElectorService.java | 269 ++++++++++++++++++ .../yarn/server/resourcemanager/TestRMHA.java | 8 +- 10 files changed, 496 insertions(+), 21 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5273614ccd..00d31d8d19 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -86,6 +86,8 @@ Release 2.9.0 - UNRELEASED YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda) + YARN-4438. Implement RM leader election with curator. 
(Jian He via xgong) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9a1eb5453c..37c81eca89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -545,6 +545,11 @@ private static void addDeprecatedKeys() { public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX + "failover-controller.active-standby-elector.zk.retries"; + @Private + public static final String CURATOR_LEADER_ELECTOR = + RM_HA_PREFIX + "curator-leader-elector.enabled"; + public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false; + //////////////////////////////// // RM state store configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 41c3d87146..0e508ed2e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -90,6 +90,7 @@ public void initializeMemberVariables() { .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL); configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); + configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); // Ignore all YARN Application Timeline Service (version 1) properties 
configurationPrefixToSkipCompare.add("yarn.timeline-service."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 353e72d626..fcce722419 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -106,6 +106,7 @@ public class AdminService extends CompositeService implements private String rmId; private boolean autoFailoverEnabled; + private boolean curatorEnabled; private EmbeddedElectorService embeddedElector; private Server server; @@ -132,13 +133,16 @@ public AdminService(ResourceManager rm, RMContext rmContext) { @Override public void serviceInit(Configuration conf) throws Exception { if (rmContext.isHAEnabled()) { + curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, + YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf); - if (autoFailoverEnabled) { + if (autoFailoverEnabled && !curatorEnabled) { if (HAUtil.isAutomaticFailoverEmbedded(conf)) { embeddedElector = createEmbeddedElectorService(); addIfService(embeddedElector); } } + } masterServiceBindAddress = conf.getSocketAddr( @@ -319,7 +323,7 @@ public synchronized void transitionToActive( rm.transitionToActive(); } catch (Exception e) { RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", - "", "RMHAProtocolService", + "", "RM", "Exception transitioning to active"); throw new ServiceFailedException( "Error when 
transitioning to Active mode", e); @@ -338,7 +342,7 @@ public synchronized void transitionToActive( "Error on refreshAll during transistion to Active", e); } RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", - "RMHAProtocolService"); + "RM"); } @Override @@ -356,10 +360,10 @@ public synchronized void transitionToStandby( try { rm.transitionToStandby(true); RMAuditLogger.logSuccess(user.getShortUserName(), - "transitionToStandby", "RMHAProtocolService"); + "transitionToStandby", "RM"); } catch (Exception e) { RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", - "", "RMHAProtocolService", + "", "RM", "Exception transitioning to standby"); throw new ServiceFailedException( "Error when transitioning to Standby mode", e); @@ -369,15 +373,28 @@ public synchronized void transitionToStandby( @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); - HAServiceState haState = rmContext.getHAServiceState(); - HAServiceStatus ret = new HAServiceStatus(haState); - if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { - ret.setReadyToBecomeActive(); + if (curatorEnabled) { + HAServiceStatus state; + if (rmContext.getLeaderElectorService().hasLeaderShip()) { + state = new HAServiceStatus(HAServiceState.ACTIVE); + } else { + state = new HAServiceStatus(HAServiceState.STANDBY); + } + // set empty string to avoid NPE at + // HAServiceProtocolServerSideTranslatorPB#getServiceStatus + state.setNotReadyToBecomeActive(""); + return state; } else { - ret.setNotReadyToBecomeActive("State is " + haState); + HAServiceState haState = rmContext.getHAServiceState(); + HAServiceStatus ret = new HAServiceStatus(haState); + if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { + ret.setReadyToBecomeActive(); + } else { + ret.setNotReadyToBecomeActive("State is " + haState); + } + return ret; } - return ret; - } + } @Override public 
RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) @@ -837,6 +854,12 @@ public String getHAZookeeperConnectionState() { } else if (!autoFailoverEnabled) { return "Auto Failover is not enabled."; } - return this.embeddedElector.getHAZookeeperConnectionState(); + if (curatorEnabled) { + return "Connected to zookeeper : " + rmContext + .getLeaderElectorService().getCuratorClient().getZookeeperClient() + .isConnected(); + } else { + return this.embeddedElector.getHAZookeeperConnectionState(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java new file mode 100644 index 0000000000..3766676f81 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java @@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+
+
+/**
+ * Leader election for ResourceManager HA built on a Curator
+ * {@link LeaderLatch}. The service joins the latch at
+ * {@code <zk-base-path>/<cluster-id>} and moves this RM to active or standby
+ * through the admin service as leadership is gained or lost.
+ */
+public class LeaderElectorService extends AbstractService implements
+    LeaderLatchListener {
+  public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
+  private LeaderLatch leaderLatch;
+  private CuratorFramework curator;
+  private final RMContext rmContext;
+  private String latchPath;
+  private String rmId;
+
+  public LeaderElectorService(RMContext rmContext) {
+    super(LeaderElectorService.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Reads the ZooKeeper settings, starts the Curator client and joins the
+   * election.
+   *
+   * @throws NullPointerException if {@link YarnConfiguration#RM_ZK_ADDRESS}
+   *           is not set
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    Preconditions.checkNotNull(zkHostPort,
+        YarnConfiguration.RM_ZK_ADDRESS + " is not set");
+
+    rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+
+    // NOTE(review): this value is only used as the sleep between retries in
+    // RetryNTimes below; it is never applied as the client session timeout.
+    // Confirm that is intended.
+    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+    int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
+        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+
+    String zkBasePath = conf.get(
+        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    latchPath = zkBasePath + "/" + clusterId;
+
+    curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
+        .retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
+    curator.start();
+    initAndStartLeaderLatch();
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Creates a new latch for this RM id, registers this service as its
+   * listener and starts it.
+   */
+  private void initAndStartLeaderLatch() throws Exception {
+    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
+    leaderLatch.addListener(this);
+    leaderLatch.start();
+  }
+
+  /**
+   * Leaves the election and closes the Curator client so its ZooKeeper
+   * connection is released.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      closeLeaderLatch();
+    } finally {
+      // serviceInit may have failed before the client was created.
+      if (curator != null) {
+        curator.close();
+      }
+    }
+    super.serviceStop();
+  }
+
+  /** @return true if this RM currently holds the leader latch. */
+  public boolean hasLeaderShip() {
+    return leaderLatch.hasLeadership();
+  }
+
+  /**
+   * Latch callback: this RM won the election. Transitions to active; if that
+   * fails, transitions back to standby and re-joins the election.
+   */
+  @Override
+  public void isLeader() {
+    LOG.info(rmId + " is elected leader, transitioning to active");
+    try {
+      rmContext.getRMAdminService().transitionToActive(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " failed to transition to active, giving up leadership",
+          e);
+      notLeader();
+      reJoinElection();
+    }
+  }
+
+  /**
+   * Closes the current latch, waits one second and joins the election again
+   * with a new latch. Failures are logged, not propagated.
+   */
+  public void reJoinElection() {
+    try {
+      closeLeaderLatch();
+      Thread.sleep(1000);
+      initAndStartLeaderLatch();
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+      LOG.info("Fail to re-join election.", e);
+    } catch (Exception e) {
+      LOG.info("Fail to re-join election.", e);
+    }
+  }
+
+  private void closeLeaderLatch() throws IOException {
+    if (leaderLatch != null) {
+      leaderLatch.close();
+    }
+  }
+
+  /** Latch callback: leadership was lost. Transitions this RM to standby. */
+  @Override
+  public void notLeader() {
+    LOG.info(rmId + " relinquish leadership");
+    try {
+      rmContext.getRMAdminService().transitionToStandby(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " did not transition to standby successfully.", e);
+    }
+  }
+
+  /**
+   * @return the Curator client. Besides tests, AdminService reads it to
+   *         report the ZooKeeper connection state.
+   */
+  @VisibleForTesting
+  public CuratorFramework getCuratorClient() {
+    return this.curator;
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 9802a3796b..f50da3bfd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -135,4 +135,8 @@ void setRMDelegatedNodeLabelsUpdater( PlacementManager getQueuePlacementManager(); void setQueuePlacementManager(PlacementManager placementMgr); + + void setLeaderElectorService(LeaderElectorService elector); + + LeaderElectorService getLeaderElectorService(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index ed9942bce2..ec2aeb7f0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -72,6 +72,7 @@ public class RMContextImpl implements RMContext { private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; + private LeaderElectorService elector; /** * Default constructor. 
To be used in conjunction with setter methods for @@ -133,6 +134,16 @@ public Dispatcher getDispatcher() { return this.rmDispatcher; } + @Override + public void setLeaderElectorService(LeaderElectorService elector) { + this.elector = elector; + } + + @Override + public LeaderElectorService getLeaderElectorService() { + return this.elector; + } + @Override public RMStateStore getStateStore() { return activeServiceContext.getStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index aada69f37e..3b23ad8292 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -157,6 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable { private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; private JvmPauseMonitor pauseMonitor; + private boolean curatorEnabled = false; @VisibleForTesting protected String webAppAddress; @@ -228,6 +229,13 @@ protected void serviceInit(Configuration conf) throws Exception { this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); if (this.rmContext.isHAEnabled()) { HAUtil.verifyAndSetConfiguration(this.conf); + curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, + YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); + if (curatorEnabled) { + LeaderElectorService elector = new LeaderElectorService(rmContext); + addService(elector); + 
rmContext.setLeaderElectorService(elector); + } } // Set UGI and do login @@ -759,7 +767,11 @@ public void handleTransitionToStandBy() { // Transition to standby and reinit active services LOG.info("Transitioning RM to Standby mode"); transitionToStandby(true); - adminService.resetLeaderElection(); + if (curatorEnabled) { + rmContext.getLeaderElectorService().reJoinElection(); + } else { + adminService.resetLeaderElection(); + } return; } catch (Exception e) { LOG.fatal("Failed to transition RM to Standby mode."); @@ -996,7 +1008,7 @@ protected void startWepApp() { * instance of {@link RMActiveServices} and initializes it. * @throws Exception */ - protected void createAndInitActiveServices() throws Exception { + protected void createAndInitActiveServices() { activeServices = new RMActiveServices(this); activeServices.init(conf); } @@ -1016,14 +1028,14 @@ void startActiveServices() throws Exception { * Helper method to stop {@link #activeServices}. * @throws Exception */ - void stopActiveServices() throws Exception { + void stopActiveServices() { if (activeServices != null) { activeServices.stop(); activeServices = null; } } - void reinitialize(boolean initialize) throws Exception { + void reinitialize(boolean initialize) { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); if (initialize) { @@ -1042,7 +1054,6 @@ synchronized void transitionToActive() throws Exception { LOG.info("Already in active state"); return; } - LOG.info("Transitioning to active state"); this.rmLoginUGI.doAs(new PrivilegedExceptionAction() { @@ -1083,7 +1094,7 @@ synchronized void transitionToStandby(boolean initialize) @Override protected void serviceStart() throws Exception { if (this.rmContext.isHAEnabled()) { - transitionToStandby(true); + transitionToStandby(false); } else { transitionToActive(); } @@ -1338,4 +1349,5 @@ private static void printUsage(PrintStream out) { out.println(" " + "[-remove-application-from-state-store ]" + "\n"); } + } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java new file mode 100644 index 0000000000..bb10041133 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java @@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.base.Supplier;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests curator-based RM leader election against an in-process three-node
+ * ZooKeeper {@link TestingCluster}.
+ */
+public class TestLeaderElectorService {
+
+  private static final String RM1_ADDRESS = "1.1.1.1:1";
+  private static final String RM1_NODE_ID = "rm1";
+
+  private static final String RM2_ADDRESS = "0.0.0.0:0";
+  private static final String RM2_NODE_ID = "rm2";
+
+  Configuration conf;
+  MockRM rm1;
+  MockRM rm2;
+  TestingCluster zkCluster;
+
+  @Before
+  public void setUp() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.INFO);
+    conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
+
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+
+    for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
+      conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
+      conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
+    }
+    zkCluster = new TestingCluster(3);
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
+    zkCluster.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (rm1 != null) {
+        rm1.stop();
+      }
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    } finally {
+      // Shut the ensemble down too so its ZooKeeper servers do not outlive
+      // the test.
+      if (zkCluster != null) {
+        zkCluster.close();
+      }
+    }
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. stop rm1
+  // 4. rm2 become active
+  @Test (timeout = 20000)
+  public void testRMShutDownCauseFailover() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    // wait for some time to make sure rm2 will not become active;
+    Thread.sleep(5000);
+    waitFor(rm2, HAServiceState.STANDBY);
+
+    rm1.stop();
+    // rm2 should become active;
+    waitFor(rm2, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. submit a job to rm1 which triggers state-store failure.
+  // 4. rm1 becomes standby and rm2 becomes active
+  // 5. stop rm2; rm1 becomes active again
+  @Test
+  public void testStateStoreFailureCauseFailover() throws Exception {
+
+    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized void storeApplicationStateInternal(ApplicationId
+          appId, ApplicationStateData appState) throws Exception{
+        throw new Exception("store app failure.");
+      }
+    };
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.init(conf);
+    rm1.start();
+
+    waitFor(rm1, HAServiceState.ACTIVE);
+
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    // submit an app which will trigger state-store failure.
+    rm1.submitApp(200, "app1", "user1", null, "default", false);
+    waitFor(rm1, HAServiceState.STANDBY);
+
+    // rm2 should become active;
+    waitFor(rm2, HAServiceState.ACTIVE);
+
+    rm2.stop();
+    // rm1 will become active again
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. restart zk cluster
+  // 3. rm1 will first relinquish leadership and re-acquire leadership
+  @Test
+  public void testZKClusterDown() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+    // stop zk cluster
+    zkCluster.stop();
+    waitFor(rm1, HAServiceState.STANDBY);
+
+    Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
+    zkCluster = new TestingCluster(instanceSpecs);
+    zkCluster.start();
+    // rm becomes active again
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. kill the zk session between the rm and zk cluster.
+  // 3. rm1 will first relinquish leadership and re-acquire leadership
+  @Test
+  public void testExpireCurrentZKSession() throws Exception{
+
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+    LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
+    CuratorZookeeperClient client =
+        service.getCuratorClient().getZookeeperClient();
+    // this will expire current curator client session. curator will re-establish
+    // the session. RM will first relinquish leadership and re-acquire leadership
+    KillSession
+        .kill(client.getZooKeeper(), client.getCurrentConnectionString());
+
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 fail to become active.
+  // 2. rm1 will rejoin leader election and retry the leadership
+  @Test
+  public void testRMFailToTransitionToActive() throws Exception{
+    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+    final AtomicBoolean throwException = new AtomicBoolean(true);
+    Thread launchRM = new Thread() {
+      @Override
+      public void run() {
+        rm1 = new MockRM(conf) {
+          @Override
+          synchronized void transitionToActive() throws Exception {
+            if (throwException.get()) {
+              throw new Exception("Fail to transition to active");
+            } else {
+              super.transitionToActive();
+            }
+          }
+        };
+        rm1.init(conf);
+        rm1.start();
+      }
+    };
+    launchRM.start();
+    // wait some time, rm will keep retrying the leadership;
+    Thread.sleep(5000);
+    throwException.set(false);
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. kill the current connected zk instance
+  // 4. either rm1 or rm2 will become active.
+  @Test
+  public void testKillZKInstance() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    ZooKeeper zkClient =
+        rm1.getRMContext().getLeaderElectorService().getCuratorClient()
+            .getZookeeperClient().getZooKeeper();
+    InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
+    zkCluster.killServer(connectionInstance);
+
+    // wait for rm1 or rm2 to be active by randomness
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        try {
+          HAServiceState rm1State =
+              rm1.getAdminService().getServiceStatus().getState();
+          HAServiceState rm2State =
+              rm2.getAdminService().getServiceStatus().getState();
+          return (rm1State.equals(HAServiceState.ACTIVE) && rm2State
+              .equals(HAServiceState.STANDBY)) || (
+              rm1State.equals(HAServiceState.STANDBY) && rm2State
+                  .equals(HAServiceState.ACTIVE));
+        } catch (IOException e) {
+          // treat a failed status query as "not yet"; waitFor polls again
+        }
+        return false;
+      }
+    }, 2000, 15000);
+  }
+
+  private MockRM startRM(String rmId, HAServiceState state) throws Exception{
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
+    MockRM rm = new MockRM(yarnConf);
+    rm.init(yarnConf);
+    rm.start();
+    waitFor(rm, state);
+    return rm;
+  }
+
+  private void waitFor(final MockRM rm,
+      final HAServiceState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        try {
+          return rm.getAdminService().getServiceStatus().getState()
+              .equals(state);
+        } catch (IOException e) {
+          // treat a failed status query as "not yet"; waitFor polls again
+        }
+        return false;
+      }
+    }, 2000, 15000);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 62cfe848f2..70bba1545b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -471,8 +471,12 @@ public synchronized void updateApplicationState( memStore.init(conf); rm = new MockRM(conf, memStore) { @Override - void stopActiveServices() throws Exception { - Thread.sleep(10000); + void stopActiveServices() { + try { + Thread.sleep(10000); + } catch (Exception e) { + throw new RuntimeException (e); + } super.stopActiveServices(); } };