From 797159bbd4f80c92d8bfe6979b4dd91ce51d0afc Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Thu, 14 Nov 2013 17:53:25 +0000 Subject: [PATCH] YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik Kambatla via bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541995 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/hadoop/util/ZKUtil.java | 14 + .../org/apache/hadoop/util/TestZKUtil.java | 9 +- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 6 + .../org/apache/hadoop/yarn/conf/HAUtil.java | 8 +- .../hadoop/yarn/conf/YarnConfiguration.java | 2 + .../src/main/resources/yarn-default.xml | 31 +- .../resourcemanager/RMHAProtocolService.java | 10 +- .../resourcemanager/ResourceManager.java | 53 ++++ .../recovery/RMStateStore.java | 89 +++--- .../RMStateStoreOperationFailedEvent.java | 36 +++ .../RMStateStoreOperationFailedEventType.java | 23 ++ .../recovery/StoreFencedException.java | 28 ++ .../recovery/ZKRMStateStore.java | 279 ++++++++++++++---- .../recovery/TestZKRMStateStore.java | 96 +++++- 15 files changed, 589 insertions(+), 98 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java index bd08efb5cd..d23df79d27 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java @@ -71,6 +71,20 @@ private static int getPermFromString(String permString) { return perm; } + /** + * Helper method to remove a subset of permissions (remove) from a + * given set (perms). + * @param perms The permissions flag to remove from. Should be an OR of + * some combination of {@link ZooDefs.Perms} + * @param remove The permissions to be removed. Should be an OR of + * some combination of {@link ZooDefs.Perms} + * @return A permissions flag that is an OR of {@link ZooDefs.Perms} + * present in perms and not present in remove + */ + public static int removeSpecificPerms(int perms, int remove) { + return perms & ~remove; + } + /** * Parse comma separated list of ACL entries to secure generated nodes, e.g. * sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java index 1d14326d2a..52d10ca2fc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.List; -import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.util.ZKUtil.BadAclFormatException; import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; import org.apache.zookeeper.ZooDefs.Perms; @@ -75,6 +74,14 @@ private static void badAcl(String acls, String expectedErr) { } } + @Test + public void testRemoveSpecificPerms() { + int perms = Perms.ALL; + int remove = Perms.CREATE; + int newPerms = ZKUtil.removeSpecificPerms(perms, remove); + assertEquals("Removal failed", 0, newPerms & Perms.CREATE); + } + @Test public void testGoodACLs() { 
List result = ZKUtil.parseACLs( diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f133b55920..c8db3036c7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -94,6 +94,9 @@ Release 2.3.0 - UNRELEASED YARN-1387. RMWebServices should use ClientRMService for filtering applications (Karthik Kambatla via Sandy Ryza) + YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik + Kambatla via bikas) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 104ef4a539..b25466533a 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -178,6 +178,12 @@ + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 5512dcda1b..fdec50967a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -193,8 +193,8 @@ private static String getConfKeyForRMInstance(String prefix, return addSuffix(prefix, getRMHAId(conf)); } - private static String getConfValueForRMInstance(String prefix, - Configuration conf) { + public static String getConfValueForRMInstance(String prefix, + Configuration conf) { String confKey = getConfKeyForRMInstance(prefix, conf); String retVal = conf.getTrimmed(confKey); if (LOG.isTraceEnabled()) { @@ -205,8 +205,8 @@ private static String getConfValueForRMInstance(String prefix, return retVal; } - static String getConfValueForRMInstance(String prefix, String defaultValue, - Configuration conf) { + public static String 
getConfValueForRMInstance( + String prefix, String defaultValue, Configuration conf) { String value = getConfValueForRMInstance(prefix, conf); return (value == null) ? defaultValue : value; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 09f6b6e6dc..1e4c71903e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -328,6 +328,8 @@ public class YarnConfiguration extends Configuration { ZK_STATE_STORE_PREFIX + "acl"; public static final String DEFAULT_ZK_RM_STATE_STORE_ACL = "world:anyone:rwcda"; + public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL = + ZK_STATE_STORE_PREFIX + "root-node.acl"; /** The maximum number of completed applications RM keeps. */ public static final String RM_MAX_COMPLETED_APPLICATIONS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 86501ad779..7f6e050d2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -279,7 +279,11 @@ Host:Port of the ZooKeeper server where RM state will be stored. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore - as the value for yarn.resourcemanager.store.class + as the value for yarn.resourcemanager.store.class. ZKRMStateStore + is implicitly fenced, meaning a single ResourceManager is + able to use the store at any point in time. 
More details on this, along + with setting up appropriate ACLs are discussed under the description for + yarn.resourcemanager.zk.state-store.root-node.acl. yarn.resourcemanager.zk.state-store.address @@ -320,6 +324,31 @@ world:anyone:rwcda + + + ACLs to be used for the root znode when using ZKRMStateStore in an HA + scenario for fencing. + + ZKRMStateStore supports implicit fencing to allow a single + ResourceManager write-access to the store. For fencing, the + ResourceManagers in the cluster share read-write-admin privileges on the + root node, but the Active ResourceManager claims exclusive create-delete + permissions. + + By default, when this property is not set, we use the ACLs from + yarn.resourcemanager.zk.state-store.acl for shared admin access and + rm-address:cluster-timestamp for username-based exclusive create-delete + access. + + This property allows users to set ACLs of their choice instead of using + the default mechanism. For fencing to work, the ACLs should be + carefully set differently on each ResourceManager such that all the + ResourceManagers have shared admin access and the Active ResourceManager + takes over (exclusively) the create-delete access. + + yarn.resourcemanager.zk.state-store.root-node.acl + + URI pointing to the location of the FileSystem path where RM state will be stored. 
This must be supplied when using diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java index c74b2826c5..f801203b70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java @@ -67,7 +67,9 @@ public class RMHAProtocolService extends AbstractService implements protected HAServiceState haState = HAServiceState.INITIALIZING; private AccessControlList adminAcl; private Server haAdminServer; - private boolean haEnabled; + + @InterfaceAudience.Private + boolean haEnabled; public RMHAProtocolService(ResourceManager resourceManager) { super("RMHAProtocolService"); @@ -174,7 +176,8 @@ public synchronized void monitorHealth() } } - private synchronized void transitionToActive() throws Exception { + @InterfaceAudience.Private + synchronized void transitionToActive() throws Exception { if (haState == HAServiceState.ACTIVE) { LOG.info("Already in active state"); return; @@ -205,7 +208,8 @@ public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) } } - private synchronized void transitionToStandby(boolean initialize) + @InterfaceAudience.Private + synchronized void transitionToStandby(boolean initialize) throws Exception { if (haState == HAServiceState.STANDBY) { LOG.info("Already in standby state"); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 41b119e851..82a1f64973 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEventType; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -163,6 +165,10 @@ public class ResourceManager extends CompositeService implements Recoverable { public ResourceManager() { super("ResourceManager"); } + + public RMHAProtocolService getHAService() { + return this.haService; + } public RMContext getRMContext() { return this.rmContext; @@ -216,6 +222,11 @@ protected EventHandler createSchedulerEventDispatcher() { return new SchedulerEventDispatcher(this.scheduler); } + protected RMStateStoreOperationFailedEventDispatcher + createRMStateStoreOperationFailedEventDispatcher() { + return new RMStateStoreOperationFailedEventDispatcher(haService); + } + 
protected Dispatcher createDispatcher() { return new AsyncDispatcher(); } @@ -339,6 +350,8 @@ protected void serviceInit(Configuration configuration) throws Exception { try { rmStore.init(conf); rmStore.setRMDispatcher(rmDispatcher); + rmDispatcher.register(RMStateStoreOperationFailedEventType.class, + createRMStateStoreOperationFailedEventDispatcher()); } catch (Exception e) { // the Exception from stateStore.init() needs to be handled for // HA and we need to give up master status if we got fenced @@ -632,6 +645,46 @@ public void handle(SchedulerEvent event) { } } + @Private + public static class RMStateStoreOperationFailedEventDispatcher implements + EventHandler { + private final RMHAProtocolService haService; + + public RMStateStoreOperationFailedEventDispatcher( + RMHAProtocolService haService) { + this.haService = haService; + } + + @Override + public void handle(RMStateStoreOperationFailedEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received a " + + RMStateStoreOperationFailedEvent.class.getName() + " of type " + + event.getType().name()); + } + if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) { + LOG.info("RMStateStore has been fenced"); + synchronized(haService) { + if (haService.haEnabled) { + try { + // Transition to standby and reinit active services + LOG.info("Transitioning RM to Standby mode"); + haService.transitionToStandby(true); + return; + } catch (Exception e) { + LOG.error("Failed to transition RM to Standby mode.", e); + } + } + } + } + + LOG.error("Shutting down RM on receiving a " + + RMStateStoreOperationFailedEvent.class.getName() + " of type " + + event.getType().name()); + ExitUtil.terminate(1, event.getCause()); + } + } + @Private public static final class ApplicationEventDispatcher implements EventHandler { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 911107daab..b9724d210b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; + import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -388,9 +389,13 @@ protected abstract void updateApplicationAttemptStateInternal(String attemptId, */ public synchronized void storeRMDelegationTokenAndSequenceNumber( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, - int latestSequenceNumber) throws Exception { - storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, - latestSequenceNumber); + int latestSequenceNumber) { + try { + storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, + latestSequenceNumber); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -406,9 +411,12 @@ protected abstract void storeRMDelegationTokenAndSequenceNumberState( * RMDTSecretManager call this to remove the state of a delegation token */ public synchronized void removeRMDelegationToken( - RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) - throws Exception { - removeRMDelegationTokenState(rmDTIdentifier); + RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) { + try { + 
removeRMDelegationTokenState(rmDTIdentifier); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -421,9 +429,12 @@ protected abstract void removeRMDelegationTokenState( /** * RMDTSecretManager call this to store the state of a master key */ - public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) - throws Exception { - storeRMDTMasterKeyState(delegationKey); + public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) { + try { + storeRMDTMasterKeyState(delegationKey); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -437,9 +448,12 @@ protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey) /** * RMDTSecretManager call this to remove the state of a master key */ - public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) - throws Exception { - removeRMDTMasterKeyState(delegationKey); + public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) { + try { + removeRMDTMasterKeyState(delegationKey); + } catch (Exception e) { + notifyStoreOperationFailed(e); + } } /** @@ -539,19 +553,15 @@ protected void handleStoreEvent(RMStateStoreEvent event) { try { if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { storeApplicationStateInternal(appId.toString(), appStateData); + notifyDoneStoringApplication(appId, storedException); } else { assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); updateApplicationStateInternal(appId.toString(), appStateData); + notifyDoneUpdatingApplication(appId, storedException); } } catch (Exception e) { LOG.error("Error storing app: " + appId, e); - storedException = e; - } finally { - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - notifyDoneStoringApplication(appId, storedException); - } else { - notifyDoneUpdatingApplication(appId, storedException); - } + notifyStoreOperationFailed(e); } } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) || 
event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) { @@ -589,24 +599,20 @@ protected void handleStoreEvent(RMStateStoreEvent event) { if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { storeApplicationAttemptStateInternal(attemptState.getAttemptId() .toString(), attemptStateData); + notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), + storedException); } else { assert event.getType().equals( RMStateStoreEventType.UPDATE_APP_ATTEMPT); updateApplicationAttemptStateInternal(attemptState.getAttemptId() .toString(), attemptStateData); + notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), + storedException); } } catch (Exception e) { - LOG - .error("Error storing appAttempt: " + attemptState.getAttemptId(), e); - storedException = e; - } finally { - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), - storedException); - } else { - notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - storedException); - } + LOG.error( + "Error storing appAttempt: " + attemptState.getAttemptId(), e); + notifyStoreOperationFailed(e); } } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { ApplicationState appState = @@ -616,17 +622,34 @@ protected void handleStoreEvent(RMStateStoreEvent event) { LOG.info("Removing info for app: " + appId); try { removeApplicationState(appState); + notifyDoneRemovingApplcation(appId, removedException); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); - removedException = e; - } finally { - notifyDoneRemovingApplcation(appId, removedException); + notifyStoreOperationFailed(e); } } else { LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); } } + @SuppressWarnings("unchecked") + /** + * In {#handleStoreEvent}, this method is called to notify the + * ResourceManager that the store operation has failed. 
+ * @param failureCause the exception due to which the operation failed + */ + private void notifyStoreOperationFailed(Exception failureCause) { + RMStateStoreOperationFailedEventType type; + if (failureCause instanceof StoreFencedException) { + type = RMStateStoreOperationFailedEventType.FENCED; + } else { + type = RMStateStoreOperationFailedEventType.FAILED; + } + + rmDispatcher.getEventHandler().handle( + new RMStateStoreOperationFailedEvent(type, failureCause)); + } + @SuppressWarnings("unchecked") /** * In (@link handleStoreEvent}, this method is called to notify the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java new file mode 100644 index 0000000000..4656a7154d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEvent.java @@ -0,0 +1,36 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class RMStateStoreOperationFailedEvent + extends AbstractEvent { + private Exception cause; + + RMStateStoreOperationFailedEvent( + RMStateStoreOperationFailedEventType type, Exception cause) { + super(type); + this.cause = cause; + } + + public Exception getCause() { + return this.cause; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java new file mode 100644 index 0000000000..f5a7dc5686 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOperationFailedEventType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +public enum RMStateStoreOperationFailedEventType { + FENCED, // Store operation failed because it was fenced + FAILED // Store operation failed for no known reason +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java new file mode 100644 index 0000000000..1f8eb16bba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/StoreFencedException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class StoreFencedException extends YarnException { + private static final long serialVersionUID = 1L; + + public StoreFencedException() { + super("RMStateStore has been fenced"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 628d260ea4..45afb4e5d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -23,7 +23,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -31,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; @@ -38,11 +41,14 @@ import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import 
org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -53,11 +59,14 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import com.google.common.annotations.VisibleForTesting; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; @Private @Unstable @@ -83,6 +92,55 @@ public class ZKRMStateStore extends RMStateStore { protected ZooKeeper zkClient; private ZooKeeper oldZkClient; + /** Fencing related variables */ + private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; + private String fencingNodePath; + private Op createFencingNodePathOp; + private Op deleteFencingNodePathOp; + + @VisibleForTesting + List zkRootNodeAcl; + private boolean useDefaultFencingScheme = false; + public static final int CREATE_DELETE_PERMS = + ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; + private final String zkRootNodeAuthScheme = + new DigestAuthenticationProvider().getScheme(); + + private String zkRootNodeUsername; + private 
String zkRootNodePassword; + + /** + * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for + * ZooKeeper access, construct the {@link ACL}s for the store's root node. + * In the constructed {@link ACL}, all the users allowed by zkAcl are given + * rwa access, while the current RM has exclusive create-delete access. + * + * To be called only when HA is enabled and the configuration doesn't set ACL + * for the root node. + */ + @VisibleForTesting + @Private + @Unstable + protected List constructZkRootNodeACL( + Configuration conf, List sourceACLs) throws NoSuchAlgorithmException { + List zkRootNodeAcl = new ArrayList(); + for (ACL acl : sourceACLs) { + zkRootNodeAcl.add(new ACL( + ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS), + acl.getId())); + } + + zkRootNodeUsername = HAUtil.getConfValueForRMInstance( + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, conf); + zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp()); + Id rmId = new Id(zkRootNodeAuthScheme, + DigestAuthenticationProvider.generateDigest( + zkRootNodeUsername + ":" + zkRootNodePassword)); + zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId)); + return zkRootNodeAcl; + } + @Override public synchronized void initInternal(Configuration conf) throws Exception { zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS); @@ -116,6 +174,29 @@ public synchronized void initInternal(Configuration conf) throws Exception { zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT; + + /* Initialize fencing related paths, acls, and ops */ + fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK; + createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl, + CreateMode.PERSISTENT); + deleteFencingNodePathOp = Op.delete(fencingNodePath, -1); + if (HAUtil.isHAEnabled(conf)) { + String zkRootNodeAclConf = 
HAUtil.getConfValueForRMInstance + (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf); + if (zkRootNodeAclConf != null) { + zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf); + try { + zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf); + } catch (ZKUtil.BadAclFormatException bafe) { + LOG.error("Invalid format for " + + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL); + throw bafe; + } + } else { + useDefaultFencingScheme = true; + zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl); + } + } } @Override @@ -126,20 +207,76 @@ public synchronized void startInternal() throws Exception { // ensure root dirs exist createRootDir(znodeWorkingPath); createRootDir(zkRootNodePath); + if (HAUtil.isHAEnabled(getConfig())){ + fence(); + } createRootDir(rmDTSecretManagerRoot); createRootDir(rmAppRoot); } - private void createRootDir(String rootPath) throws Exception { + private void createRootDir(final String rootPath) throws Exception { + // For root dirs, we shouldn't use the doMulti helper methods try { - createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT); + new ZKAction() { + @Override + public String run() throws KeeperException, InterruptedException { + return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT); + } + }.runWithRetries(); } catch (KeeperException ke) { - if (ke.code() != Code.NODEEXISTS) { + if (ke.code() == Code.NODEEXISTS) { + LOG.debug(rootPath + "znode already exists!"); + } else { throw ke; } } } + private void logRootNodeAcls(String prefix) throws KeeperException, + InterruptedException { + Stat getStat = new Stat(); + List getAcls = zkClient.getACL(zkRootNodePath, getStat); + + StringBuilder builder = new StringBuilder(); + builder.append(prefix); + for (ACL acl : getAcls) { + builder.append(acl.toString()); + } + builder.append(getStat.toString()); + LOG.debug(builder.toString()); + } + + private synchronized void fence() throws Exception { + if (LOG.isTraceEnabled()) { + logRootNodeAcls("Before 
fencing\n"); + } + + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1); + return null; + } + }.runWithRetries(); + + // delete fencingnodepath + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + try { + zkClient.multi(Collections.singletonList(deleteFencingNodePathOp)); + } catch (KeeperException.NoNodeException nne) { + LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete"); + } + return null; + } + }.runWithRetries(); + + if (LOG.isTraceEnabled()) { + logRootNodeAcls("After fencing\n"); + } + } + private synchronized void closeZkClients() throws IOException { if (zkClient != null) { try { @@ -176,7 +313,8 @@ public synchronized RMState loadState() throws Exception { private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { - List childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true); + List childNodes = + getChildrenWithRetries(rmDTSecretManagerRoot, true); for (String childNodeName : childNodes) { if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { @@ -209,7 +347,7 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) } private synchronized void loadRMAppState(RMState rmState) throws Exception { - List childNodes = zkClient.getChildren(rmAppRoot, true); + List childNodes = getChildrenWithRetries(rmAppRoot, true); List attempts = new ArrayList(); for (String childNodeName : childNodes) { @@ -466,6 +604,8 @@ public void process(WatchedEvent event) { } @VisibleForTesting + @Private + @Unstable public synchronized void processWatchEvent(WatchedEvent event) throws Exception { Event.EventType eventType = event.getType(); @@ -506,65 +646,71 @@ public synchronized void processWatchEvent(WatchedEvent event) } @VisibleForTesting + @Private + @Unstable String getNodePath(String root, String nodeName) { return (root + "/" + 
nodeName); } - @VisibleForTesting - public String createWithRetries( - final String path, final byte[] data, final List acl, - final CreateMode mode) throws Exception { - return new ZKAction() { + /** + * Helper method that creates fencing node, executes the passed operations, + * and deletes the fencing node. + */ + private synchronized void doMultiWithRetries( + final List opList) throws Exception { + final List execOpList = new ArrayList(opList.size() + 2); + execOpList.add(createFencingNodePathOp); + execOpList.addAll(opList); + execOpList.add(deleteFencingNodePathOp); + new ZKAction() { @Override - public String run() throws KeeperException, InterruptedException { - return zkClient.create(path, data, acl, mode); + public Void run() throws KeeperException, InterruptedException { + zkClient.multi(execOpList); + return null; } }.runWithRetries(); } + /** + * Helper method that creates fencing node, executes the passed operation, + * and deletes the fencing node. + */ + private void doMultiWithRetries(final Op op) throws Exception { + doMultiWithRetries(Collections.singletonList(op)); + } + + @VisibleForTesting + @Private + @Unstable + public void createWithRetries( + final String path, final byte[] data, final List acl, + final CreateMode mode) throws Exception { + doMultiWithRetries(Op.create(path, data, acl, mode)); + } + private void deleteWithRetries(final String path, final int version) throws Exception { - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - /** - * Call exists() to leave a watch on the node denoted by path. - * Delete node if exists. To pass the existence information to the - * caller, call delete irrespective of whether node exists or not. 
- */ - if (zkClient.exists(path, true) == null) { - LOG.error("Trying to delete a path (" + path - + ") that doesn't exist."); - } - zkClient.delete(path, version); - return null; + try { + doMultiWithRetries(Op.delete(path, version)); + } catch (KeeperException.NoNodeException nne) { + // We tried to delete a node that doesn't exist + if (LOG.isDebugEnabled()) { + LOG.debug("Attempted to delete a non-existing znode " + path); } - }.runWithRetries(); - } - - private void doMultiWithRetries(final ArrayList opList) throws Exception { - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.multi(opList); - return null; - } - }.runWithRetries(); + } } @VisibleForTesting + @Private + @Unstable public void setDataWithRetries(final String path, final byte[] data, final int version) throws Exception { - new ZKAction() { - @Override - public Void run() throws KeeperException, InterruptedException { - zkClient.setData(path, data, version); - return null; - } - }.runWithRetries(); + doMultiWithRetries(Op.setData(path, data, version)); } @VisibleForTesting + @Private + @Unstable public byte[] getDataWithRetries(final String path, final boolean watch) throws Exception { return new ZKAction() { @@ -576,6 +722,16 @@ public byte[] run() throws KeeperException, InterruptedException { }.runWithRetries(); } + private List getChildrenWithRetries( + final String path, final boolean watch) throws Exception { + return new ZKAction>() { + @Override + List run() throws KeeperException, InterruptedException { + return zkClient.getChildren(path, watch); + } + }.runWithRetries(); + } + private abstract class ZKAction { // run() expects synchronization on ZKRMStateStore.this abstract T run() throws KeeperException, InterruptedException; @@ -596,11 +752,29 @@ T runWithCheck() throws Exception { } } + private boolean shouldRetry(Code code) { + switch (code) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + return true; + default: + break; + 
} + return false; + } + T runWithRetries() throws Exception { int retry = 0; while (true) { try { return runWithCheck(); + } catch (KeeperException.NoAuthException nae) { + if (HAUtil.isHAEnabled(getConfig())) { + // NoAuthException possibly means that this store is fenced due to + // another RM becoming active. Even if not, + // it is safer to assume we have been fenced + throw new StoreFencedException(); + } } catch (KeeperException ke) { if (shouldRetry(ke.code()) && ++retry < numRetries) { continue; @@ -611,17 +785,6 @@ T runWithRetries() throws Exception { } } - private static boolean shouldRetry(Code code) { - switch (code) { - case CONNECTIONLOSS: - case OPERATIONTIMEOUT: - return true; - default: - break; - } - return false; - } - private synchronized void createConnection() throws IOException, InterruptedException { closeZkClients(); @@ -629,6 +792,10 @@ private synchronized void createConnection() retries++) { try { zkClient = getNewZooKeeper(); + if (useDefaultFencingScheme) { + zkClient.addAuthInfo(zkRootNodeAuthScheme, + (zkRootNodeUsername + ":" + zkRootNodePassword).getBytes()); + } } catch (IOException ioe) { // Retry in case of network failures LOG.info("Failed to connect to the ZooKeeper on attempt - " + @@ -646,6 +813,8 @@ private synchronized void createConnection() // protected to mock for testing @VisibleForTesting + @Private + @Unstable protected synchronized ZooKeeper getNewZooKeeper() throws IOException, InterruptedException { ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index a6929a8936..7f07ddb4c4 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -18,15 +18,32 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.zookeeper.ZooKeeper; import org.junit.Test; @@ -56,7 +73,7 @@ public ZooKeeper getNewZooKeeper() throws IOException { public RMStateStore getRMStateStore() throws Exception { String workingZnode = "/Test"; - YarnConfiguration conf = new YarnConfiguration(); + Configuration conf = new YarnConfiguration(); 
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); this.client = createClient(); @@ -77,4 +94,81 @@ public void testZKRMStateStoreRealZK() throws Exception { testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); } + + private Configuration createHARMConf( + String rmIds, String rmId, int adminPort) { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, rmIds); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName()); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); + conf.set(YarnConfiguration.RM_HA_ID, rmId); + for (String rpcAddress : HAUtil.RPC_ADDRESS_CONF_KEYS) { + conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0"); + } + conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort); + return conf; + } + + @SuppressWarnings("unchecked") + @Test + public void testFencing() throws Exception { + StateChangeRequestInfo req = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234); + ResourceManager rm1 = new ResourceManager(); + rm1.init(conf1); + rm1.start(); + rm1.getHAService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm1.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm1.getHAService().getServiceStatus().getState()); + + Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678); + ResourceManager rm2 = new ResourceManager(); + rm2.init(conf2); + rm2.start(); + rm2.getHAService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm2.getServiceState()); + assertEquals("RM should be Active", + 
HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getHAService().getServiceStatus().getState()); + + // Submitting an application to RM1 to trigger a state store operation. + // RM1 should realize that it got fenced and is not the Active RM anymore. + Map mockMap = mock(Map.class); + ApplicationSubmissionContext asc = + ApplicationSubmissionContext.newInstance( + ApplicationId.newInstance(1000, 1), + "testApplication", // app Name + "default", // queue name + Priority.newInstance(0), + ContainerLaunchContext.newInstance(mockMap, mockMap, + new ArrayList(), mockMap, mock(ByteBuffer.class), + mockMap), + false, // unmanaged AM + true, // cancelTokens + 1, // max app attempts + Resource.newInstance(1024, 1)); + ClientRMService rmService = rm1.getClientRMService(); + rmService.submitApplication(SubmitApplicationRequest.newInstance(asc)); + + for (int i = 0; i < 30; i++) { + if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getHAService() + .getServiceStatus().getState()) { + Thread.sleep(100); + } + } + assertEquals("RM should have been fenced", + HAServiceProtocol.HAServiceState.STANDBY, + rm1.getHAService().getServiceStatus().getState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm2.getHAService().getServiceStatus().getState()); + } }