diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0d5f2cbaca..9e5b8e3afb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -639,6 +639,13 @@ public static boolean isAclEnabled(Configuration conf) {
RM_ZK_PREFIX + "appid-node.split-index";
public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
+ /** Index at which the RM Delegation Token ids will be split so that the
+ * delegation token znodes stored in the zookeeper RM state store will be
+ * stored as two different znodes (parent-child). **/
+ public static final String ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX =
+ RM_ZK_PREFIX + "delegation-token-node.split-index";
+ public static final int DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX = 0;
+
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4e78947fb8..97ecdac56b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -593,6 +593,24 @@
0
+
+ Index at which the RM Delegation Token ids will be split so
+ that the delegation token znodes stored in the zookeeper RM state store
+ will be stored as two different znodes (parent-child). The split is done
+ from the end. For instance, with no split, a delegation token znode will
+ be of the form RMDelegationToken_123456789. If the value of this config is
+ 1, the delegation token znode will be broken into two parts:
+ RMDelegationToken_12345678 and 9 respectively with former being the parent
+ node. This config can take values from 0 to 4. 0 means there will be no
+ split. If the value is outside this range, it will be treated as 0 (i.e.
+ no split). A value larger than 0 (up to 4) should be configured if you are
+ running a large number of applications, with long-lived delegation tokens
+ and state store operations (e.g. failover) are failing due to LenError in
+ Zookeeper.
+ yarn.resourcemanager.zk-delegation-token-node.split-index
+ 0
+
+
Specifies the maximum size of the data that can be stored
in a znode. Value should be same or less than jute.maxbuffer configured
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 00ef39fdd9..f0ab324ace 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -89,6 +89,8 @@ public abstract class RMStateStore extends AbstractService {
@VisibleForTesting
public static final String RM_APP_ROOT = "RMAppRoot";
protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
+ protected static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
+ "RMDelegationTokensRoot";
protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 5bff77f341..5d3ca45206 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -118,6 +118,22 @@
* |--- RM_DT_SECRET_MANAGER_ROOT
* |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
* |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+ * | |----- 1
+ * | | |----- (#TokenId barring last character)
+ * | | | |----- (#Last character of TokenId)
+ * | | ....
+ * | |----- 2
+ * | | |----- (#TokenId barring last 2 characters)
+ * | | | |----- (#Last 2 characters of TokenId)
+ * | | ....
+ * | |----- 3
+ * | | |----- (#TokenId barring last 3 characters)
+ * | | | |----- (#Last 3 characters of TokenId)
+ * | | ....
+ * | |----- 4
+ * | | |----- (#TokenId barring last 4 characters)
+ * | | | |----- (#Last 4 characters of TokenId)
+ * | | ....
* | |----- Token_1
* | |----- Token_2
* | ....
@@ -147,6 +163,11 @@
* splitting it in 2 parts, depending on a configurable split index. This limits
* the number of application znodes returned in a single call while loading
* app state.
+ *
+ * Changes from 1.4 to 1.5 - Change the structure of delegation token znode by
+ * splitting it in 2 parts, depending on a configurable split index. This limits
+ * the number of delegation token znodes returned in a single call while loading
+ * tokens state.
*/
@Private
@Unstable
@@ -162,7 +183,7 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final Version CURRENT_VERSION_INFO = Version
- .newInstance(1, 4);
+ .newInstance(1, 5);
@VisibleForTesting
public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
@@ -170,6 +191,7 @@ public class ZKRMStateStore extends RMStateStore {
private String zkRootNodePath;
private String rmAppRoot;
private Map rmAppRootHierarchies;
+ private Map rmDelegationTokenHierarchies;
private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
@@ -180,6 +202,8 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
protected String znodeWorkingPath;
private int appIdNodeSplitIndex = 0;
+ @VisibleForTesting
+ protected int delegationTokenNodeSplitIndex = 0;
/* Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -212,12 +236,13 @@ private enum AppAttemptOp {
};
/**
- * Encapsulates full app node path and corresponding split index.
+ * Encapsulates znode path and corresponding split index for hierarchical
+ * znode layouts.
*/
- private final static class AppNodeSplitInfo {
+ private final static class ZnodeSplitInfo {
private final String path;
private final int splitIndex;
- AppNodeSplitInfo(String path, int splitIndex) {
+ ZnodeSplitInfo(String path, int splitIndex) {
this.path = path;
this.splitIndex = splitIndex;
}
@@ -288,7 +313,7 @@ public synchronized void initInternal(Configuration conf)
appIdNodeSplitIndex =
conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
- if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
+ if (appIdNodeSplitIndex < 0 || appIdNodeSplitIndex > 4) {
LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
"Resetting it to " +
@@ -322,12 +347,30 @@ public synchronized void initInternal(Configuration conf)
RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
+ rmDelegationTokenHierarchies = new HashMap<>(5);
+ rmDelegationTokenHierarchies.put(0, delegationTokensRootPath);
+ for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+ rmDelegationTokenHierarchies.put(splitIndex,
+ getNodePath(delegationTokensRootPath, Integer.toString(splitIndex)));
+ }
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
zkManager = resourceManager.getAndStartZKManager(conf);
+ delegationTokenNodeSplitIndex =
+ conf.getInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
+ YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
+ if (delegationTokenNodeSplitIndex < 0
+ || delegationTokenNodeSplitIndex > 4) {
+ LOG.info("Invalid value " + delegationTokenNodeSplitIndex + " for config "
+ + YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX
+ + " specified. Resetting it to " +
+ YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX);
+ delegationTokenNodeSplitIndex =
+ YarnConfiguration.DEFAULT_ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX;
+ }
}
@Override
@@ -350,6 +393,9 @@ public synchronized void startInternal() throws Exception {
create(rmDTSecretManagerRoot);
create(dtMasterKeysRootPath);
create(delegationTokensRootPath);
+ for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+ create(rmDelegationTokenHierarchies.get(splitIndex));
+ }
create(dtSequenceNumberPath);
create(amrmTokenSecretManagerRoot);
create(reservationRoot);
@@ -572,36 +618,63 @@ private void loadRMSequentialNumberState(RMState rmState) throws Exception {
}
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
- List childNodes =
- getChildren(delegationTokensRootPath);
-
- for (String childNodeName : childNodes) {
- String childNodePath =
- getNodePath(delegationTokensRootPath, childNodeName);
- byte[] childData = getData(childNodePath);
-
- if (childData == null) {
- LOG.warn("Content of " + childNodePath + " is broken.");
+ for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
+ String tokenRoot = rmDelegationTokenHierarchies.get(splitIndex);
+ if (tokenRoot == null) {
continue;
}
-
- ByteArrayInputStream is = new ByteArrayInputStream(childData);
-
- try (DataInputStream fsIn = new DataInputStream(is)) {
+ List childNodes = getChildren(tokenRoot);
+ boolean dtNodeFound = false;
+ for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
- RMDelegationTokenIdentifierData identifierData =
- new RMDelegationTokenIdentifierData();
- identifierData.readFields(fsIn);
- RMDelegationTokenIdentifier identifier =
- identifierData.getTokenIdentifier();
- long renewDate = identifierData.getRenewDate();
- rmState.rmSecretManagerState.delegationTokenState.put(identifier,
- renewDate);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
- + " renewDate=" + renewDate);
+ dtNodeFound = true;
+ String parentNodePath = getNodePath(tokenRoot, childNodeName);
+ if (splitIndex == 0) {
+ loadDelegationTokenFromNode(rmState, parentNodePath);
+ } else {
+ // If znode is partitioned.
+ List leafNodes = getChildren(parentNodePath);
+ for (String leafNodeName : leafNodes) {
+ loadDelegationTokenFromNode(rmState,
+ getNodePath(parentNodePath, leafNodeName));
+ }
}
+ } else if (splitIndex == 0
+ && !(childNodeName.equals("1") || childNodeName.equals("2")
+ || childNodeName.equals("3") || childNodeName.equals("4"))) {
+ LOG.debug("Unknown child node with name " + childNodeName + " under" +
+ tokenRoot);
+ }
+ }
+ if (splitIndex != delegationTokenNodeSplitIndex && !dtNodeFound) {
+ // If no loaded delegation token exists for a particular split index and
+ // the split index for which tokens are being loaded is not the one
+ // configured, then we do not need to keep track of this hierarchy for
+ // storing/updating/removing delegation token znodes.
+ rmDelegationTokenHierarchies.remove(splitIndex);
+ }
+ }
+ }
+
+ private void loadDelegationTokenFromNode(RMState rmState, String path)
+ throws Exception {
+ byte[] data = getData(path);
+ if (data == null) {
+ LOG.warn("Content of " + path + " is broken.");
+ } else {
+ ByteArrayInputStream is = new ByteArrayInputStream(data);
+ try (DataInputStream fsIn = new DataInputStream(is)) {
+ RMDelegationTokenIdentifierData identifierData =
+ new RMDelegationTokenIdentifierData();
+ identifierData.readFields(fsIn);
+ RMDelegationTokenIdentifier identifier =
+ identifierData.getTokenIdentifier();
+ long renewDate = identifierData.getRenewDate();
+ rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+ renewDate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
+ + " renewDate=" + renewDate);
}
}
}
@@ -649,8 +722,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
getNodePath(parentNodePath, leafNodeName), appIdStr);
}
}
- } else {
- LOG.info("Unknown child node with name: " + childNodeName);
+ } else if (!childNodeName.equals(RM_APP_ROOT_HIERARCHIES)){
+ LOG.debug("Unknown child node with name " + childNodeName + " under" +
+ appRoot);
}
}
if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
@@ -683,36 +757,36 @@ private void loadApplicationAttemptState(ApplicationStateData appState,
}
/**
- * Get parent app node path based on full path and split index supplied.
- * @param appIdPath App id path for which parent needs to be returned.
+ * Get znode path based on full path and split index supplied.
+ * @param path path for which parent needs to be returned.
* @param splitIndex split index.
* @return parent app node path.
*/
- private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
- // Calculated as string upto index (appIdPath Length - split index - 1). We
+ private String getSplitZnodeParent(String path, int splitIndex) {
+ // Calculated as string up to index (path Length - split index - 1). We
// deduct 1 to exclude path separator.
- return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
+ return path.substring(0, path.length() - splitIndex - 1);
}
/**
- * Checks if parent app node has no leaf nodes and if it does not have,
- * removes it. Called while removing application.
- * @param appIdPath path of app id to be removed.
+ * Checks if parent znode has no leaf nodes and if it does not have,
+ * removes it.
+ * @param path path of znode to be removed.
* @param splitIndex split index.
* @throws Exception if any problem occurs while performing ZK operation.
*/
- private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
+ private void checkRemoveParentZnode(String path, int splitIndex)
throws Exception {
if (splitIndex != 0) {
- String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
+ String parentZnode = getSplitZnodeParent(path, splitIndex);
List children = null;
try {
- children = getChildren(parentAppNode);
+ children = getChildren(parentZnode);
} catch (KeeperException.NoNodeException ke) {
- // It should be fine to swallow this exception as the parent app node we
+ // It should be fine to swallow this exception as the parent znode we
// intend to delete is already deleted.
if (LOG.isDebugEnabled()) {
- LOG.debug("Unable to remove app parent node " + parentAppNode +
+ LOG.debug("Unable to remove parent node " + parentZnode +
" as it does not exist.");
}
return;
@@ -720,16 +794,16 @@ private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
// No apps stored under parent path.
if (children != null && children.isEmpty()) {
try {
- zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath);
+ zkManager.safeDelete(parentZnode, zkAcl, fencingNodePath);
if (LOG.isDebugEnabled()) {
- LOG.debug("No leaf app node exists. Removing parent node " +
- parentAppNode);
+ LOG.debug("No leaf znode exists. Removing parent node " +
+ parentZnode);
}
} catch (KeeperException.NotEmptyException ke) {
- // It should be fine to swallow this exception as the parent app node
+ // It should be fine to swallow this exception as the parent znode
// has to be deleted only if it has no children. And this node has.
if (LOG.isDebugEnabled()) {
- LOG.debug("Unable to remove app parent node " + parentAppNode +
+ LOG.debug("Unable to remove app parent node " + parentZnode +
" as it has children.");
}
}
@@ -770,7 +844,7 @@ protected synchronized void updateApplicationStateInternal(
// Look for paths based on other split indices if path as per split index
// does not exist.
if (!exists(nodeUpdatePath)) {
- AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
+ ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId.toString());
if (alternatePathInfo != null) {
nodeUpdatePath = alternatePathInfo.path;
} else {
@@ -778,7 +852,7 @@ protected synchronized void updateApplicationStateInternal(
pathExists = false;
if (appIdNodeSplitIndex != 0) {
String rootNode =
- getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
+ getSplitZnodeParent(nodeUpdatePath, appIdNodeSplitIndex);
if (!exists(rootNode)) {
zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
zkAcl, fencingNodePath);
@@ -819,7 +893,7 @@ private void handleApplicationAttemptStateOp(
String appDirPath = getLeafAppIdNodePath(appId, false);
// Look for paths based on other split indices.
if (!exists(appDirPath)) {
- AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+ ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(appId);
if (alternatePathInfo == null) {
if (operation == AppAttemptOp.REMOVE) {
// Unexpected. Assume that app attempt has been deleted.
@@ -918,7 +992,7 @@ private void removeApp(String removeAppId, boolean safeRemove,
// Look for paths based on other split indices if path as per configured
// split index does not exist.
if (!exists(appIdRemovePath)) {
- AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
+ ZnodeSplitInfo alternatePathInfo = getAlternateAppPath(removeAppId);
if (alternatePathInfo != null) {
appIdRemovePath = alternatePathInfo.path;
splitIndex = alternatePathInfo.splitIndex;
@@ -946,24 +1020,60 @@ private void removeApp(String removeAppId, boolean safeRemove,
forPath(appIdRemovePath);
}
// Check if we should remove the parent app node as well.
- checkRemoveParentAppNode(appIdRemovePath, splitIndex);
+ checkRemoveParentZnode(appIdRemovePath, splitIndex);
}
@Override
protected synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
- addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
- trx.commit();
+ String nodeCreatePath = getLeafDelegationTokenNodePath(
+ rmDTIdentifier.getSequenceNumber(), true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing " + DELEGATION_TOKEN_PREFIX
+ + rmDTIdentifier.getSequenceNumber());
+ }
+
+ RMDelegationTokenIdentifierData identifierData =
+ new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
+ ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+ try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
+ SafeTransaction trx = zkManager.createTransaction(zkAcl,
+ fencingNodePath);
+ trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
+ CreateMode.PERSISTENT);
+ // Update Sequence number only while storing DT
+ seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: "
+ + rmDTIdentifier.getSequenceNumber());
+ }
+
+ trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
+ trx.commit();
+ }
}
@Override
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
- String nodeRemovePath =
- getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
- + rmDTIdentifier.getSequenceNumber());
+ String nodeRemovePath = getLeafDelegationTokenNodePath(
+ rmDTIdentifier.getSequenceNumber(), false);
+ int splitIndex = delegationTokenNodeSplitIndex;
+ // Look for paths based on other split indices if path as per configured
+ // split index does not exist.
+ if (!exists(nodeRemovePath)) {
+ ZnodeSplitInfo alternatePathInfo =
+ getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
+ if (alternatePathInfo != null) {
+ nodeRemovePath = alternatePathInfo.path;
+ splitIndex = alternatePathInfo.splitIndex;
+ } else {
+ // Alternate path not found so return.
+ return;
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
@@ -971,62 +1081,41 @@ protected synchronized void removeRMDelegationTokenState(
}
zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
+
+ // Check if we should remove the parent app node as well.
+ checkRemoveParentZnode(nodeRemovePath, splitIndex);
}
@Override
protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
- SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
- String nodeRemovePath =
- getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
- + rmDTIdentifier.getSequenceNumber());
-
- if (exists(nodeRemovePath)) {
- // in case znode exists
- addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
- } else {
- // in case znode doesn't exist
- addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
+ String nodeUpdatePath = getLeafDelegationTokenNodePath(
+ rmDTIdentifier.getSequenceNumber(), false);
+ boolean pathExists = true;
+ // Look for paths based on other split indices if path as per split index
+ // does not exist.
+ if (!exists(nodeUpdatePath)) {
+ ZnodeSplitInfo alternatePathInfo =
+ getAlternateDTPath(rmDTIdentifier.getSequenceNumber());
+ if (alternatePathInfo != null) {
+ nodeUpdatePath = alternatePathInfo.path;
+ } else {
+ pathExists = false;
}
}
- trx.commit();
- }
-
- private void addStoreOrUpdateOps(SafeTransaction trx,
- RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
- boolean isUpdate) throws Exception {
- // store RM delegation token
- String nodeCreatePath = getNodePath(delegationTokensRootPath,
- DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
- RMDelegationTokenIdentifierData identifierData =
- new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
- ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
-
- try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
-
- if (isUpdate) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating RMDelegationToken_"
- + rmDTIdentifier.getSequenceNumber());
- }
- trx.setData(nodeCreatePath, identifierData.toByteArray(), -1);
- } else {
- trx.create(nodeCreatePath, identifierData.toByteArray(), zkAcl,
- CreateMode.PERSISTENT);
- // Update Sequence number only while storing DT
- seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing " + dtSequenceNumberPath + ". SequenceNumber: "
- + rmDTIdentifier.getSequenceNumber());
- }
-
- trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
+ if (pathExists) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating " + DELEGATION_TOKEN_PREFIX
+ + rmDTIdentifier.getSequenceNumber());
}
+ RMDelegationTokenIdentifierData identifierData =
+ new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
+ zkManager.safeSetData(nodeUpdatePath, identifierData.toByteArray(), -1,
+ zkAcl, fencingNodePath);
+ } else {
+ storeRMDelegationTokenState(rmDTIdentifier, renewDate);
}
}
@@ -1156,19 +1245,19 @@ private void addOrUpdateReservationState(
* Get alternate path for app id if path according to configured split index
* does not exist. We look for path based on all possible split indices.
* @param appId
- * @return a {@link AppNodeSplitInfo} object containing the path and split
+ * @return a {@link ZnodeSplitInfo} object containing the path and split
* index if it exists, null otherwise.
* @throws Exception if any problem occurs while performing ZK operation.
*/
- private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
+ private ZnodeSplitInfo getAlternateAppPath(String appId) throws Exception {
for (Map.Entry entry : rmAppRootHierarchies.entrySet()) {
// Look for other paths
int splitIndex = entry.getKey();
if (splitIndex != appIdNodeSplitIndex) {
String alternatePath =
- getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
+ getLeafZnodePath(appId, entry.getValue(), splitIndex, false);
if (exists(alternatePath)) {
- return new AppNodeSplitInfo(alternatePath, splitIndex);
+ return new ZnodeSplitInfo(alternatePath, splitIndex);
}
}
}
@@ -1176,26 +1265,25 @@ private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
}
/**
- * Returns leaf app node path based on app id and passed split index. If the
- * passed flag createParentIfNotExists is true, also creates the parent app
- * node if it does not exist.
- * @param appId application id.
+ * Returns leaf znode path based on node name and passed split index. If the
+ * passed flag createParentIfNotExists is true, also creates the parent znode
+ * if it does not exist.
+ * @param nodeName the node name.
* @param rootNode app root node based on split index.
- * @param appIdNodeSplitIdx split index.
- * @param createParentIfNotExists flag which determines if parent app node
+ * @param splitIdx split index.
+ * @param createParentIfNotExists flag which determines if parent znode
* needs to be created(as per split) if it does not exist.
- * @return leaf app node path.
+ * @return leaf znode path.
* @throws Exception if any problem occurs while performing ZK operation.
*/
- private String getLeafAppIdNodePath(String appId, String rootNode,
- int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
- if (appIdNodeSplitIdx == 0) {
- return getNodePath(rootNode, appId);
+ private String getLeafZnodePath(String nodeName, String rootNode,
+ int splitIdx, boolean createParentIfNotExists) throws Exception {
+ if (splitIdx == 0) {
+ return getNodePath(rootNode, nodeName);
}
- String nodeName = appId;
- int splitIdx = nodeName.length() - appIdNodeSplitIdx;
+ int split = nodeName.length() - splitIdx;
String rootNodePath =
- getNodePath(rootNode, nodeName.substring(0, splitIdx));
+ getNodePath(rootNode, nodeName.substring(0, split));
if (createParentIfNotExists && !exists(rootNodePath)) {
try {
zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
@@ -1207,7 +1295,7 @@ private String getLeafAppIdNodePath(String appId, String rootNode,
}
}
}
- return getNodePath(rootNodePath, nodeName.substring(splitIdx));
+ return getNodePath(rootNodePath, nodeName.substring(split));
}
/**
@@ -1222,10 +1310,77 @@ private String getLeafAppIdNodePath(String appId, String rootNode,
*/
private String getLeafAppIdNodePath(String appId,
boolean createParentIfNotExists) throws Exception {
- return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
+ return getLeafZnodePath(appId, rmAppRootHierarchies.get(
appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
}
+ /**
+ * Returns leaf delegation token node path based on sequence number and
+ * configured split index. If the passed flag createParentIfNotExists is true,
+ * also creates the parent znode if it does not exist. The sequence number
+ * is padded to be at least 4 digits wide to ensure consistency with the split
+ * indexing.
+ * @param rmDTSequenceNumber delegation token sequence number.
+ * @param createParentIfNotExists flag which determines if parent znode
+ * needs to be created(as per split) if it does not exist.
+ * @return leaf delegation token node path.
+ * @throws Exception if any problem occurs while performing ZK operation.
+ */
+ private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
+ boolean createParentIfNotExists) throws Exception {
+ return getLeafDelegationTokenNodePath(rmDTSequenceNumber,
+ createParentIfNotExists, delegationTokenNodeSplitIndex);
+ }
+
+ /**
+ * Returns leaf delegation token node path based on sequence number and
+ * passed split index. If the passed flag createParentIfNotExists is true,
+ * also creates the parent znode if it does not exist. The sequence number
+ * is padded to be at least 4 digits wide to ensure consistency with the split
+ * indexing.
+ * @param rmDTSequenceNumber delegation token sequence number.
+ * @param createParentIfNotExists flag which determines if parent znode
+ * needs to be created(as per split) if it does not exist.
+ * @param split the split index to use
+ * @return leaf delegation token node path.
+ * @throws Exception if any problem occurs while performing ZK operation.
+ */
+ private String getLeafDelegationTokenNodePath(int rmDTSequenceNumber,
+ boolean createParentIfNotExists, int split) throws Exception {
+ String nodeName = DELEGATION_TOKEN_PREFIX;
+ if (split == 0) {
+ nodeName += rmDTSequenceNumber;
+ } else {
+ nodeName += String.format("%04d", rmDTSequenceNumber);
+ }
+ return getLeafZnodePath(nodeName, rmDelegationTokenHierarchies.get(split),
+ split, createParentIfNotExists);
+ }
+
+ /**
+ * Get alternate path for delegation token if path according to configured
+ * split index does not exist. We look for path based on all possible split
+ * indices.
+ * @param rmDTSequenceNumber delegation token sequence number.
+ * @return a {@link ZnodeSplitInfo} object containing the path and split
+ * index if it exists, null otherwise.
+ * @throws Exception if any problem occurs while performing ZK operation.
+ */
+ private ZnodeSplitInfo getAlternateDTPath(int rmDTSequenceNumber)
+ throws Exception {
+ // Check all possible paths until we find it
+ for (int splitIndex : rmDelegationTokenHierarchies.keySet()) {
+ if (splitIndex != delegationTokenNodeSplitIndex) {
+ String alternatePath = getLeafDelegationTokenNodePath(
+ rmDTSequenceNumber, false, splitIndex);
+ if (exists(alternatePath)) {
+ return new ZnodeSplitInfo(alternatePath, splitIndex);
+ }
+ }
+ }
+ return null;
+ }
+
@VisibleForTesting
byte[] getData(final String path) throws Exception {
return zkManager.getData(path);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index f94bbd31b4..6a8f47d085 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -64,7 +64,6 @@
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -75,16 +74,20 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.crypto.SecretKey;
@@ -128,10 +131,9 @@ class TestZKRMStateStoreTester implements RMStateStoreHelper {
TestZKRMStateStoreInternal store;
String workingZnode;
-
class TestZKRMStateStoreInternal extends ZKRMStateStore {
- public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
+ TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
throws Exception {
setResourceManager(new ResourceManager());
init(conf);
@@ -140,7 +142,7 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
assertTrue(znodeWorkingPath.equals(workingZnode));
}
- public String getVersionNode() {
+ private String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
}
@@ -162,11 +164,11 @@ private String getAppNode(String appId, int splitIdx) {
return rootPath + "/" + appPath;
}
- public String getAppNode(String appId) {
+ private String getAppNode(String appId) {
return getAppNode(appId, 0);
}
- public String getAttemptNode(String appId, String attemptId) {
+ private String getAttemptNode(String appId, String attemptId) {
return getAppNode(appId) + "/" + attemptId;
}
@@ -174,10 +176,28 @@ public String getAttemptNode(String appId, String attemptId) {
* Emulating retrying createRootDir not to raise NodeExist exception
* @throws Exception
*/
- public void testRetryingCreateRootDir() throws Exception {
+ private void testRetryingCreateRootDir() throws Exception {
create(znodeWorkingPath);
}
+ private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) {
+ String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
+ RM_DT_SECRET_MANAGER_ROOT + "/" +
+ RMStateStore.RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME;
+ String nodeName = DELEGATION_TOKEN_PREFIX;
+ if (splitIdx == 0) {
+ nodeName += rmDTSequenceNumber;
+ } else {
+ nodeName += String.format("%04d", rmDTSequenceNumber);
+ }
+ String path = nodeName;
+ if (splitIdx != 0) {
+ int idx = nodeName.length() - splitIdx;
+ path = splitIdx + "/" + nodeName.substring(0, idx) + "/"
+ + nodeName.substring(idx);
+ }
+ return rootPath + "/" + path;
+ }
}
private RMStateStore createStore(Configuration conf) throws Exception {
@@ -235,6 +255,17 @@ public boolean attemptExists(RMAppAttempt attempt) throws Exception {
.forPath(store.getAttemptNode(
attemptId.getApplicationId().toString(), attemptId.toString()));
}
+
+ public boolean delegationTokenExists(RMDelegationTokenIdentifier token,
+ int index) throws Exception {
+ int rmDTSequenceNumber = token.getSequenceNumber();
+ return curatorFramework.checkExists().forPath(
+ store.getDelegationTokenNode(rmDTSequenceNumber, index)) != null;
+ }
+
+ public int getDelegationTokenNodeSplitIndex() {
+ return store.delegationTokenNodeSplitIndex;
+ }
}
@Test (timeout = 60000)
@@ -332,7 +363,8 @@ protected synchronized void storeVersion() throws Exception {
RMStateStore store = zkTester.getRMStateStore();
Version defaultVersion = zkTester.getCurrentVersion();
store.checkVersion();
- Assert.assertEquals(defaultVersion, store.loadVersion());
+ assertEquals("Store had wrong version",
+ defaultVersion, store.loadVersion());
}
public static Configuration createHARMConf(String rmIds, String rmId,
@@ -546,11 +578,20 @@ public void testFencedState() throws Exception {
new Text("renewer1"), new Text("realuser1"));
Long renewDate1 = new Long(System.currentTimeMillis());
dtId1.setSequenceNumber(1111);
+ assertFalse("Token " + dtId1
+ + " should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(dtId1, 0));
store.storeRMDelegationToken(dtId1, renewDate1);
+ assertFalse("Token " + dtId1
+ + " should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(dtId1, 0));
assertEquals("RMStateStore should have been in fenced state", true,
store.isFencedState());
store.updateRMDelegationToken(dtId1, renewDate1);
+ assertFalse("Token " + dtId1
+ + " should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(dtId1, 0));
assertEquals("RMStateStore should have been in fenced state", true,
store.isFencedState());
@@ -606,7 +647,7 @@ public void testDuplicateRMAppDeletion() throws Exception {
try {
store.removeApplicationStateInternal(appStateRemoved);
} catch (KeeperException.NoNodeException nne) {
- Assert.fail("NoNodeException should not happen.");
+ fail("NoNodeException should not happen.");
}
store.close();
}
@@ -1134,4 +1175,317 @@ public void testAppNodeSplitChangeAcrossRestarts() throws Exception {
// Close the state store.
store.close();
}
+
+ private static Configuration createConfForDelegationTokenNodeSplit(
+ int splitIndex) {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.ZK_DELEGATION_TOKEN_NODE_SPLIT_INDEX,
+ splitIndex);
+ return conf;
+ }
+
+ private void verifyDelegationTokensStateStore(
+ TestZKRMStateStoreTester zkTester,
+ Map tokensWithRenewal,
+ Map tokensWithIndex,
+ int sequenceNumber) throws Exception {
+ RMStateStore.RMDTSecretManagerState secretManagerState =
+ zkTester.store.loadState().getRMDTSecretManagerState();
+ assertEquals("Unexpected token state",
+ tokensWithRenewal, secretManagerState.getTokenState());
+ assertEquals("Unexpected sequence number", sequenceNumber,
+ secretManagerState.getDTSequenceNumber());
+ for (Map.Entry tokenEntry
+ : tokensWithIndex.entrySet()) {
+ assertTrue("Expected to find token " + tokenEntry.getKey()
+ + " in zookeeper but did not",
+ zkTester.delegationTokenExists(tokenEntry.getKey(),
+ tokenEntry.getValue()));
+ }
+ }
+
+ private void verifyDelegationTokenInStateStore(
+ TestZKRMStateStoreTester zkTester, RMDelegationTokenIdentifier token,
+ long renewDate, int index) throws Exception {
+ RMStateStore.RMDTSecretManagerState secretManagerState =
+ zkTester.store.loadState().getRMDTSecretManagerState();
+ Map tokenState =
+ secretManagerState.getTokenState();
+ assertTrue("token state does not contain " + token,
+ tokenState.containsKey(token));
+ assertTrue("token state does not contain a token with renewal " + renewDate,
+ tokenState.containsValue(renewDate));
+ assertTrue("Token " + token + "should exist but was not found in ZooKeeper",
+ zkTester.delegationTokenExists(token, index));
+ }
+
+ private RMDelegationTokenIdentifier storeUpdateAndVerifyDelegationToken(
+ TestZKRMStateStoreTester zkTester,
+ Map tokensWithRenewal,
+ Map tokensWithIndex,
+ int sequenceNumber, int split) throws Exception {
+ // Store token
+ RMDelegationTokenIdentifier token =
+ new RMDelegationTokenIdentifier(new Text("owner"),
+ new Text("renewer"), new Text("realuser"));
+ assertFalse("Token should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(token, split));
+ token.setSequenceNumber(sequenceNumber);
+ Long renewDate = System.currentTimeMillis();
+ zkTester.store.storeRMDelegationToken(token, renewDate);
+ modifyRMDelegationTokenState();
+ tokensWithRenewal.put(token, renewDate);
+ tokensWithIndex.put(token, split);
+
+ // Verify the token
+ verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+ tokensWithIndex, sequenceNumber);
+
+ // Update the token
+ renewDate = System.currentTimeMillis();
+ zkTester.store.updateRMDelegationToken(token, renewDate);
+ tokensWithRenewal.put(token, renewDate);
+ tokensWithIndex.put(token, split);
+
+ // Verify updates
+ verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+ tokensWithIndex, sequenceNumber);
+
+ return token;
+ }
+
+ @Test
+ public void testDelegationTokenSplitIndexConfig() throws Exception {
+ // Valid values
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(0)).close();
+ assertEquals("Incorrect split index",
+ 0, zkTester.getDelegationTokenNodeSplitIndex());
+ zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(1)).close();
+ assertEquals("Incorrect split index",
+ 1, zkTester.getDelegationTokenNodeSplitIndex());
+ zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(2)).close();
+ assertEquals("Incorrect split index",
+ 2, zkTester.getDelegationTokenNodeSplitIndex());
+ zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(3)).close();
+ assertEquals("Incorrect split index",
+ 3, zkTester.getDelegationTokenNodeSplitIndex());
+ zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(4)).close();
+ assertEquals("Incorrect split index",
+ 4, zkTester.getDelegationTokenNodeSplitIndex());
+
+ // Invalid values --> override to 0
+ zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(-1)).close();
+ assertEquals("Incorrect split index",
+ 0, zkTester.getDelegationTokenNodeSplitIndex());
+ zkTester.getRMStateStore(createConfForDelegationTokenNodeSplit(5)).close();
+ assertEquals("Incorrect split index",
+ 0, zkTester.getDelegationTokenNodeSplitIndex());
+ }
+
+ @Test
+ public void testDelegationTokenNodeNoSplit() throws Exception {
+ testDelegationTokenNode(0);
+ }
+
+ @Test
+ public void testDelegationTokenNodeWithSplitOne() throws Exception {
+ testDelegationTokenNode(1);
+ }
+
+ @Test
+ public void testDelegationTokenNodeWithSplitTwo() throws Exception {
+ testDelegationTokenNode(2);
+ }
+
+ @Test
+ public void testDelegationTokenNodeWithSplitThree() throws Exception {
+ testDelegationTokenNode(3);
+ }
+
+ @Test
+ public void testDelegationTokenNodeWithSplitFour() throws Exception {
+ testDelegationTokenNode(4);
+ }
+
+ public void testDelegationTokenNode(int split) throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ Configuration conf = createConfForDelegationTokenNodeSplit(split);
+ RMStateStore store = zkTester.getRMStateStore(conf);
+
+ // Store the token and verify
+ Map tokensWithRenewal = new HashMap<>();
+ Map tokensWithIndex = new HashMap<>();
+ int sequenceNumber = 0;
+ RMDelegationTokenIdentifier token = storeUpdateAndVerifyDelegationToken(
+ zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, split);
+
+ // Delete the token and verify
+ store.removeRMDelegationToken(token);
+ RMStateStore.RMDTSecretManagerState state =
+ store.loadState().getRMDTSecretManagerState();
+ tokensWithRenewal.clear();
+ tokensWithIndex.clear();
+ assertEquals("Unexpected token state",
+ tokensWithRenewal, state.getTokenState());
+ assertEquals("Unexpected sequence number",
+ sequenceNumber, state.getDTSequenceNumber());
+ assertFalse("Token should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(token, split));
+ store.close();
+ }
+
+ @Test
+ public void testDelegationTokenNodeWithSplitMultiple() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ Configuration conf = createConfForDelegationTokenNodeSplit(1);
+ RMStateStore store = zkTester.getRMStateStore(conf);
+
+ // With the split set to 1, we can store 10 tokens under a znode (i.e. 0-9)
+ // Try to store more than 10
+ Map tokensWithRenewal = new HashMap<>();
+ Map tokensWithIndex = new HashMap<>();
+ Set tokensToDelete = new HashSet<>();
+ int sequenceNumber = 0;
+ for (int i = 0; i <= 12; i++) {
+ RMDelegationTokenIdentifier token =
+ new RMDelegationTokenIdentifier(new Text("owner" + i),
+ new Text("renewer" + i), new Text("realuser" + i));
+ sequenceNumber = i;
+ token.setSequenceNumber(sequenceNumber);
+ assertFalse("Token should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(token, 1));
+ Long renewDate = System.currentTimeMillis();
+ store.storeRMDelegationToken(token, renewDate);
+ modifyRMDelegationTokenState();
+ tokensWithRenewal.put(token, renewDate);
+ tokensWithIndex.put(token, 1);
+ switch (i) {
+ case 0:
+ case 3:
+ case 6:
+ case 11:
+ tokensToDelete.add(token);
+ break;
+ default:
+ break;
+ }
+ }
+ // Verify
+ verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+ tokensWithIndex, sequenceNumber);
+
+ // Try deleting some tokens and adding some new ones
+ for (RMDelegationTokenIdentifier tokenToDelete : tokensToDelete) {
+ store.removeRMDelegationToken(tokenToDelete);
+ tokensWithRenewal.remove(tokenToDelete);
+ tokensWithIndex.remove(tokenToDelete);
+ }
+ for (int i = 13; i <= 22; i++) {
+ RMDelegationTokenIdentifier token =
+ new RMDelegationTokenIdentifier(new Text("owner" + i),
+ new Text("renewer" + i), new Text("realuser" + i));
+ sequenceNumber = i;
+ token.setSequenceNumber(sequenceNumber);
+ Long renewDate = System.currentTimeMillis();
+ store.storeRMDelegationToken(token, renewDate);
+ modifyRMDelegationTokenState();
+ tokensWithRenewal.put(token, renewDate);
+ tokensWithIndex.put(token, 1);
+ }
+ // Verify
+ verifyDelegationTokensStateStore(zkTester, tokensWithRenewal,
+ tokensWithIndex, sequenceNumber);
+ for (RMDelegationTokenIdentifier token : tokensToDelete) {
+ assertFalse("Token " + token
+ + " should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(token, 1));
+ }
+ store.close();
+ }
+
+ @Test
+ public void testDelegationTokenNodeWithSplitChangeAcrossRestarts()
+ throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ Map tokensWithRenewal = new HashMap<>();
+ Map tokensWithIndex = new HashMap<>();
+ int sequenceNumber = 0;
+
+ // Start the store with index 1
+ Configuration conf = createConfForDelegationTokenNodeSplit(1);
+ RMStateStore store = zkTester.getRMStateStore(conf);
+ // Store a token with index 1
+ RMDelegationTokenIdentifier token1 = storeUpdateAndVerifyDelegationToken(
+ zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 1);
+ store.close();
+
+ // Start the store with index 2
+ conf = createConfForDelegationTokenNodeSplit(2);
+ store = zkTester.getRMStateStore(conf);
+ // Verify token1 is still there and under the /1/ znode
+ verifyDelegationTokenInStateStore(
+ zkTester, token1, tokensWithRenewal.get(token1), 1);
+ // Store a token with index 2
+ sequenceNumber++;
+ RMDelegationTokenIdentifier token2 = storeUpdateAndVerifyDelegationToken(
+ zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 2);
+ // Update and verify token1
+ long renewDate1 = System.currentTimeMillis();
+ zkTester.store.updateRMDelegationToken(token1, renewDate1);
+ tokensWithRenewal.put(token1, renewDate1);
+ verifyDelegationTokenInStateStore(
+ zkTester, token1, tokensWithRenewal.get(token1), 1);
+ store.close();
+
+ // Start the store with index 0
+ conf = createConfForDelegationTokenNodeSplit(0);
+ store = zkTester.getRMStateStore(conf);
+ // Verify token1 is still there and under the /1/ znode
+ verifyDelegationTokenInStateStore(
+ zkTester, token1, tokensWithRenewal.get(token1), 1);
+ // Verify token2 is still there and under the /2/ znode
+ verifyDelegationTokenInStateStore(
+ zkTester, token2, tokensWithRenewal.get(token2), 2);
+ // Store a token with no index
+ sequenceNumber++;
+ RMDelegationTokenIdentifier token0 = storeUpdateAndVerifyDelegationToken(
+ zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber, 0);
+ store.close();
+
+ // Start the store with index 3
+ conf = createConfForDelegationTokenNodeSplit(3);
+ store = zkTester.getRMStateStore(conf);
+ // Verify token1 is still there and under the /1/ znode
+ verifyDelegationTokenInStateStore(
+ zkTester, token1, tokensWithRenewal.get(token1), 1);
+ // Verify token2 is still there and under the /2/ znode
+ verifyDelegationTokenInStateStore(
+ zkTester, token2, tokensWithRenewal.get(token2), 2);
+ // Verify token0 is still there and under the token root node
+ verifyDelegationTokenInStateStore(
+ zkTester, token0, tokensWithRenewal.get(token0), 0);
+ // Delete all tokens and verify
+ for (RMDelegationTokenIdentifier token : tokensWithRenewal.keySet()) {
+ store.removeRMDelegationToken(token);
+ }
+ tokensWithRenewal.clear();
+ tokensWithIndex.clear();
+ verifyDelegationTokensStateStore(
+ zkTester, tokensWithRenewal, tokensWithIndex, sequenceNumber);
+ assertFalse("Token " + token1
+ + " should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(token1, 1));
+ assertFalse("Token " + token1
+ + " should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(token2, 2));
+ assertFalse("Token " + token1
+ + " should not exist but was found in ZooKeeper",
+ zkTester.delegationTokenExists(token0, 0));
+ // Store a token with index 3
+ sequenceNumber++;
+ storeUpdateAndVerifyDelegationToken(zkTester, tokensWithRenewal,
+ tokensWithIndex, sequenceNumber, 3);
+ store.close();
+ }
}