YARN-5709. Cleanup leader election configs and pluggability. Contribtued by Karthik Kambatla

This commit is contained in:
Jian He 2016-12-09 16:38:49 -08:00
parent b0aace21b1
commit a6410a542e
15 changed files with 246 additions and 172 deletions

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@ -654,9 +655,20 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
+ "failover-controller.active-standby-elector.zk.retries";
@Private
/**
* Whether to use curator-based elector for leader election.
*
* @deprecated Eventually, we want to default to the curator-based
* implementation and remove the {@link ActiveStandbyElector} based
* implementation. We should remove this config then.
*/
@Unstable
@Deprecated
public static final String CURATOR_LEADER_ELECTOR =
RM_HA_PREFIX + "curator-leader-elector.enabled";
@Private
@Unstable
public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
////////////////////////////////

View File

@ -43,12 +43,16 @@
import java.util.Timer;
import java.util.TimerTask;
/**
* Leader election implementation that uses {@link ActiveStandbyElector}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class EmbeddedElectorService extends AbstractService
implements ActiveStandbyElector.ActiveStandbyElectorCallback {
private static final Log LOG =
LogFactory.getLog(EmbeddedElectorService.class.getName());
public class ActiveStandbyElectorBasedElectorService extends AbstractService
implements EmbeddedElector,
ActiveStandbyElector.ActiveStandbyElectorCallback {
private static final Log LOG = LogFactory.getLog(
ActiveStandbyElectorBasedElectorService.class.getName());
private static final HAServiceProtocol.StateChangeRequestInfo req =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
@ -62,19 +66,21 @@ public class EmbeddedElectorService extends AbstractService
@VisibleForTesting
final Object zkDisconnectLock = new Object();
EmbeddedElectorService(RMContext rmContext) {
super(EmbeddedElectorService.class.getName());
ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
super(ActiveStandbyElectorBasedElectorService.class.getName());
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf)
throws Exception {
conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
conf = conf instanceof YarnConfiguration
? conf
: new YarnConfiguration(conf);
String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkQuorum == null) {
throw new YarnRuntimeException("Embedded automatic failover " +
throw new YarnRuntimeException("Embedded automatic failover " +
"is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
" is not set");
}
@ -199,7 +205,8 @@ public void run() {
@Override
public void notifyFatalError(String errorMessage) {
rmContext.getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage));
}
@Override
@ -249,12 +256,16 @@ private boolean isParentZnodeSafe(String clusterId)
return true;
}
public void resetLeaderElection() {
// EmbeddedElector methods
@Override
public void rejoinElection() {
elector.quitElection(false);
elector.joinElection(localActiveNodeInfo);
}
public String getHAZookeeperConnectionState() {
@Override
public String getZookeeperConnectionState() {
return elector.getHAZookeeperConnectionState();
}
}

View File

@ -29,7 +29,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
@ -108,8 +107,6 @@ public class AdminService extends CompositeService implements
private String rmId;
private boolean autoFailoverEnabled;
private boolean curatorEnabled;
private EmbeddedElectorService embeddedElector;
private Server server;
@ -134,18 +131,8 @@ public AdminService(ResourceManager rm, RMContext rmContext) {
@Override
public void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) {
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
if (autoFailoverEnabled && !curatorEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
embeddedElector = createEmbeddedElectorService();
addIfService(embeddedElector);
}
}
}
autoFailoverEnabled =
rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
@ -228,17 +215,6 @@ protected void stopServer() throws Exception {
}
}
protected EmbeddedElectorService createEmbeddedElectorService() {
return new EmbeddedElectorService(rmContext);
}
@InterfaceAudience.Private
void resetLeaderElection() {
if (embeddedElector != null) {
embeddedElector.resetLeaderElection();
}
}
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
}
@ -375,30 +351,24 @@ public synchronized void transitionToStandby(
}
}
/**
* Return the HA status of this RM. This includes the current state and
* whether the RM is ready to become active.
*
* @return {@link HAServiceStatus} of the current RM
* @throws IOException if the caller does not have permissions
*/
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
if (curatorEnabled) {
HAServiceStatus state;
if (rmContext.getLeaderElectorService().hasLeaderShip()) {
state = new HAServiceStatus(HAServiceState.ACTIVE);
} else {
state = new HAServiceStatus(HAServiceState.STANDBY);
}
// set empty string to avoid NPE at
// HAServiceProtocolServerSideTranslatorPB#getServiceStatus
state.setNotReadyToBecomeActive("");
return state;
HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
}
@Override
@ -926,19 +896,4 @@ private void refreshClusterMaxPriority() throws IOException, YarnException {
rmContext.getScheduler().setClusterMaxPriority(conf);
}
public String getHAZookeeperConnectionState() {
if (!rmContext.isHAEnabled()) {
return "ResourceManager HA is not enabled.";
} else if (!autoFailoverEnabled) {
return "Auto Failover is not enabled.";
}
if (curatorEnabled) {
return "Connected to zookeeper : " + rmContext
.getLeaderElectorService().getCuratorClient().getZookeeperClient()
.isConnected();
} else {
return this.embeddedElector.getHAZookeeperConnectionState();
}
}
}

