diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fa4d2e3d32..82274fe74a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -568,6 +568,10 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
+ public static final String ZK_APPID_NODE_SPLIT_INDEX =
+ RM_ZK_PREFIX + "appid-node.split-index";
+ public static final int DEFAULT_ZK_APPID_NODE_SPLIT_INDEX = 0;
+
public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index bdd4de52c0..e687eef370 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -609,7 +609,27 @@
- Name of the cluster. In an HA setting,
+ Index at which last section of application id (with each section
+ separated by _ in application id) will be split so that application znode
+ stored in zookeeper RM state store will be stored as two different znodes
+ (parent-child). Split is done from the end.
+ For instance, with no split, appid znode will be of the form
+ application_1352994193343_0001. If the value of this config is 1, the
+ appid znode will be broken into two parts application_1352994193343_000
+ and 1 respectively with former being the parent node.
+ application_1352994193343_0002 will then be stored as 2 under the parent
+ node application_1352994193343_000. This config can take values from 0 to 4.
+ 0 means there will be no split. If configuration value is outside this
+ range, it will be treated as config value of 0(i.e. no split). A value
+ larger than 0 (up to 4) should be configured if you are storing a large number
+ of apps in ZK based RM state store and state store operations are failing due to
+ LenError in Zookeeper.
+ yarn.resourcemanager.zk-appid-node.split-index
+ 0
+
+
+
+ Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
election for this cluster and ensures it does not affect
other clusters
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 1212a91e6e..86f7a5b594 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@@ -72,6 +73,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* {@link RMStateStore} implementation backed by ZooKeeper.
@@ -82,6 +85,31 @@
* |--- EPOCH_NODE
* |--- RM_ZK_FENCING_LOCK
* |--- RM_APP_ROOT
+ * | |----- HIERARCHIES
+ * | | |----- 1
+ * | | | |----- (#ApplicationId barring last character)
+ * | | | | |----- (#Last character of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
+ * | | |----- 2
+ * | | | |----- (#ApplicationId barring last 2 characters)
+ * | | | | |----- (#Last 2 characters of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
+ * | | |----- 3
+ * | | | |----- (#ApplicationId barring last 3 characters)
+ * | | | | |----- (#Last 3 characters of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
+ * | | |----- 4
+ * | | | |----- (#ApplicationId barring last 4 characters)
+ * | | | | |----- (#Last 4 characters of ApplicationId)
+ * | | | | | |----- (#ApplicationAttemptIds)
+ * | | | ....
+ * | | |
* | |----- (#ApplicationId1)
* | | |----- (#ApplicationAttemptIds)
* | |
@@ -121,6 +149,7 @@
@Unstable
public class ZKRMStateStore extends RMStateStore {
private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
+
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
@@ -129,12 +158,15 @@ public class ZKRMStateStore extends RMStateStore {
"RMDTMasterKeysRoot";
@VisibleForTesting
public static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
- protected static final Version CURRENT_VERSION_INFO =
- Version.newInstance(1, 3);
+ protected static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(2, 0);
+ @VisibleForTesting
+ public static final String RM_APP_ROOT_HIERARCHIES = "HIERARCHIES";
/* Znode paths */
private String zkRootNodePath;
private String rmAppRoot;
+ private Map rmAppRootHierarchies;
private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
@@ -144,6 +176,7 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
protected String znodeWorkingPath;
+ private int appIdNodeSplitIndex = 0;
/* Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -165,6 +198,27 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting
protected CuratorFramework curatorFramework;
+ /*
+ * Indicates different app attempt state store operations.
+ */
+ private enum AppAttemptOp {
+ STORE,
+ UPDATE,
+ REMOVE
+ };
+
+ /**
+ * Encapsulates full app node path and corresponding split index.
+ */
+ private final static class AppNodeSplitInfo {
+ private final String path;
+ private final int splitIndex;
+ AppNodeSplitInfo(String path, int splitIndex) {
+ this.path = path;
+ this.splitIndex = splitIndex;
+ }
+ }
+
/**
* Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node.
@@ -212,11 +266,30 @@ public synchronized void initInternal(Configuration conf)
conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
- fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
+ String hierarchiesPath = getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES);
+ rmAppRootHierarchies = new HashMap<>(5);
+ rmAppRootHierarchies.put(0, rmAppRoot);
+ for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+ rmAppRootHierarchies.put(splitIndex,
+ getNodePath(hierarchiesPath, Integer.toString(splitIndex)));
+ }
+
+ fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+ appIdNodeSplitIndex =
+ conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
+ YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
+ if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > 4) {
+ LOG.info("Invalid value " + appIdNodeSplitIndex + " for config " +
+ YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX + " specified. " +
+ "Resetting it to " +
+ YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
+ appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
+ }
+
zkAcl = RMZKUtils.getZKAcls(conf);
if (HAUtil.isHAEnabled(conf)) {
@@ -269,6 +342,10 @@ public synchronized void startInternal() throws Exception {
verifyActiveStatusThread.start();
}
create(rmAppRoot);
+ create(getNodePath(rmAppRoot, RM_APP_ROOT_HIERARCHIES));
+ for (int splitIndex = 1; splitIndex <= 4; splitIndex++) {
+ create(rmAppRootHierarchies.get(splitIndex));
+ }
create(rmDTSecretManagerRoot);
create(dtMasterKeysRootPath);
create(delegationTokensRootPath);
@@ -524,42 +601,64 @@ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
}
}
+ private void loadRMAppStateFromAppNode(RMState rmState, String appNodePath,
+ String appIdStr) throws Exception {
+ byte[] appData = getData(appNodePath);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application from znode: " + appNodePath);
+ }
+ ApplicationId appId = ApplicationId.fromString(appIdStr);
+ ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl(
+ ApplicationStateDataProto.parseFrom(appData));
+ if (!appId.equals(
+ appState.getApplicationSubmissionContext().getApplicationId())) {
+ throw new YarnRuntimeException("The node name is different from the " +
+ "application id");
+ }
+ rmState.appState.put(appId, appState);
+ loadApplicationAttemptState(appState, appNodePath);
+ }
+
private synchronized void loadRMAppState(RMState rmState) throws Exception {
- List childNodes = getChildren(rmAppRoot);
-
- for (String childNodeName : childNodes) {
- String childNodePath = getNodePath(rmAppRoot, childNodeName);
- byte[] childData = getData(childNodePath);
-
- if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
- // application
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading application from znode: " + childNodeName);
+ for (int splitIndex = 0; splitIndex <= 4; splitIndex++) {
+ String appRoot = rmAppRootHierarchies.get(splitIndex);
+ if (appRoot == null) {
+ continue;
+ }
+ List childNodes = getChildren(appRoot);
+ boolean appNodeFound = false;
+ for (String childNodeName : childNodes) {
+ if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+ appNodeFound = true;
+ if (splitIndex == 0) {
+ loadRMAppStateFromAppNode(rmState,
+ getNodePath(appRoot, childNodeName), childNodeName);
+ } else {
+ // If AppId Node is partitioned.
+ String parentNodePath = getNodePath(appRoot, childNodeName);
+ List leafNodes = getChildren(parentNodePath);
+ for (String leafNodeName : leafNodes) {
+ String appIdStr = childNodeName + leafNodeName;
+ loadRMAppStateFromAppNode(rmState,
+ getNodePath(parentNodePath, leafNodeName), appIdStr);
+ }
+ }
+ } else {
+ LOG.info("Unknown child node with name: " + childNodeName);
}
-
- ApplicationId appId = ApplicationId.fromString(childNodeName);
- ApplicationStateDataPBImpl appState =
- new ApplicationStateDataPBImpl(
- ApplicationStateDataProto.parseFrom(childData));
-
- if (!appId.equals(
- appState.getApplicationSubmissionContext().getApplicationId())) {
- throw new YarnRuntimeException("The child node name is different "
- + "from the application id");
- }
-
- rmState.appState.put(appId, appState);
- loadApplicationAttemptState(appState, appId);
- } else {
- LOG.info("Unknown child node with name: " + childNodeName);
+ }
+ if (splitIndex != appIdNodeSplitIndex && !appNodeFound) {
+ // If no loaded app exists for a particular split index and the split
+ // index for which apps are being loaded is not the one configured, then
+ // we do not need to keep track of this hierarchy for storing/updating/
+ // removing app/app attempt znodes.
+ rmAppRootHierarchies.remove(splitIndex);
}
}
}
private void loadApplicationAttemptState(ApplicationStateData appState,
- ApplicationId appId)
- throws Exception {
- String appPath = getNodePath(rmAppRoot, appId.toString());
+ String appPath) throws Exception {
List attempts = getChildren(appPath);
for (String attemptIDStr : attempts) {
@@ -574,14 +673,68 @@ private void loadApplicationAttemptState(ApplicationStateData appState,
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
}
-
LOG.debug("Done loading applications from ZK state store");
}
+ /**
+ * Get parent app node path based on full path and split index supplied.
+ * @param appIdPath App id path for which parent needs to be returned.
+ * @param splitIndex split index.
+ * @return parent app node path.
+ */
+ private String getSplitAppNodeParent(String appIdPath, int splitIndex) {
+ // Calculated as string upto index (appIdPath Length - split index - 1). We
+ // deduct 1 to exclude path separator.
+ return appIdPath.substring(0, appIdPath.length() - splitIndex - 1);
+ }
+
+ /**
+ * Checks if parent app node has no leaf nodes and if it does not have,
+ * removes it. Called while removing application.
+ * @param appIdPath path of app id to be removed.
+ * @param splitIndex split index.
+ * @throws Exception if any problem occurs while performing ZK operation.
+ */
+ private void checkRemoveParentAppNode(String appIdPath, int splitIndex)
+ throws Exception {
+ if (splitIndex != 0) {
+ String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex);
+ List children = null;
+ try {
+ children = getChildren(parentAppNode);
+ } catch (KeeperException.NoNodeException ke) {
+ // It should be fine to swallow this exception as the parent app node we
+ // intend to delete is already deleted.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to remove app parent node " + parentAppNode +
+ " as it does not exist.");
+ }
+ return;
+ }
+ // No apps stored under parent path.
+ if (children != null && children.isEmpty()) {
+ try {
+ safeDelete(parentAppNode);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No leaf app node exists. Removing parent node " +
+ parentAppNode);
+ }
+ } catch (KeeperException.NotEmptyException ke) {
+ // It should be fine to swallow this exception as the parent app node
+ // has to be deleted only if it has no children. And this node has.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to remove app parent node " + parentAppNode +
+ " as it has children.");
+ }
+ }
+ }
+ }
+ }
+
@Override
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
- String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
+ String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -596,7 +749,26 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
protected synchronized void updateApplicationStateInternal(
ApplicationId appId, ApplicationStateData appStateDataPB)
throws Exception {
- String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
+ String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
+ boolean pathExists = true;
+ // Look for paths based on other split indices if path as per split index
+ // does not exist.
+ if (!exists(nodeUpdatePath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString());
+ if (alternatePathInfo != null) {
+ nodeUpdatePath = alternatePathInfo.path;
+ } else {
+ // No alternate path exists. Create path as per configured split index.
+ pathExists = false;
+ if (appIdNodeSplitIndex != 0) {
+ String rootNode =
+ getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
+ if (!exists(rootNode)) {
+ safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
+ }
+ }
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: "
@@ -605,34 +777,79 @@ protected synchronized void updateApplicationStateInternal(
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- if (exists(nodeUpdatePath)) {
+ if (pathExists) {
safeSetData(nodeUpdatePath, appStateData, -1);
} else {
- safeCreate(nodeUpdatePath, appStateData, zkAcl,
- CreateMode.PERSISTENT);
+ safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) {
- LOG.debug(appId + " znode didn't exist. Created a new znode to"
- + " update the application state.");
+ LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
+ "exist. Creating a new znode to update the application state.");
}
}
}
+ /*
+ * Handles store, update and remove application attempt state store
+ * operations.
+ */
+ private void handleApplicationAttemptStateOp(
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStateData attemptStateDataPB, AppAttemptOp operation)
+ throws Exception {
+ String appId = appAttemptId.getApplicationId().toString();
+ String appDirPath = getLeafAppIdNodePath(appId, false);
+ // Look for paths based on other split indices.
+ if (!exists(appDirPath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId);
+ if (alternatePathInfo == null) {
+ if (operation == AppAttemptOp.REMOVE) {
+ // Unexpected. Assume that app attempt has been deleted.
+ return;
+ } else { // Store or Update operation
+ throw new YarnRuntimeException("Unexpected Exception. App node for " +
+ "app " + appId + " not found");
+ }
+ } else {
+ appDirPath = alternatePathInfo.path;
+ }
+ }
+ String path = getNodePath(appDirPath, appAttemptId.toString());
+ byte[] attemptStateData = (attemptStateDataPB == null) ? null :
+ attemptStateDataPB.getProto().toByteArray();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(operation + " info for attempt: " + appAttemptId + " at: "
+ + path);
+ }
+ switch (operation) {
+ case UPDATE:
+ if (exists(path)) {
+ safeSetData(path, attemptStateData, -1);
+ } else {
+ safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." +
+ " Created a new znode to update the application attempt state.");
+ }
+ }
+ break;
+ case STORE:
+ safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+ break;
+ case REMOVE:
+ safeDelete(path);
+ break;
+ default:
+ break;
+ }
+ }
+
@Override
protected synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
- String appDirPath = getNodePath(rmAppRoot,
- appAttemptId.getApplicationId().toString());
- String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
- + nodeCreatePath);
- }
-
- byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
- safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT);
+ handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
+ AppAttemptOp.STORE);
}
@Override
@@ -640,65 +857,73 @@ protected synchronized void updateApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateData attemptStateDataPB)
throws Exception {
- String appIdStr = appAttemptId.getApplicationId().toString();
- String appAttemptIdStr = appAttemptId.toString();
- String appDirPath = getNodePath(rmAppRoot, appIdStr);
- String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
- + " at: " + nodeUpdatePath);
- }
-
- byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-
- if (exists(nodeUpdatePath)) {
- safeSetData(nodeUpdatePath, attemptStateData, -1);
- } else {
- safeCreate(nodeUpdatePath, attemptStateData, zkAcl,
- CreateMode.PERSISTENT);
- if (LOG.isDebugEnabled()) {
- LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
- + " update the application attempt state.");
- }
- }
+ handleApplicationAttemptStateOp(appAttemptId, attemptStateDataPB,
+ AppAttemptOp.UPDATE);
}
@Override
protected synchronized void removeApplicationAttemptInternal(
ApplicationAttemptId appAttemptId) throws Exception {
- String appId = appAttemptId.getApplicationId().toString();
- String appIdRemovePath = getNodePath(rmAppRoot, appId);
- String attemptIdRemovePath =
- getNodePath(appIdRemovePath, appAttemptId.toString());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
- + attemptIdRemovePath);
- }
-
- safeDelete(attemptIdRemovePath);
+ handleApplicationAttemptStateOp(appAttemptId, null, AppAttemptOp.REMOVE);
}
@Override
protected synchronized void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception {
- String appId = appState.getApplicationSubmissionContext().getApplicationId()
- .toString();
- String appIdRemovePath = getNodePath(rmAppRoot, appId);
+ removeApp(appState.getApplicationSubmissionContext().
+ getApplicationId().toString(), true, appState.attempts.keySet());
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
- + " and its attempts.");
+ private void removeApp(String removeAppId) throws Exception {
+ removeApp(removeAppId, false, null);
+ }
+
+ /**
+ * Remove application node and its attempt nodes.
+ *
+ * @param removeAppId Application Id to be removed.
+ * @param safeRemove Flag indicating if application and attempt nodes have to
+ * be removed safely under a fencing or not.
+ * @param attempts list of attempts to be removed associated with this app.
+ * Ignored if safeRemove flag is false as we recursively delete all the
+ * child nodes directly.
+ * @throws Exception if any exception occurs during ZK operation.
+ */
+ private void removeApp(String removeAppId, boolean safeRemove,
+ Set attempts) throws Exception {
+ String appIdRemovePath = getLeafAppIdNodePath(removeAppId, false);
+ int splitIndex = appIdNodeSplitIndex;
+ // Look for paths based on other split indices if path as per configured
+ // split index does not exist.
+ if (!exists(appIdRemovePath)) {
+ AppNodeSplitInfo alternatePathInfo = getAlternatePath(removeAppId);
+ if (alternatePathInfo != null) {
+ appIdRemovePath = alternatePathInfo.path;
+ splitIndex = alternatePathInfo.splitIndex;
+ } else {
+ // Alternate path not found so return.
+ return;
+ }
}
-
- for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
- String attemptRemovePath =
- getNodePath(appIdRemovePath, attemptId.toString());
- safeDelete(attemptRemovePath);
+ if (safeRemove) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing info for app: " + removeAppId + " at: " +
+ appIdRemovePath + " and its attempts.");
+ }
+ if (attempts != null) {
+ for (ApplicationAttemptId attemptId : attempts) {
+ String attemptRemovePath =
+ getNodePath(appIdRemovePath, attemptId.toString());
+ safeDelete(attemptRemovePath);
+ }
+ }
+ safeDelete(appIdRemovePath);
+ } else {
+ curatorFramework.delete().deletingChildrenIfNeeded().
+ forPath(appIdRemovePath);
}
-
- safeDelete(appIdRemovePath);
+ // Check if we should remove the parent app node as well.
+ checkRemoveParentAppNode(appIdRemovePath, splitIndex);
}
@Override
@@ -820,8 +1045,7 @@ public synchronized void deleteStore() throws Exception {
@Override
public synchronized void removeApplication(ApplicationId removeAppId)
throws Exception {
- String appIdRemovePath = getNodePath(rmAppRoot, removeAppId.toString());
- delete(appIdRemovePath);
+ removeApp(removeAppId.toString());
}
@VisibleForTesting
@@ -920,6 +1144,79 @@ private void createRootDirRecursively(String path) throws Exception {
}
}
+ /**
+ * Get alternate path for app id if path according to configured split index
+ * does not exist. We look for path based on all possible split indices.
+ * @param appId
+ * @return a {@link AppNodeSplitInfo} object containing the path and split
+ * index if it exists, null otherwise.
+ * @throws Exception if any problem occurs while performing ZK operation.
+ */
+ private AppNodeSplitInfo getAlternatePath(String appId) throws Exception {
+ for (Map.Entry entry : rmAppRootHierarchies.entrySet()) {
+ // Look for other paths
+ int splitIndex = entry.getKey();
+ if (splitIndex != appIdNodeSplitIndex) {
+ String alternatePath =
+ getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false);
+ if (exists(alternatePath)) {
+ return new AppNodeSplitInfo(alternatePath, splitIndex);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns leaf app node path based on app id and passed split index. If the
+ * passed flag createParentIfNotExists is true, also creates the parent app
+ * node if it does not exist.
+ * @param appId application id.
+ * @param rootNode app root node based on split index.
+ * @param appIdNodeSplitIdx split index.
+ * @param createParentIfNotExists flag which determines if parent app node
+ * needs to be created(as per split) if it does not exist.
+ * @return leaf app node path.
+ * @throws Exception if any problem occurs while performing ZK operation.
+ */
+ private String getLeafAppIdNodePath(String appId, String rootNode,
+ int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception {
+ if (appIdNodeSplitIdx == 0) {
+ return getNodePath(rootNode, appId);
+ }
+ String nodeName = appId;
+ int splitIdx = nodeName.length() - appIdNodeSplitIdx;
+ String rootNodePath =
+ getNodePath(rootNode, nodeName.substring(0, splitIdx));
+ if (createParentIfNotExists && !exists(rootNodePath)) {
+ try {
+ safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unable to create app parent node " + rootNodePath +
+ " as it already exists.");
+ }
+ }
+ }
+ return getNodePath(rootNodePath, nodeName.substring(splitIdx));
+ }
+
+ /**
+ * Returns leaf app node path based on app id and configured split index. If
+ * the passed flag createParentIfNotExists is true, also creates the parent
+ * app node if it does not exist.
+ * @param appId application id.
+ * @param createParentIfNotExists flag which determines if parent app node
+ * needs to be created(as per split) if it does not exist.
+ * @return leaf app node path.
+ * @throws Exception if any problem occurs while performing ZK operation.
+ */
+ private String getLeafAppIdNodePath(String appId,
+ boolean createParentIfNotExists) throws Exception {
+ return getLeafAppIdNodePath(appId, rmAppRootHierarchies.get(
+ appIdNodeSplitIndex), appIdNodeSplitIndex, createParentIfNotExists);
+ }
+
@VisibleForTesting
byte[] getData(final String path) throws Exception {
return curatorFramework.getData().forPath(path);
@@ -930,11 +1227,13 @@ List getACL(final String path) throws Exception {
return curatorFramework.getACL().forPath(path);
}
- private List getChildren(final String path) throws Exception {
+ @VisibleForTesting
+ List getChildren(final String path) throws Exception {
return curatorFramework.getChildren().forPath(path);
}
- private boolean exists(final String path) throws Exception {
+ @VisibleForTesting
+ boolean exists(final String path) throws Exception {
return curatorFramework.checkExists().forPath(path) != null;
}
@@ -963,6 +1262,11 @@ private void safeCreate(String path, byte[] data, List acl,
}
}
+ /**
+ * Deletes the path. Checks for existence of path as well.
+ * @param path Path to be deleted.
+ * @throws Exception if any problem occurs while performing deletion.
+ */
private void safeDelete(final String path) throws Exception {
if (exists(path)) {
SafeTransaction transaction = new SafeTransaction();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java
index 6c74616379..0203351808 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMStoreCommands.java
@@ -87,8 +87,13 @@ public void testRemoveApplicationFromStateStoreCmdForZK() throws Exception {
ZKRMStateStore.ROOT_ZNODE_NAME + "/" + RMStateStore.RM_APP_ROOT;
String appIdPath = appRootPath + "/" + appId;
curatorFramework.create().forPath(appIdPath);
- assertEquals("Application node for " + appId + "should exist",
- appId, curatorFramework.getChildren().forPath(appRootPath).get(0));
+ for (String path : curatorFramework.getChildren().forPath(appRootPath)) {
+ if (path.equals(ZKRMStateStore.RM_APP_ROOT_HIERARCHIES)) {
+ continue;
+ }
+ assertEquals("Application node for " + appId + " should exist",
+ appId, path);
+ }
try {
ResourceManager.removeApplication(conf, appId);
} catch (Exception e) {
@@ -96,8 +101,10 @@ public void testRemoveApplicationFromStateStoreCmdForZK() throws Exception {
"rm state store.");
}
assertTrue("After remove app from store there should be no child nodes" +
- " in app root path",
- curatorFramework.getChildren().forPath(appRootPath).isEmpty());
+ " for application in app root path",
+ curatorFramework.getChildren().forPath(appRootPath).size() == 1 &&
+ curatorFramework.getChildren().forPath(appRootPath).get(0).equals(
+ ZKRMStateStore.RM_APP_ROOT_HIERARCHIES));
}
}
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 514e9a044d..ca97914530 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -156,8 +156,7 @@ void waitNotify(TestDispatcher dispatcher) {
}
protected RMApp storeApp(RMStateStore store, ApplicationId appId,
- long submitTime,
- long startTime) throws Exception {
+ long submitTime, long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
context.setApplicationId(appId);
@@ -200,6 +199,13 @@ protected RMAppAttempt storeAttempt(RMStateStore store,
return mockAttempt;
}
+ protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher,
+ ApplicationAttemptStateData attemptState) {
+ dispatcher.attemptId = attemptState.getAttemptId();
+ store.updateApplicationAttemptState(attemptState);
+ waitNotify(dispatcher);
+ }
+
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
throws Exception {
testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 6d5d2d7324..7c40ddfa98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -28,6 +28,8 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -40,19 +42,25 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
@@ -61,13 +69,22 @@
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.crypto.SecretKey;
@@ -131,9 +148,21 @@ public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
+ private String getAppNode(String appId, int splitIdx) {
+ String rootPath = workingZnode + "/" + ROOT_ZNODE_NAME + "/" +
+ RM_APP_ROOT;
+ String appPath = appId;
+ if (splitIdx != 0) {
+ int idx = appId.length() - splitIdx;
+ appPath = appId.substring(0, idx) + "/" + appId.substring(idx);
+ return rootPath + "/" + RM_APP_ROOT_HIERARCHIES + "/" +
+ Integer.toString(splitIdx) + "/" + appPath;
+ }
+ return rootPath + "/" + appPath;
+ }
+
public String getAppNode(String appId) {
- return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
- + appId;
+ return getAppNode(appId, 0);
}
public String getAttemptNode(String appId, String attemptId) {
@@ -150,8 +179,7 @@ public void testRetryingCreateRootDir() throws Exception {
}
- public RMStateStore getRMStateStore() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
+ private RMStateStore createStore(Configuration conf) throws Exception {
workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
@@ -160,6 +188,15 @@ public RMStateStore getRMStateStore() throws Exception {
return this.store;
}
+ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
+ return createStore(conf);
+ }
+
+ public RMStateStore getRMStateStore() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ return createStore(conf);
+ }
+
@Override
public boolean isFinalStateValid() throws Exception {
return 1 ==
@@ -179,8 +216,12 @@ public Version getCurrentVersion() throws Exception {
}
public boolean appExists(RMApp app) throws Exception {
+ String appIdPath = app.getApplicationId().toString();
+ int split =
+ store.getConfig().getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX,
+ YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX);
return null != curatorFramework.checkExists()
- .forPath(store.getAppNode(app.getApplicationId().toString()));
+ .forPath(store.getAppNode(appIdPath, split));
}
public boolean attemptExists(RMAppAttempt attempt) throws Exception {
@@ -343,7 +384,6 @@ public void testZKRootPathAcls() throws Exception {
rm.close();
}
- @SuppressWarnings("unchecked")
@Test
public void testFencing() throws Exception {
StateChangeRequestInfo req = new StateChangeRequestInfo(
@@ -383,13 +423,15 @@ public void testFencing() throws Exception {
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
+ rm1.close();
+ rm2.close();
}
@Test
public void testFencedState() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
- RMStateStore store = zkTester.getRMStateStore();
-
+ RMStateStore store = zkTester.getRMStateStore();
+
// Move state to FENCED from ACTIVE
store.updateFencedState();
assertEquals("RMStateStore should have been in fenced state",
@@ -528,4 +570,518 @@ public void testDuplicateRMAppDeletion() throws Exception {
}
store.close();
}
+
+ private static String createPath(String... parts) {
+ return Joiner.on("/").join(parts);
+ }
+
+ private static Configuration createConfForAppNodeSplit(int splitIndex) {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, splitIndex);
+ return conf;
+ }
+
+ private static RMApp createMockAppForRemove(ApplicationId appId,
+ ApplicationAttemptId... attemptIds) {
+ RMApp app = mock(RMApp.class);
+ ApplicationSubmissionContextPBImpl context =
+ new ApplicationSubmissionContextPBImpl();
+ context.setApplicationId(appId);
+ when(app.getApplicationSubmissionContext()).thenReturn(context);
+ when(app.getUser()).thenReturn("test");
+ if (attemptIds.length > 0) {
+ HashMap attempts = new HashMap<>();
+ for (ApplicationAttemptId attemptId : attemptIds) {
+ RMAppAttempt appAttempt = mock(RMAppAttempt.class);
+ when(appAttempt.getAppAttemptId()).thenReturn(attemptId);
+ attempts.put(attemptId, appAttempt);
+ }
+ when(app.getAppAttempts()).thenReturn(attempts);
+ }
+ return app;
+ }
+
+ private static void verifyLoadedApp(ApplicationStateData appState,
+ ApplicationId appId, String user, long submitTime, long startTime,
+ RMAppState state, long finishTime, String diagnostics) {
+ // Check if app is loaded correctly
+ assertNotNull("App " + appId + " should have been loaded.", appState);
+ assertEquals("App submit time in app state", submitTime,
+ appState.getSubmitTime());
+ assertEquals("App start time in app state", startTime,
+ appState.getStartTime());
+ assertEquals("App ID in app state", appId,
+ appState.getApplicationSubmissionContext().getApplicationId());
+ assertEquals("App state", state, appState.getState());
+ assertEquals("Finish time in app state", finishTime,
+ appState.getFinishTime());
+ assertEquals("User in app state", user, appState.getUser());
+ assertEquals("Diagnostics in app state", diagnostics,
+ appState.getDiagnostics());
+ }
+
+ private static void verifyLoadedApp(RMState rmState,
+ ApplicationId appId, long submitTime, long startTime, long finishTime,
+ boolean isFinished, List attempts) {
+ verifyLoadedApp(rmState, appId, submitTime, startTime, finishTime,
+ isFinished, attempts, null, null);
+ }
+
+ private static void verifyLoadedApp(RMState rmState,
+ ApplicationId appId, long submitTime, long startTime, long finishTime,
+ boolean isFinished, List attempts,
+ List amExitStatuses,
+ List finalStatuses) {
+ Map rmAppState =
+ rmState.getApplicationState();
+ ApplicationStateData appState = rmAppState.get(appId);
+ assertNotNull(appId + " is not there in loaded apps", appState);
+ verifyLoadedApp(appState, appId, "test", submitTime, startTime,
+ isFinished ? RMAppState.FINISHED : null, finishTime,
+ isFinished ? "appDiagnostics" : "");
+ // Check attempt state.
+ if (attempts != null) {
+ assertEquals("Attempts loaded for app " + appId, attempts.size(),
+ appState.attempts.size());
+ if (finalStatuses != null && amExitStatuses != null) {
+ for (int i = 0; i < attempts.size(); i++) {
+ if (finalStatuses.get(i) != null) {
+ verifyLoadedAttempt(appState, attempts.get(i),
+ amExitStatuses.get(i), true);
+ } else {
+ verifyLoadedAttempt(appState, attempts.get(i),
+ amExitStatuses.get(i), false);
+ }
+ }
+ }
+ } else {
+ assertEquals(
+ "Attempts loaded for app " + appId, 0, appState.attempts.size());
+ }
+ }
+
+ private static void verifyLoadedAttempt(ApplicationStateData appState,
+ ApplicationAttemptId attemptId, int amExitStatus, boolean isFinished) {
+ verifyLoadedAttempt(appState, attemptId, isFinished ? "myTrackingUrl" :
+ "N/A", ContainerId.newContainerId(attemptId, 1), null,
+ isFinished ? RMAppAttemptState.FINISHED : null, isFinished ?
+ "attemptDiagnostics" : "", 0, amExitStatus,
+ isFinished ? FinalApplicationStatus.SUCCEEDED : null);
+ }
+
+ private static void verifyLoadedAttempt(ApplicationStateData appState,
+ ApplicationAttemptId attemptId, String trackingURL,
+ ContainerId masterContainerId, SecretKey clientTokenKey,
+ RMAppAttemptState state, String diagnostics, long finishTime,
+ int amExitStatus, FinalApplicationStatus finalStatus) {
+ ApplicationAttemptStateData attemptState = appState.getAttempt(attemptId);
+ // Check if attempt is loaded correctly
+ assertNotNull(
+ "Attempt " + attemptId + " should have been loaded.", attemptState);
+ assertEquals("Attempt Id in attempt state",
+ attemptId, attemptState.getAttemptId());
+ assertEquals("Master Container Id in attempt state",
+ masterContainerId, attemptState.getMasterContainer().getId());
+ if (null != clientTokenKey) {
+ assertArrayEquals("Client token key in attempt state",
+ clientTokenKey.getEncoded(), attemptState.getAppAttemptTokens().
+ getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+ }
+ assertEquals("Attempt state", state, attemptState.getState());
+ assertEquals("Finish time in attempt state", finishTime,
+ attemptState.getFinishTime());
+ assertEquals("Diagnostics in attempt state", diagnostics,
+ attemptState.getDiagnostics());
+ assertEquals("AM Container exit status in attempt state", amExitStatus,
+ attemptState.getAMContainerExitStatus());
+ assertEquals("Final app status in attempt state", finalStatus,
+ attemptState.getFinalApplicationStatus());
+ assertEquals("Tracking URL in attempt state", trackingURL,
+ attemptState.getFinalTrackingUrl());
+ }
+
+ private static ApplicationStateData createAppState(
+ ApplicationSubmissionContext ctxt, long submitTime, long startTime,
+ long finishTime, boolean isFinished) {
+ return ApplicationStateData.newInstance(submitTime, startTime, "test",
+ ctxt, isFinished ? RMAppState.FINISHED : null, isFinished ?
+ "appDiagnostics" : "", isFinished ? finishTime : 0, null);
+ }
+
+ private static ApplicationAttemptStateData createFinishedAttempt(
+ ApplicationAttemptId attemptId, Container container, long startTime,
+ int amExitStatus) {
+ return ApplicationAttemptStateData.newInstance(attemptId,
+ container, null, startTime, RMAppAttemptState.FINISHED,
+ "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED,
+ amExitStatus, 0, 0, 0, 0, 0);
+ }
+
+ private ApplicationAttemptId storeAttempt(RMStateStore store,
+ TestDispatcher dispatcher, String appAttemptIdStr,
+ AMRMTokenSecretManager appTokenMgr,
+ ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
+ boolean createContainer) throws Exception {
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.fromString(appAttemptIdStr);
+ Token appAttemptToken = null;
+ if (appTokenMgr != null) {
+ appAttemptToken = generateAMRMToken(attemptId, appTokenMgr);
+ }
+ SecretKey clientTokenKey = null;
+ if (clientToAMTokenMgr != null) {
+ clientTokenKey = clientToAMTokenMgr.createMasterKey(attemptId);
+ Credentials attemptCred = new Credentials();
+ attemptCred.addSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME,
+ clientTokenKey.getEncoded());
+ }
+ ContainerId containerId = null;
+ if (createContainer) {
+ containerId = ContainerId.newContainerId(attemptId, 1);
+ }
+ storeAttempt(store, attemptId, containerId.toString(), appAttemptToken,
+ clientTokenKey, dispatcher);
+ return attemptId;
+ }
+
+ private void finishAppWithAttempts(RMState state, RMStateStore store,
+ TestDispatcher dispatcher, ApplicationAttemptId attemptId,
+ long submitTime, long startTime, int amExitStatus, long finishTime,
+ boolean createNewApp) throws Exception {
+ ApplicationId appId = attemptId.getApplicationId();
+ ApplicationStateData appStateNew = null;
+ if (createNewApp) {
+ ApplicationSubmissionContext context =
+ new ApplicationSubmissionContextPBImpl();
+ context.setApplicationId(appId);
+ appStateNew = createAppState(context, submitTime, startTime, finishTime,
+ true);
+ } else {
+ ApplicationStateData appState = state.getApplicationState().get(appId);
+ appStateNew = createAppState(appState.getApplicationSubmissionContext(),
+ submitTime, startTime, finishTime, true);
+ appStateNew.attempts.putAll(appState.attempts);
+ }
+ store.updateApplicationState(appStateNew);
+ waitNotify(dispatcher);
+ Container container = new ContainerPBImpl();
+ container.setId(ContainerId.newContainerId(attemptId, 1));
+ ApplicationAttemptStateData newAttemptState =
+ createFinishedAttempt(attemptId, container, startTime, amExitStatus);
+ updateAttempt(store, dispatcher, newAttemptState);
+ }
+
+ private void storeAppWithAttempts(RMStateStore store,
+ TestDispatcher dispatcher, ApplicationAttemptId attemptId,
+ long submitTime, long startTime) throws Exception {
+ storeAppWithAttempts(store, dispatcher, submitTime, startTime, null, null,
+ attemptId);
+ }
+
+ private void storeApp(RMStateStore store, TestDispatcher dispatcher,
+ ApplicationId appId, long submitTime, long startTime) throws Exception {
+ storeApp(store, appId, submitTime, startTime);
+ waitNotify(dispatcher);
+ }
+
+ private void storeAppWithAttempts(RMStateStore store,
+ TestDispatcher dispatcher, long submitTime, long startTime,
+ AMRMTokenSecretManager appTokenMgr,
+ ClientToAMTokenSecretManagerInRM clientToAMTokenMgr,
+ ApplicationAttemptId attemptId, ApplicationAttemptId... attemptIds)
+ throws Exception {
+ ApplicationId appId = attemptId.getApplicationId();
+ storeApp(store, dispatcher, appId, submitTime, startTime);
+ storeAttempt(store, dispatcher, attemptId.toString(), appTokenMgr,
+ clientToAMTokenMgr, true);
+ for (ApplicationAttemptId attempt : attemptIds) {
+ storeAttempt(store, dispatcher, attempt.toString(), appTokenMgr,
+ clientToAMTokenMgr, true);
+ }
+ }
+
+ private static void removeApps(RMStateStore store,
+ Map appWithAttempts) {
+ for (Map.Entry entry :
+ appWithAttempts.entrySet()) {
+ RMApp mockApp = createMockAppForRemove(entry.getKey(), entry.getValue());
+ store.removeApplication(mockApp);
+ }
+ }
+
+ private static void verifyAppPathPath(RMStateStore store, ApplicationId appId,
+ int splitIndex) throws Exception {
+ String appIdStr = appId.toString();
+ String appParent = appIdStr.substring(0, appIdStr.length() - splitIndex);
+ String appPath = appIdStr.substring(appIdStr.length() - splitIndex);
+ String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
+ ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT,
+ ZKRMStateStore.RM_APP_ROOT_HIERARCHIES, String.valueOf(splitIndex),
+ appParent, appPath);
+ assertTrue("Application with id " + appIdStr + " does not exist as per " +
+ "split in state store.", ((ZKRMStateStore)store).exists(path));
+ }
+
+ private static void verifyAppInHierarchicalPath(RMStateStore store,
+ String appId, int splitIdx) throws Exception {
+ String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
+ ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
+ if (splitIdx != 0) {
+ path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
+ String.valueOf(splitIdx), appId.substring(0, appId.length() -
+ splitIdx), appId.substring(appId.length() - splitIdx));
+ } else {
+ path = createPath(path, appId);
+ }
+ assertTrue(appId + " should exist in path " + path,
+ ((ZKRMStateStore)store).exists(createPath(path)));
+ }
+
+ private static void assertHierarchicalPaths(RMStateStore store,
+ Map pathToApps) throws Exception {
+ for (Map.Entry entry : pathToApps.entrySet()) {
+ String path = createPath(((ZKRMStateStore)store).znodeWorkingPath,
+ ZKRMStateStore.ROOT_ZNODE_NAME, ZKRMStateStore.RM_APP_ROOT);
+ if (entry.getKey() != 0) {
+ path = createPath(path, ZKRMStateStore.RM_APP_ROOT_HIERARCHIES,
+ String.valueOf(entry.getKey()));
+ }
+ assertEquals("Number of childrens for path " + path,
+ (int) entry.getValue(),
+ ((ZKRMStateStore)store).getChildren(path).size());
+ }
+ }
+
+ // Test to verify storing of apps and app attempts in ZK state store with app
+ // node split index configured more than 0.
+ @Test
+ public void testAppNodeSplit() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ long submitTime = System.currentTimeMillis();
+ long startTime = submitTime + 1234;
+ Configuration conf = new YarnConfiguration();
+
+ // Get store with app node split config set as 1.
+ RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ // Create RM Context and app token manager.
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(store);
+ AMRMTokenSecretManager appTokenMgr =
+ spy(new AMRMTokenSecretManager(conf, rmContext));
+ MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+ when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+ ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+ new ClientToAMTokenSecretManagerInRM();
+
+ // Store app1.
+ ApplicationId appId1 = ApplicationId.newInstance(1352994193343L, 1);
+ ApplicationAttemptId attemptId1 =
+ ApplicationAttemptId.newInstance(appId1, 1);
+ ApplicationAttemptId attemptId2 =
+ ApplicationAttemptId.newInstance(appId1, 2);
+ storeAppWithAttempts(store, dispatcher, submitTime, startTime,
+ appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
+
+ // Store app2 with app id application_1352994193343_120213.
+ ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213);
+ storeApp(store, appId21, submitTime, startTime);
+ waitNotify(dispatcher);
+
+ // Store another app which will be removed.
+ ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2);
+ ApplicationAttemptId attemptIdRemoved =
+ ApplicationAttemptId.newInstance(appIdRemoved, 1);
+ storeAppWithAttempts(store, dispatcher, submitTime, startTime,
+ null, null, attemptIdRemoved);
+ // Remove the app.
+ RMApp mockRemovedApp =
+ createMockAppForRemove(appIdRemoved, attemptIdRemoved);
+ store.removeApplication(mockRemovedApp);
+ // Close state store
+ store.close();
+
+ // Load state store
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+ store.setRMDispatcher(dispatcher);
+ RMState state = store.loadState();
+ // Check if application_1352994193343_120213 (i.e. app2) exists in state
+ // store as per split index.
+ verifyAppPathPath(store, appId21, 1);
+
+ // Verify loaded apps and attempts based on the operations we did before
+ // reloading the state store.
+ verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
+ Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
+ -1000), Lists.newArrayList((FinalApplicationStatus) null, null));
+
+ // Update app state for app1.
+ finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
+ startTime, 100, 1234, false);
+
+ // Test updating app/attempt for app whose initial state is not saved
+ ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+ ApplicationAttemptId dummyAttemptId =
+ ApplicationAttemptId.newInstance(dummyAppId, 6);
+ finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
+ startTime, 111, 1234, true);
+ // Close the store
+ store.close();
+
+ // Check updated application state.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+ store.setRMDispatcher(dispatcher);
+ RMState newRMState = store.loadState();
+ verifyLoadedApp(newRMState, dummyAppId, submitTime, startTime, 1234, true,
+ Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
+ Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
+ verifyLoadedApp(newRMState, appId1, submitTime, startTime, 1234, true,
+ Lists.newArrayList(attemptId1, attemptId2),
+ Lists.newArrayList(-1000, 100), Lists.newArrayList(null,
+ FinalApplicationStatus.SUCCEEDED));
+
+ // assert store is in expected state after everything is cleaned
+ assertTrue("Store is not in expected state", zkTester.isFinalStateValid());
+ store.close();
+ }
+
+ // Test to verify storing of apps and app attempts in ZK state store with app
+ // node split index config changing across restarts.
+ @Test
+ public void testAppNodeSplitChangeAcrossRestarts() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ long submitTime = System.currentTimeMillis();
+ long startTime = submitTime + 1234;
+ Configuration conf = new YarnConfiguration();
+
+ // Create store with app node split set as 1.
+ RMStateStore store = zkTester.getRMStateStore(createConfForAppNodeSplit(1));
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(store);
+ AMRMTokenSecretManager appTokenMgr =
+ spy(new AMRMTokenSecretManager(conf, rmContext));
+ MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+ when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+ ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+ new ClientToAMTokenSecretManagerInRM();
+
+ // Store app1 with 2 attempts.
+ ApplicationId appId1 = ApplicationId.newInstance(1442994194053L, 1);
+ ApplicationAttemptId attemptId1 =
+ ApplicationAttemptId.newInstance(appId1, 1);
+ ApplicationAttemptId attemptId2 =
+ ApplicationAttemptId.newInstance(appId1, 2);
+ storeAppWithAttempts(store, dispatcher, submitTime, startTime,
+ appTokenMgr, clientToAMTokenMgr, attemptId1, attemptId2);
+
+ // Store app2 and associated attempt.
+ ApplicationId appId11 = ApplicationId.newInstance(1442994194053L, 2);
+ ApplicationAttemptId attemptId11 =
+ ApplicationAttemptId.newInstance(appId11, 1);
+ storeAppWithAttempts(store, dispatcher, attemptId11, submitTime, startTime);
+ // Close state store
+ store.close();
+
+ // Load state store with app node split config of 2.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit(2));
+ store.setRMDispatcher(dispatcher);
+ RMState state = store.loadState();
+ ApplicationId appId21 = ApplicationId.newInstance(1442994194053L, 120213);
+ storeApp(store, dispatcher, appId21, submitTime, startTime);
+
+ // Check if app is loaded correctly despite change in split index.
+ verifyLoadedApp(state, appId1, submitTime, startTime, 0, false,
+ Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
+ -1000), Lists.newArrayList((FinalApplicationStatus) null, null));
+
+ // Finish app/attempt state
+ finishAppWithAttempts(state, store, dispatcher, attemptId2, submitTime,
+ startTime, 100, 1234, false);
+
+ // Test updating app/attempt for app whose initial state is not saved
+ ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
+ ApplicationAttemptId dummyAttemptId =
+ ApplicationAttemptId.newInstance(dummyAppId, 6);
+ finishAppWithAttempts(state, store, dispatcher, dummyAttemptId, submitTime,
+ startTime, 111, 1234, true);
+ // Close the store
+ store.close();
+
+ // Load state store this time with split index of 0.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit(0));
+ store.setRMDispatcher(dispatcher);
+ state = store.loadState();
+ assertEquals("Number of Apps loaded should be 4.", 4,
+ state.getApplicationState().size());
+ verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
+ Lists.newArrayList(attemptId1, attemptId2), Lists.newArrayList(-1000,
+ 100), Lists.newArrayList(null, FinalApplicationStatus.SUCCEEDED));
+ // Remove attempt1
+ store.removeApplicationAttempt(attemptId1);
+ ApplicationId appId31 = ApplicationId.newInstance(1442994195071L, 45);
+ storeApp(store, dispatcher, appId31, submitTime, startTime);
+ // Close state store.
+ store.close();
+
+ // Load state store with split index of 3.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
+ store.setRMDispatcher(dispatcher);
+ state = store.loadState();
+ assertEquals("Number of apps loaded should be 5.", 5,
+ state.getApplicationState().size());
+ verifyLoadedApp(state, dummyAppId, submitTime, startTime, 1234, true,
+ Lists.newArrayList(dummyAttemptId), Lists.newArrayList(111),
+ Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
+ verifyLoadedApp(state, appId31, submitTime, startTime, 0, false, null);
+ verifyLoadedApp(state, appId21, submitTime, startTime, 0, false, null);
+ verifyLoadedApp(state, appId11, submitTime, startTime, 0, false,
+ Lists.newArrayList(attemptId11), Lists.newArrayList(-1000),
+ Lists.newArrayList((FinalApplicationStatus) null));
+ verifyLoadedApp(state, appId1, submitTime, startTime, 1234, true,
+ Lists.newArrayList(attemptId2), Lists.newArrayList(100),
+ Lists.newArrayList(FinalApplicationStatus.SUCCEEDED));
+
+ // Store another app.
+ ApplicationId appId41 = ApplicationId.newInstance(1442994195087L, 1);
+ storeApp(store, dispatcher, appId41, submitTime, startTime);
+ // Check how many apps exist in each of the hierarchy based paths. 0 paths
+ // should exist in "HIERARCHIES/4" path as app split index was never set
+ // as 4 in tests above.
+ assertHierarchicalPaths(store, ImmutableMap.of(0, 2, 1, 1, 2, 2,
+ 3, 1, 4, 0));
+ verifyAppInHierarchicalPath(store, "application_1442994195087_0001", 3);
+
+ ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7);
+ //storeApp(store, dispatcher, appId71, submitTime, startTime);
+ storeApp(store, appId71, submitTime, startTime);
+ waitNotify(dispatcher);
+ ApplicationAttemptId attemptId71 =
+ ApplicationAttemptId.newInstance(appId71, 1);
+ storeAttempt(store, ApplicationAttemptId.newInstance(appId71, 1),
+ ContainerId.newContainerId(attemptId71, 1).toString(), null, null,
+ dispatcher);
+ // Remove applications.
+ removeApps(store, ImmutableMap.of(appId11, new ApplicationAttemptId[]
+ {attemptId11}, appId71, new ApplicationAttemptId[] {attemptId71},
+ appId41, new ApplicationAttemptId[0], appId31,
+ new ApplicationAttemptId[0], appId21, new ApplicationAttemptId[0]));
+ removeApps(store, ImmutableMap.of(dummyAppId,
+ new ApplicationAttemptId[] {dummyAttemptId}, appId1,
+ new ApplicationAttemptId[] {attemptId1, attemptId2}));
+ store.close();
+
+ // Load state store with split index of 3 again. As all apps have been
+ // removed nothing should be loaded back.
+ store = zkTester.getRMStateStore(createConfForAppNodeSplit(3));
+ store.setRMDispatcher(dispatcher);
+ state = store.loadState();
+ assertEquals("Number of apps loaded should be 0.", 0,
+ state.getApplicationState().size());
+ // Close the state store.
+ store.close();
+ }
}