YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik Kambatla via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541995 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-11-14 17:53:25 +00:00
parent 95a87caed0
commit 797159bbd4
15 changed files with 589 additions and 98 deletions

View File

@ -71,6 +71,20 @@ private static int getPermFromString(String permString) {
return perm;
}
/**
* Helper method to remove a subset of permissions (remove) from a
* given set (perms).
* @param perms The permissions flag to remove from. Should be an OR of a
* some combination of {@link ZooDefs.Perms}
* @param remove The permissions to be removed. Should be an OR of a
* some combination of {@link ZooDefs.Perms}
* @return A permissions flag that is an OR of {@link ZooDefs.Perms}
* present in perms and not present in remove
*/
public static int removeSpecificPerms(int perms, int remove) {
return perms ^ remove;
}
/**
* Parse comma separated list of ACL entries to secure generated nodes, e.g.
* <code>sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa</code>

View File

@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.ZKUtil.BadAclFormatException;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.zookeeper.ZooDefs.Perms;
@ -75,6 +74,14 @@ private static void badAcl(String acls, String expectedErr) {
}
}
@Test
public void testRemoveSpecificPerms() {
int perms = Perms.ALL;
int remove = Perms.CREATE;
int newPerms = ZKUtil.removeSpecificPerms(perms, remove);
assertEquals("Removal failed", 0, newPerms & Perms.CREATE);
}
@Test
public void testGoodACLs() {
List<ACL> result = ZKUtil.parseACLs(

View File

@ -94,6 +94,9 @@ Release 2.3.0 - UNRELEASED
YARN-1387. RMWebServices should use ClientRMService for filtering
applications (Karthik Kambatla via Sandy Ryza)
YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik
Kambatla via bikas)
OPTIMIZATIONS
BUG FIXES

View File

@ -178,6 +178,12 @@
<Field name="minimumAllocation" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<!-- Inconsistent sync warning - numRetries is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore" />
<Field name="numRetries" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer"/>
<Field name="renewalTimer" />

View File

@ -193,8 +193,8 @@ private static String getConfKeyForRMInstance(String prefix,
return addSuffix(prefix, getRMHAId(conf));
}
private static String getConfValueForRMInstance(String prefix,
Configuration conf) {
public static String getConfValueForRMInstance(String prefix,
Configuration conf) {
String confKey = getConfKeyForRMInstance(prefix, conf);
String retVal = conf.getTrimmed(confKey);
if (LOG.isTraceEnabled()) {
@ -205,8 +205,8 @@ private static String getConfValueForRMInstance(String prefix,
return retVal;
}
static String getConfValueForRMInstance(String prefix, String defaultValue,
Configuration conf) {
public static String getConfValueForRMInstance(
String prefix, String defaultValue, Configuration conf) {
String value = getConfValueForRMInstance(prefix, conf);
return (value == null) ? defaultValue : value;
}

View File

@ -328,6 +328,8 @@ public class YarnConfiguration extends Configuration {
ZK_STATE_STORE_PREFIX + "acl";
public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
"world:anyone:rwcda";
public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
ZK_STATE_STORE_PREFIX + "root-node.acl";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =

View File

@ -279,7 +279,11 @@
<description>Host:Port of the ZooKeeper server where RM state will
be stored. This must be supplied when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
as the value for yarn.resourcemanager.store.class</description>
as the value for yarn.resourcemanager.store.class. ZKRMStateStore
is implicitly fenced, meaning a single ResourceManager is
able to use the store at any point in time. More details on this, along
with setting up appropriate ACLs is discussed under the description for
yarn.resourcemanager.zk.state-store.root-node.acl.</description>
<name>yarn.resourcemanager.zk.state-store.address</name>
<!--value>127.0.0.1:2181</value-->
</property>
@ -320,6 +324,31 @@
<value>world:anyone:rwcda</value>
</property>
<property>
<description>
ACLs to be used for the root znode when using ZKRMStateStore in a HA
scenario for fencing.
ZKRMStateStore supports implicit fencing to allow a single
ResourceManager write-access to the store. For fencing, the
ResourceManagers in the cluster share read-write-admin privileges on the
root node, but the Active ResourceManager claims exclusive create-delete
permissions.
By default, when this property is not set, we use the ACLs from
yarn.resourcemanager.zk.state-store.acl for shared admin access and
rm-address:cluster-timestamp for username-based exclusive create-delete
access.
This property allows users to set ACLs of their choice instead of using
the default mechanism. For fencing to work, the ACLs should be
carefully set differently on each ResourceManger such that all the
ResourceManagers have shared admin access and the Active ResourceManger
takes over (exclusively) the create-delete access.
</description>
<name>yarn.resourcemanager.zk.state-store.root-node.acl</name>
</property>
<property>
<description>URI pointing to the location of the FileSystem path where
RM state will be stored. This must be supplied when using

View File

@ -67,7 +67,9 @@ public class RMHAProtocolService extends AbstractService implements
protected HAServiceState haState = HAServiceState.INITIALIZING;
private AccessControlList adminAcl;
private Server haAdminServer;
private boolean haEnabled;
@InterfaceAudience.Private
boolean haEnabled;
public RMHAProtocolService(ResourceManager resourceManager) {
super("RMHAProtocolService");
@ -174,7 +176,8 @@ public synchronized void monitorHealth()
}
}
private synchronized void transitionToActive() throws Exception {
@InterfaceAudience.Private
synchronized void transitionToActive() throws Exception {
if (haState == HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
@ -205,7 +208,8 @@ public synchronized void transitionToActive(StateChangeRequestInfo reqInfo)
}
}
private synchronized void transitionToStandby(boolean initialize)
@InterfaceAudience.Private
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (haState == HAServiceState.STANDBY) {
LOG.info("Already in standby state");

View File

@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreOperationFailedEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -163,6 +165,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
public ResourceManager() {
super("ResourceManager");
}
public RMHAProtocolService getHAService() {
return this.haService;
}
public RMContext getRMContext() {
return this.rmContext;
@ -216,6 +222,11 @@ protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler);
}
protected RMStateStoreOperationFailedEventDispatcher
createRMStateStoreOperationFailedEventDispatcher() {
return new RMStateStoreOperationFailedEventDispatcher(haService);
}
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
@ -339,6 +350,8 @@ protected void serviceInit(Configuration configuration) throws Exception {
try {
rmStore.init(conf);
rmStore.setRMDispatcher(rmDispatcher);
rmDispatcher.register(RMStateStoreOperationFailedEventType.class,
createRMStateStoreOperationFailedEventDispatcher());
} catch (Exception e) {
// the Exception from stateStore.init() needs to be handled for
// HA and we need to give up master status if we got fenced
@ -632,6 +645,46 @@ public void handle(SchedulerEvent event) {
}
}
@Private
public static class RMStateStoreOperationFailedEventDispatcher implements
EventHandler<RMStateStoreOperationFailedEvent> {
private final RMHAProtocolService haService;
public RMStateStoreOperationFailedEventDispatcher(
RMHAProtocolService haService) {
this.haService = haService;
}
@Override
public void handle(RMStateStoreOperationFailedEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received a " +
RMStateStoreOperationFailedEvent.class.getName() + " of type " +
event.getType().name());
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
LOG.info("RMStateStore has been fenced");
synchronized(haService) {
if (haService.haEnabled) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
haService.transitionToStandby(true);
return;
} catch (Exception e) {
LOG.error("Failed to transition RM to Standby mode.");
}
}
}
}
LOG.error("Shutting down RM on receiving a " +
RMStateStoreOperationFailedEvent.class.getName() + " of type " +
event.getType().name());
ExitUtil.terminate(1, event.getCause());
}
}
@Private
public static final class ApplicationEventDispatcher implements
EventHandler<RMAppEvent> {

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -388,9 +389,13 @@ protected abstract void updateApplicationAttemptStateInternal(String attemptId,
*/
public synchronized void storeRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
latestSequenceNumber);
int latestSequenceNumber) {
try {
storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
latestSequenceNumber);
} catch (Exception e) {
notifyStoreOperationFailed(e);
}
}
/**
@ -406,9 +411,12 @@ protected abstract void storeRMDelegationTokenAndSequenceNumberState(
* RMDTSecretManager call this to remove the state of a delegation token
*/
public synchronized void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber)
throws Exception {
removeRMDelegationTokenState(rmDTIdentifier);
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
try {
removeRMDelegationTokenState(rmDTIdentifier);
} catch (Exception e) {
notifyStoreOperationFailed(e);
}
}
/**
@ -421,9 +429,12 @@ protected abstract void removeRMDelegationTokenState(
/**
* RMDTSecretManager call this to store the state of a master key
*/
public synchronized void storeRMDTMasterKey(DelegationKey delegationKey)
throws Exception {
storeRMDTMasterKeyState(delegationKey);
public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
try {
storeRMDTMasterKeyState(delegationKey);
} catch (Exception e) {
notifyStoreOperationFailed(e);
}
}
/**
@ -437,9 +448,12 @@ protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
/**
* RMDTSecretManager call this to remove the state of a master key
*/
public synchronized void removeRMDTMasterKey(DelegationKey delegationKey)
throws Exception {
removeRMDTMasterKeyState(delegationKey);
public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
try {
removeRMDTMasterKeyState(delegationKey);
} catch (Exception e) {
notifyStoreOperationFailed(e);
}
}
/**
@ -539,19 +553,15 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
try {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
storeApplicationStateInternal(appId.toString(), appStateData);
notifyDoneStoringApplication(appId, storedException);
} else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
updateApplicationStateInternal(appId.toString(), appStateData);
notifyDoneUpdatingApplication(appId, storedException);
}
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
storedException = e;
} finally {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
notifyDoneStoringApplication(appId, storedException);
} else {
notifyDoneUpdatingApplication(appId, storedException);
}
notifyStoreOperationFailed(e);
}
} else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
|| event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) {
@ -589,24 +599,20 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
storeApplicationAttemptStateInternal(attemptState.getAttemptId()
.toString(), attemptStateData);
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
storedException);
} else {
assert event.getType().equals(
RMStateStoreEventType.UPDATE_APP_ATTEMPT);
updateApplicationAttemptStateInternal(attemptState.getAttemptId()
.toString(), attemptStateData);
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
storedException);
}
} catch (Exception e) {
LOG
.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
storedException = e;
} finally {
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
storedException);
} else {
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
storedException);
}
LOG.error(
"Error storing appAttempt: " + attemptState.getAttemptId(), e);
notifyStoreOperationFailed(e);
}
} else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {
ApplicationState appState =
@ -616,17 +622,34 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
LOG.info("Removing info for app: " + appId);
try {
removeApplicationState(appState);
notifyDoneRemovingApplcation(appId, removedException);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
removedException = e;
} finally {
notifyDoneRemovingApplcation(appId, removedException);
notifyStoreOperationFailed(e);
}
} else {
LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
}
}
@SuppressWarnings("unchecked")
/**
* In {#handleStoreEvent}, this method is called to notify the
* ResourceManager that the store operation has failed.
* @param failureCause the exception due to which the operation failed
*/
private void notifyStoreOperationFailed(Exception failureCause) {
RMStateStoreOperationFailedEventType type;
if (failureCause instanceof StoreFencedException) {
type = RMStateStoreOperationFailedEventType.FENCED;
} else {
type = RMStateStoreOperationFailedEventType.FAILED;
}
rmDispatcher.getEventHandler().handle(
new RMStateStoreOperationFailedEvent(type, failureCause));
}
@SuppressWarnings("unchecked")
/**
* In (@link handleStoreEvent}, this method is called to notify the

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class RMStateStoreOperationFailedEvent
extends AbstractEvent<RMStateStoreOperationFailedEventType> {
private Exception cause;
RMStateStoreOperationFailedEvent(
RMStateStoreOperationFailedEventType type, Exception cause) {
super(type);
this.cause = cause;
}
public Exception getCause() {
return this.cause;
}
}

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
public enum RMStateStoreOperationFailedEventType {
FENCED, // Store operation failed because it was fenced
FAILED // Store operation failed for no known reason
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.exceptions.YarnException;
public class StoreFencedException extends YarnException {
private static final long serialVersionUID = 1L;
public StoreFencedException() {
super("RMStateStore has been fenced");
}
}

View File

@ -23,7 +23,9 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@ -31,6 +33,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -38,11 +41,14 @@
import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -53,11 +59,14 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
@Private
@Unstable
@ -83,6 +92,55 @@ public class ZKRMStateStore extends RMStateStore {
protected ZooKeeper zkClient;
private ZooKeeper oldZkClient;
/** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
private String fencingNodePath;
private Op createFencingNodePathOp;
private Op deleteFencingNodePathOp;
@VisibleForTesting
List<ACL> zkRootNodeAcl;
private boolean useDefaultFencingScheme = false;
public static final int CREATE_DELETE_PERMS =
ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
private final String zkRootNodeAuthScheme =
new DigestAuthenticationProvider().getScheme();
private String zkRootNodeUsername;
private String zkRootNodePassword;
/**
* Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node.
* In the constructed {@link ACL}, all the users allowed by zkAcl are given
* rwa access, while the current RM has exclude create-delete access.
*
* To be called only when HA is enabled and the configuration doesn't set ACL
* for the root node.
*/
@VisibleForTesting
@Private
@Unstable
protected List<ACL> constructZkRootNodeACL(
Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
List<ACL> zkRootNodeAcl = new ArrayList<ACL>();
for (ACL acl : sourceACLs) {
zkRootNodeAcl.add(new ACL(
ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
acl.getId()));
}
zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp());
Id rmId = new Id(zkRootNodeAuthScheme,
DigestAuthenticationProvider.generateDigest(
zkRootNodeUsername + ":" + zkRootNodePassword));
zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
return zkRootNodeAcl;
}
@Override
public synchronized void initInternal(Configuration conf) throws Exception {
zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
@ -116,6 +174,29 @@ public synchronized void initInternal(Configuration conf) throws Exception {
zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
/* Initialize fencing related paths, acls, and ops */
fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
CreateMode.PERSISTENT);
deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
if (HAUtil.isHAEnabled(conf)) {
String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
(YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
if (zkRootNodeAclConf != null) {
zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
try {
zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf);
} catch (ZKUtil.BadAclFormatException bafe) {
LOG.error("Invalid format for " +
YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
throw bafe;
}
} else {
useDefaultFencingScheme = true;
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
}
}
}
@Override
@ -126,20 +207,76 @@ public synchronized void startInternal() throws Exception {
// ensure root dirs exist
createRootDir(znodeWorkingPath);
createRootDir(zkRootNodePath);
if (HAUtil.isHAEnabled(getConfig())){
fence();
}
createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot);
}
private void createRootDir(String rootPath) throws Exception {
private void createRootDir(final String rootPath) throws Exception {
// For root dirs, we shouldn't use the doMulti helper methods
try {
createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
new ZKAction<String>() {
@Override
public String run() throws KeeperException, InterruptedException {
return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT);
}
}.runWithRetries();
} catch (KeeperException ke) {
if (ke.code() != Code.NODEEXISTS) {
if (ke.code() == Code.NODEEXISTS) {
LOG.debug(rootPath + "znode already exists!");
} else {
throw ke;
}
}
}
private void logRootNodeAcls(String prefix) throws KeeperException,
InterruptedException {
Stat getStat = new Stat();
List<ACL> getAcls = zkClient.getACL(zkRootNodePath, getStat);
StringBuilder builder = new StringBuilder();
builder.append(prefix);
for (ACL acl : getAcls) {
builder.append(acl.toString());
}
builder.append(getStat.toString());
LOG.debug(builder.toString());
}
private synchronized void fence() throws Exception {
if (LOG.isTraceEnabled()) {
logRootNodeAcls("Before fencing\n");
}
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
zkClient.setACL(zkRootNodePath, zkRootNodeAcl, -1);
return null;
}
}.runWithRetries();
// delete fencingnodepath
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
try {
zkClient.multi(Collections.singletonList(deleteFencingNodePathOp));
} catch (KeeperException.NoNodeException nne) {
LOG.info("Fencing node " + fencingNodePath + " doesn't exist to delete");
}
return null;
}
}.runWithRetries();
if (LOG.isTraceEnabled()) {
logRootNodeAcls("After fencing\n");
}
}
private synchronized void closeZkClients() throws IOException {
if (zkClient != null) {
try {
@ -176,7 +313,8 @@ public synchronized RMState loadState() throws Exception {
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
List<String> childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
List<String> childNodes =
getChildrenWithRetries(rmDTSecretManagerRoot, true);
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
@ -209,7 +347,7 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState)
}
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = zkClient.getChildren(rmAppRoot, true);
List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for (String childNodeName : childNodes) {
@ -466,6 +604,8 @@ public void process(WatchedEvent event) {
}
@VisibleForTesting
@Private
@Unstable
public synchronized void processWatchEvent(WatchedEvent event)
throws Exception {
Event.EventType eventType = event.getType();
@ -506,65 +646,71 @@ public synchronized void processWatchEvent(WatchedEvent event)
}
@VisibleForTesting
@Private
@Unstable
String getNodePath(String root, String nodeName) {
return (root + "/" + nodeName);
}
@VisibleForTesting
public String createWithRetries(
final String path, final byte[] data, final List<ACL> acl,
final CreateMode mode) throws Exception {
return new ZKAction<String>() {
/**
* Helper method that creates fencing node, executes the passed operations,
* and deletes the fencing node.
*/
private synchronized void doMultiWithRetries(
final List<Op> opList) throws Exception {
final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
execOpList.add(createFencingNodePathOp);
execOpList.addAll(opList);
execOpList.add(deleteFencingNodePathOp);
new ZKAction<Void>() {
@Override
public String run() throws KeeperException, InterruptedException {
return zkClient.create(path, data, acl, mode);
public Void run() throws KeeperException, InterruptedException {
zkClient.multi(execOpList);
return null;
}
}.runWithRetries();
}
/**
* Helper method that creates fencing node, executes the passed operation,
* and deletes the fencing node.
*/
private void doMultiWithRetries(final Op op) throws Exception {
doMultiWithRetries(Collections.singletonList(op));
}
@VisibleForTesting
@Private
@Unstable
public void createWithRetries(
final String path, final byte[] data, final List<ACL> acl,
final CreateMode mode) throws Exception {
doMultiWithRetries(Op.create(path, data, acl, mode));
}
private void deleteWithRetries(final String path, final int version)
throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
/**
* Call exists() to leave a watch on the node denoted by path.
* Delete node if exists. To pass the existence information to the
* caller, call delete irrespective of whether node exists or not.
*/
if (zkClient.exists(path, true) == null) {
LOG.error("Trying to delete a path (" + path
+ ") that doesn't exist.");
}
zkClient.delete(path, version);
return null;
try {
doMultiWithRetries(Op.delete(path, version));
} catch (KeeperException.NoNodeException nne) {
// We tried to delete a node that doesn't exist
if (LOG.isDebugEnabled()) {
LOG.debug("Attempted to delete a non-existing znode " + path);
}
}.runWithRetries();
}
private void doMultiWithRetries(final ArrayList<Op> opList) throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
zkClient.multi(opList);
return null;
}
}.runWithRetries();
}
}
@VisibleForTesting
@Private
@Unstable
public void setDataWithRetries(final String path, final byte[] data,
final int version) throws Exception {
new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
zkClient.setData(path, data, version);
return null;
}
}.runWithRetries();
doMultiWithRetries(Op.setData(path, data, version));
}
@VisibleForTesting
@Private
@Unstable
public byte[] getDataWithRetries(final String path, final boolean watch)
throws Exception {
return new ZKAction<byte[]>() {
@ -576,6 +722,16 @@ public byte[] run() throws KeeperException, InterruptedException {
}.runWithRetries();
}
private List<String> getChildrenWithRetries(
final String path, final boolean watch) throws Exception {
return new ZKAction<List<String>>() {
@Override
List<String> run() throws KeeperException, InterruptedException {
return zkClient.getChildren(path, watch);
}
}.runWithRetries();
}
private abstract class ZKAction<T> {
// run() expects synchronization on ZKRMStateStore.this
abstract T run() throws KeeperException, InterruptedException;
@ -596,11 +752,29 @@ T runWithCheck() throws Exception {
}
}
private boolean shouldRetry(Code code) {
switch (code) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
return true;
default:
break;
}
return false;
}
T runWithRetries() throws Exception {
int retry = 0;
while (true) {
try {
return runWithCheck();
} catch (KeeperException.NoAuthException nae) {
if (HAUtil.isHAEnabled(getConfig())) {
// NoAuthException possibly means that this store is fenced due to
// another RM becoming active. Even if not,
// it is safer to assume we have been fenced
throw new StoreFencedException();
}
} catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < numRetries) {
continue;
@ -611,17 +785,6 @@ T runWithRetries() throws Exception {
}
}
private static boolean shouldRetry(Code code) {
switch (code) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
return true;
default:
break;
}
return false;
}
private synchronized void createConnection()
throws IOException, InterruptedException {
closeZkClients();
@ -629,6 +792,10 @@ private synchronized void createConnection()
retries++) {
try {
zkClient = getNewZooKeeper();
if (useDefaultFencingScheme) {
zkClient.addAuthInfo(zkRootNodeAuthScheme,
(zkRootNodeUsername + ":" + zkRootNodePassword).getBytes());
}
} catch (IOException ioe) {
// Retry in case of network failures
LOG.info("Failed to connect to the ZooKeeper on attempt - " +
@ -646,6 +813,8 @@ private synchronized void createConnection()
// protected to mock for testing
@VisibleForTesting
@Private
@Unstable
protected synchronized ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);

View File

@ -18,15 +18,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
@ -56,7 +73,7 @@ public ZooKeeper getNewZooKeeper() throws IOException {
public RMStateStore getRMStateStore() throws Exception {
String workingZnode = "/Test";
YarnConfiguration conf = new YarnConfiguration();
Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
@ -77,4 +94,81 @@ public void testZKRMStateStoreRealZK() throws Exception {
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
}
private Configuration createHARMConf(
String rmIds, String rmId, int adminPort) {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.RM_HA_ID, rmId);
for (String rpcAddress : HAUtil.RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
}
conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
return conf;
}
@SuppressWarnings("unchecked")
@Test
public void testFencing() throws Exception {
StateChangeRequestInfo req = new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
Configuration conf1 = createHARMConf("rm1,rm2", "rm1", 1234);
ResourceManager rm1 = new ResourceManager();
rm1.init(conf1);
rm1.start();
rm1.getHAService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm1.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm1.getHAService().getServiceStatus().getState());
Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
ResourceManager rm2 = new ResourceManager();
rm2.init(conf2);
rm2.start();
rm2.getHAService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm2.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getHAService().getServiceStatus().getState());
// Submitting an application to RM1 to trigger a state store operation.
// RM1 should realize that it got fenced and is not the Active RM anymore.
Map mockMap = mock(Map.class);
ApplicationSubmissionContext asc =
ApplicationSubmissionContext.newInstance(
ApplicationId.newInstance(1000, 1),
"testApplication", // app Name
"default", // queue name
Priority.newInstance(0),
ContainerLaunchContext.newInstance(mockMap, mockMap,
new ArrayList<String>(), mockMap, mock(ByteBuffer.class),
mockMap),
false, // unmanaged AM
true, // cancelTokens
1, // max app attempts
Resource.newInstance(1024, 1));
ClientRMService rmService = rm1.getClientRMService();
rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
for (int i = 0; i < 30; i++) {
if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getHAService()
.getServiceStatus().getState()) {
Thread.sleep(100);
}
}
assertEquals("RM should have been fenced",
HAServiceProtocol.HAServiceState.STANDBY,
rm1.getHAService().getServiceStatus().getState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getHAService().getServiceStatus().getState());
}
}