View File

@ -24,6 +24,8 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.AbstractService;
@ -32,10 +34,15 @@
import java.io.IOException;
public class LeaderElectorService extends AbstractService implements
LeaderLatchListener {
public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
/**
* Leader election implementation that uses Curator.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CuratorBasedElectorService extends AbstractService
implements EmbeddedElector, LeaderLatchListener {
public static final Log LOG =
LogFactory.getLog(CuratorBasedElectorService.class);
private LeaderLatch leaderLatch;
private CuratorFramework curator;
private RMContext rmContext;
@ -43,8 +50,8 @@ public class LeaderElectorService extends AbstractService implements
private String rmId;
private ResourceManager rm;
public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
super(LeaderElectorService.class.getName());
public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
super(CuratorBasedElectorService.class.getName());
this.rmContext = rmContext;
this.rm = rm;
}
@ -74,10 +81,22 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
public boolean hasLeaderShip() {
return leaderLatch.hasLeadership();
@Override
public void rejoinElection() {
try {
closeLeaderLatch();
Thread.sleep(1000);
initAndStartLeaderLatch();
} catch (Exception e) {
LOG.info("Fail to re-join election.", e);
}
}
@Override
public String getZookeeperConnectionState() {
return "Connected to zookeeper : " +
curator.getZookeeperClient().isConnected();
}
@Override
public void isLeader() {
@ -90,17 +109,7 @@ public void isLeader() {
LOG.info(rmId + " failed to transition to active, giving up leadership",
e);
notLeader();
reJoinElection();
}
}
public void reJoinElection() {
try {
closeLeaderLatch();
Thread.sleep(1000);
initAndStartLeaderLatch();
} catch (Exception e) {
LOG.info("Fail to re-join election.", e);
rejoinElection();
}
}
@ -109,6 +118,7 @@ private void closeLeaderLatch() throws IOException {
leaderLatch.close();
}
}
@Override
public void notLeader() {
LOG.info(rmId + " relinquish leadership");

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.Service;
/**
* Interface that all embedded leader electors must implement.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface EmbeddedElector extends Service{
/**
* Leave and rejoin leader election.
*/
void rejoinElection();
/**
* Get information about the elector's connection to Zookeeper.
*
* @return zookeeper connection state
*/
String getZookeeperConnectionState();
}

View File

@ -145,13 +145,15 @@ void setRMDelegatedNodeLabelsUpdater(
void setQueuePlacementManager(PlacementManager placementMgr);
void setLeaderElectorService(LeaderElectorService elector);
void setLeaderElectorService(EmbeddedElector elector);
LeaderElectorService getLeaderElectorService();
EmbeddedElector getLeaderElectorService();
QueueLimitCalculator getNodeManagerQueueLimitCalculator();
void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
RMAppLifetimeMonitor getRMAppLifetimeMonitor();
String getHAZookeeperConnectionState();
}

View File

@ -76,7 +76,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private LeaderElectorService elector;
private EmbeddedElector elector;
private QueueLimitCalculator queueLimitCalculator;
@ -143,12 +143,12 @@ public Dispatcher getDispatcher() {
}
@Override
public void setLeaderElectorService(LeaderElectorService elector) {
public void setLeaderElectorService(EmbeddedElector elector) {
this.elector = elector;
}
@Override
public LeaderElectorService getLeaderElectorService() {
public EmbeddedElector getLeaderElectorService() {
return this.elector;
}
@ -513,4 +513,13 @@ public void setRMAppLifetimeMonitor(
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
public String getHAZookeeperConnectionState() {
if (elector == null) {
return "Could not find leader elector. Verify both HA and automatic " +
"failover are enabled.";
} else {
return elector.getZookeeperConnectionState();
}
}
}

View File

@ -272,16 +272,17 @@ protected void serviceInit(Configuration conf) throws Exception {
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf);
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
LeaderElectorService elector = new LeaderElectorService(rmContext, this);
addService(elector);
// If the RM is configured to use an embedded leader elector,
// initialize the leader elector.
if (HAUtil.isAutomaticFailoverEnabled(conf) &&
HAUtil.isAutomaticFailoverEmbedded(conf)) {
EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
}
// Set UGI and do login
// If security is enabled, use login user
// If security is not enabled, use current user
@ -331,6 +332,20 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(this.conf);
}
protected EmbeddedElector createEmbeddedElector() throws IOException {
EmbeddedElector elector;
curatorEnabled =
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
elector = new CuratorBasedElectorService(rmContext, this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(rmContext);
}
return elector;
}
public CuratorFramework createAndStartCurator(Configuration conf)
throws IOException {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
@ -802,14 +817,12 @@ public void handleTransitionToStandBy() {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
if (curatorEnabled) {
rmContext.getLeaderElectorService().reJoinElection();
} else {
adminService.resetLeaderElection();
EmbeddedElector elector = rmContext.getLeaderElectorService();
if (elector != null) {
elector.rejoinElection();
}
return;
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.");
LOG.fatal("Failed to transition RM to Standby mode.", e);
ExitUtil.terminate(1, e);
}
}

View File

@ -121,8 +121,7 @@ private String buildRedirectPath() {
}
public String getHAZookeeperConnectionState() {
return rm.getRMContext().getRMAdminService()
.getHAZookeeperConnectionState();
return getRMContext().getHAZookeeperConnectionState();
}
public RMContext getRMContext() {

View File

@ -64,7 +64,7 @@ public ClusterInfo(ResourceManager rm) {
this.hadoopBuildVersion = VersionInfo.getBuildVersion();
this.hadoopVersionBuiltOn = VersionInfo.getDate();
this.haZooKeeperConnectionState =
rm.getRMContext().getRMAdminService().getHAZookeeperConnectionState();
rm.getRMContext().getHAZookeeperConnectionState();
}
public String getState() {

View File

@ -109,6 +109,7 @@
import org.apache.log4j.Logger;
import org.junit.Assert;
@SuppressWarnings("unchecked")
public class MockRM extends ResourceManager {
@ -123,6 +124,8 @@ public class MockRM extends ResourceManager {
private final boolean useNullRMNodeLabelsManager;
private boolean disableDrainEventsImplicitly;
private boolean useRealElector = false;
public MockRM() {
this(new YarnConfiguration());
}
@ -132,13 +135,23 @@ public MockRM(Configuration conf) {
}
public MockRM(Configuration conf, RMStateStore store) {
this(conf, store, true);
this(conf, store, true, false);
}
public MockRM(Configuration conf, boolean useRealElector) {
this(conf, null, true, useRealElector);
}
public MockRM(Configuration conf, RMStateStore store,
boolean useNullRMNodeLabelsManager) {
boolean useRealElector) {
this(conf, store, true, useRealElector);
}
public MockRM(Configuration conf, RMStateStore store,
boolean useNullRMNodeLabelsManager, boolean useRealElector) {
super();
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
this.useRealElector = useRealElector;
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
if (store != null) {
setRMStateStore(store);
@ -192,6 +205,15 @@ protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
@Override
protected EmbeddedElector createEmbeddedElector() throws IOException {
if (useRealElector) {
return super.createEmbeddedElector();
} else {
return null;
}
}
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventHandler<SchedulerEvent>() {
@ -984,11 +1006,6 @@ protected void startServer() {
protected void stopServer() {
// don't do anything
}
@Override
protected EmbeddedElectorService createEmbeddedElectorService() {
return null;
}
};
}

View File

@ -108,13 +108,13 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
}
protected void startRMs() throws IOException {
rm1 = new MockRM(confForRM1, null, false){
rm1 = new MockRM(confForRM1, null, false, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
};
rm2 = new MockRM(confForRM2, null, false){
rm2 = new MockRM(confForRM2, null, false, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();

View File

@ -63,7 +63,6 @@ public void setUp() throws Exception {
conf = new Configuration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@ -121,7 +120,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId
}
};
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1 = new MockRM(conf, memStore, true);
rm1.init(conf);
rm1.start();
@ -167,7 +166,8 @@ public void testExpireCurrentZKSession() throws Exception{
rm1 = startRM("rm1", HAServiceState.ACTIVE);
LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
CuratorBasedElectorService service = (CuratorBasedElectorService)
rm1.getRMContext().getLeaderElectorService();
CuratorZookeeperClient client =
service.getCuratorClient().getZookeeperClient();
// this will expire current curator client session. curator will re-establish
@ -187,7 +187,7 @@ public void testRMFailToTransitionToActive() throws Exception{
Thread launchRM = new Thread() {
@Override
public void run() {
rm1 = new MockRM(conf) {
rm1 = new MockRM(conf, true) {
@Override
synchronized void transitionToActive() throws Exception {
if (throwException.get()) {
@ -217,9 +217,12 @@ public void testKillZKInstance() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
CuratorBasedElectorService service = (CuratorBasedElectorService)
rm1.getRMContext().getLeaderElectorService();
ZooKeeper zkClient =
rm1.getRMContext().getLeaderElectorService().getCuratorClient()
.getZookeeperClient().getZooKeeper();
service.getCuratorClient().getZookeeperClient().getZooKeeper();
InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
zkCluster.killServer(connectionInstance);
@ -245,7 +248,7 @@ public void testKillZKInstance() throws Exception {
private MockRM startRM(String rmId, HAServiceState state) throws Exception{
YarnConfiguration yarnConf = new YarnConfiguration(conf);
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
MockRM rm = new MockRM(yarnConf);
MockRM rm = new MockRM(yarnConf, true);
rm.init(yarnConf);
rm.start();
waitFor(rm, state);

View File

@ -127,7 +127,8 @@ private void testCallbackSynchronization(SyncTestType type)
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
when(rc.getRMAdminService()).thenReturn(as);
EmbeddedElectorService ees = new EmbeddedElectorService(rc);
ActiveStandbyElectorBasedElectorService
ees = new ActiveStandbyElectorBasedElectorService(rc);
ees.init(myConf);
ees.enterNeutralMode();
@ -164,7 +165,8 @@ private void testCallbackSynchronization(SyncTestType type)
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationActive(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
ees.becomeActive();
Thread.sleep(100);
@ -183,7 +185,8 @@ private void testCallbackSynchronizationActive(AdminService as,
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationStandby(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
ees.becomeStandby();
Thread.sleep(100);
@ -201,7 +204,8 @@ private void testCallbackSynchronizationStandby(AdminService as,
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationNeutral(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
ees.enterNeutralMode();
Thread.sleep(100);
@ -220,7 +224,8 @@ private void testCallbackSynchronizationNeutral(AdminService as,
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationTimingActive(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread
@ -250,7 +255,8 @@ private void testCallbackSynchronizationTimingActive(AdminService as,
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationTimingStandby(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException {
ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread
@ -283,25 +289,20 @@ private class MockRMWithElector extends MockRM {
}
@Override
protected AdminService createAdminService() {
return new AdminService(MockRMWithElector.this, getRMContext()) {
protected EmbeddedElector createEmbeddedElector() {
return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
@Override
protected EmbeddedElectorService createEmbeddedElectorService() {
return new EmbeddedElectorService(getRMContext()) {
@Override
public void becomeActive() throws
ServiceFailedException {
try {
callbackCalled.set(true);
TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
Thread.sleep(delayMs);
TestRMEmbeddedElector.LOG.info("Sleep done");
} catch (InterruptedException e) {
e.printStackTrace();
}
super.becomeActive();
}
};
public void becomeActive() throws
ServiceFailedException {
try {
callbackCalled.set(true);
TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
Thread.sleep(delayMs);
TestRMEmbeddedElector.LOG.info("Sleep done");
} catch (InterruptedException e) {
e.printStackTrace();
}
super.becomeActive();
}
};
}

View File

@ -161,8 +161,8 @@ private void checkActiveRMWebServices() throws JSONException {
ClientResponse response =
webResource.path("ws").path("v1").path("cluster").path("apps")
.path(path).accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
.path(path).accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
@ -178,13 +178,13 @@ private void checkActiveRMWebServices() throws JSONException {
* 1. Standby: Should be a no-op
* 2. Active: Active services should start
* 3. Active: Should be a no-op.
* While active, submit a couple of jobs
* While active, submit a couple of jobs
* 4. Standby: Active services should stop
* 5. Active: Active services should start
* 6. Stop the RM: All services should stop and RM should not be ready to
* become Active
*/
@Test (timeout = 30000)
@Test(timeout = 30000)
public void testFailoverAndTransitions() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration);
@ -204,37 +204,37 @@ public void testFailoverAndTransitions() throws Exception {
checkMonitorHealth();
checkStandbyRMFunctionality();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
// 1. Transition to Standby - must be a no-op
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
// 2. Transition to active
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
// 3. Transition to active - no-op
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 2, 2, 2, 2048, 2);
// 4. Transition to standby
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
// 5. Transition to active to check Active->Standby->Active works
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
// 6. Stop the RM. All services should stop and RM should not be ready to
// become active
rm.stop();
@ -340,7 +340,7 @@ protected Dispatcher createDispatcher() {
rm.adminService.transitionToStandby(requestInfo);
rm.adminService.transitionToActive(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
MyCountingDispatcher dispatcher =
(MyCountingDispatcher) rm.getRMContext().getDispatcher();
assertTrue(!dispatcher.isStopped());
@ -348,24 +348,24 @@ protected Dispatcher createDispatcher() {
rm.adminService.transitionToActive(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
.getEventHandlerCount());
.getEventHandlerCount());
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
// Keep the dispatcher reference before transitioning to standby
dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
rm.adminService.transitionToStandby(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
.getEventHandlerCount());
.getEventHandlerCount());
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
assertTrue(dispatcher.isStopped());
rm.stop();
}
@ -386,7 +386,8 @@ public void testHAIDLookup() {
assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID);
//test if RM_HA_ID can not be found
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID);
configuration
.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM3_NODE_ID);
configuration.unset(YarnConfiguration.RM_HA_ID);
conf = new YarnConfiguration(configuration);
try {
@ -458,7 +459,7 @@ public synchronized void startInternal() throws Exception {
checkActiveRMFunctionality();
}
@Test(timeout = 90000)
@Test
public void testTransitionedToStandbyShouldNotHang() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration);