diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e8e262caaa..8e5c463613 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -184,6 +184,9 @@ Release 2.4.0 - UNRELEASED YARN-1446. Changed client API to retry killing application till RM acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv) + YARN-1307. Redesign znode structure for Zookeeper based RM state-store for + better organization and scalability. (Tsuyoshi OZAWA via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 88b1a90bf6..d60e8ada08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -287,11 +287,12 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { } @Override - public synchronized void storeApplicationStateInternal(String appId, + public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - Path appDirPath = getAppDir(rmAppRoot, appId); + String appIdStr = appId.toString(); + Path appDirPath = getAppDir(rmAppRoot, appIdStr); fs.mkdirs(appDirPath); - Path nodeCreatePath = getNodePath(appDirPath, appId); + Path nodeCreatePath = getNodePath(appDirPath, appIdStr); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -306,10 +307,11 @@ public synchronized void storeApplicationStateInternal(String appId, } @Override - public synchronized void updateApplicationStateInternal(String appId, + public synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - Path appDirPath = getAppDir(rmAppRoot, appId); - Path nodeCreatePath = getNodePath(appDirPath, appId); + String appIdStr = appId.toString(); + Path appDirPath = getAppDir(rmAppRoot, appIdStr); + Path nodeCreatePath = getNodePath(appDirPath, appIdStr); LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath); byte[] appStateData = appStateDataPB.getProto().toByteArray(); @@ -325,14 +327,13 @@ public synchronized void updateApplicationStateInternal(String appId, @Override public synchronized void storeApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - ApplicationAttemptId appAttemptId = - ConverterUtils.toApplicationAttemptId(attemptId); Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); - Path nodeCreatePath = getNodePath(appDirPath, attemptId); - LOG.info("Storing info for attempt: " + attemptId + " at: " + Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); + LOG.info("Storing info for attempt: " + appAttemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); try { @@ -340,21 +341,20 @@ public synchronized void storeApplicationAttemptStateInternal( // based on whether we have lost the right to write to FS writeFile(nodeCreatePath, attemptStateData); } catch (Exception e) { - LOG.info("Error storing info for attempt: " + attemptId, e); + LOG.info("Error storing info for attempt: " + appAttemptId, e); throw e; } } @Override public synchronized void updateApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - ApplicationAttemptId appAttemptId = - ConverterUtils.toApplicationAttemptId(attemptId); Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); - Path nodeCreatePath = getNodePath(appDirPath, attemptId); - LOG.info("Updating info for attempt: " + attemptId + " at: " + Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); + LOG.info("Updating info for attempt: " + appAttemptId + " at: " + nodeCreatePath); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); try { @@ -362,7 +362,7 @@ public synchronized void updateApplicationAttemptStateInternal( // based on whether we have lost the right to write to FS updateFile(nodeCreatePath, attemptStateData); } catch (Exception e) { - LOG.info("Error updating info for attempt: " + attemptId, e); + LOG.info("Error updating info for attempt: " + appAttemptId, e); throw e; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 961bec3165..5a20ff28b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -80,7 +80,7 @@ protected synchronized void closeInternal() throws Exception { } @Override - public void storeApplicationStateInternal(String appId, + public void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationState appState = @@ -88,11 +88,11 @@ public void storeApplicationStateInternal(String appId, appStateData.getStartTime(), appStateData.getApplicationSubmissionContext(), appStateData.getUser()); - state.appState.put(appState.getAppId(), appState); + state.appState.put(appId, appState); } @Override - public void updateApplicationStateInternal(String appId, + public void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { ApplicationState updatedAppState = new ApplicationState(appStateData.getSubmitTime(), @@ -102,21 +102,19 @@ public void updateApplicationStateInternal(String appId, appStateData.getDiagnostics(), appStateData.getFinishTime()); LOG.info("Updating final state " + appStateData.getState() + " for app: " + appId); - ApplicationId applicationId = updatedAppState.getAppId(); - if (state.appState.get(applicationId) != null) { + if (state.appState.get(appId) != null) { // add the earlier attempts back updatedAppState.attempts - .putAll(state.appState.get(applicationId).attempts); + .putAll(state.appState.get(appId).attempts); } - state.appState.put(applicationId, updatedAppState); + state.appState.put(appId, updatedAppState); } @Override - public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr, - ApplicationAttemptStateDataPBImpl attemptStateData) - throws Exception { - ApplicationAttemptId attemptId = ConverterUtils - .toApplicationAttemptId(attemptIdStr); + public synchronized void storeApplicationAttemptStateInternal( + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception { Credentials credentials = null; if(attemptStateData.getAppAttemptTokens() != null){ DataInputByteBuffer dibb = new DataInputByteBuffer(); @@ -125,7 +123,7 @@ public synchronized void storeApplicationAttemptStateInternal(String attemptIdSt credentials.readTokenStorageStream(dibb); } ApplicationAttemptState attemptState = - new ApplicationAttemptState(attemptId, + new ApplicationAttemptState(appAttemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime()); @@ -139,10 +137,9 @@ public synchronized void storeApplicationAttemptStateInternal(String attemptIdSt @Override public synchronized void updateApplicationAttemptStateInternal( - String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { - ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(attemptIdStr); Credentials credentials = null; if (attemptStateData.getAppAttemptTokens() != null) { DataInputByteBuffer dibb = new DataInputByteBuffer(); @@ -151,7 +148,7 @@ public synchronized void updateApplicationAttemptStateInternal( credentials.readTokenStorageStream(dibb); } ApplicationAttemptState updatedAttemptState = - new ApplicationAttemptState(attemptId, + new ApplicationAttemptState(appAttemptId, attemptStateData.getMasterContainer(), credentials, attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 3098b260b7..af28a0152e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; @@ -51,13 +53,13 @@ public RMState loadState() throws Exception { } @Override - protected void storeApplicationStateInternal(String appId, + protected void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { // Do nothing } @Override - protected void storeApplicationAttemptStateInternal(String attemptId, + protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { // Do nothing } @@ -92,13 +94,13 @@ public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Excepti } @Override - protected void updateApplicationStateInternal(String appId, + protected void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { // Do nothing } @Override - protected void updateApplicationAttemptStateInternal(String attemptId, + protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index a8452642ce..dcfdad3703 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -387,10 +387,10 @@ public synchronized void updateApplicationState(ApplicationState appState) { * Derived classes must implement this method to store the state of an * application. */ - protected abstract void storeApplicationStateInternal(String appId, + protected abstract void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception; - protected abstract void updateApplicationStateInternal(String appId, + protected abstract void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception; @SuppressWarnings("unchecked") @@ -424,10 +424,12 @@ public synchronized void updateApplicationAttemptState( * Derived classes must implement this method to store the state of an * application attempt */ - protected abstract void storeApplicationAttemptStateInternal(String attemptId, + protected abstract void storeApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; - protected abstract void updateApplicationAttemptStateInternal(String attemptId, + protected abstract void updateApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; /** @@ -592,11 +594,11 @@ protected void handleStoreEvent(RMStateStoreEvent event) { LOG.info("Storing info for app: " + appId); try { if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - storeApplicationStateInternal(appId.toString(), appStateData); + storeApplicationStateInternal(appId, appStateData); notifyDoneStoringApplication(appId, storedException); } else { assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - updateApplicationStateInternal(appId.toString(), appStateData); + updateApplicationStateInternal(appId, appStateData); notifyDoneUpdatingApplication(appId, storedException); } } catch (Exception e) { @@ -637,15 +639,15 @@ protected void handleStoreEvent(RMStateStoreEvent event) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - storeApplicationAttemptStateInternal(attemptState.getAttemptId() - .toString(), attemptStateData); + storeApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), storedException); } else { assert event.getType().equals( RMStateStoreEventType.UPDATE_APP_ATTEMPT); - updateApplicationAttemptStateInternal(attemptState.getAttemptId() - .toString(), attemptStateData); + updateApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), storedException); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 87377814c8..b8b3d1e6fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -78,16 +78,51 @@ public class ZKRMStateStore extends RMStateStore { protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion .newInstance(1, 0); + private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = + "RMDelegationTokensRoot"; + private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = + "RMDTSequentialNumber"; + private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = + "RMDTMasterKeysRoot"; private int numRetries; private String zkHostPort = null; private int zkSessionTimeout; private long zkRetryInterval; private List zkAcl; + + /** + * + * ROOT_DIR_PATH + * |--- VERSION_INFO + * |--- RM_ZK_FENCING_LOCK + * |--- RM_APP_ROOT + * | |----- (#ApplicationId1) + * | | |----- (#ApplicationAttemptIds) + * | | + * | |----- (#ApplicationId2) + * | | |----- (#ApplicationAttemptIds) + * | .... + * | + * |--- RM_DT_SECRET_MANAGER_ROOT + * |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME + * |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME + * | |----- Token_1 + * | |----- Token_2 + * | .... + * | + * |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME + * | |----- Key_1 + * | |----- Key_2 + * .... + * + */ private String zkRootNodePath; - private String rmDTSecretManagerRoot; private String rmAppRoot; - private String dtSequenceNumberPath = null; + private String rmDTSecretManagerRoot; + private String dtMasterKeysRootPath; + private String delegationTokensRootPath; + private String dtSequenceNumberPath; @VisibleForTesting protected String znodeWorkingPath; @@ -178,12 +213,11 @@ public synchronized void initInternal(Configuration conf) throws Exception { throw bafe; } - zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME; - rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT; - rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT; + zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME); + rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT); /* Initialize fencing related paths, acls, and ops */ - fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK; + fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK); createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl, CreateMode.PERSISTENT); deleteFencingNodePathOp = Op.delete(fencingNodePath, -1); @@ -204,6 +238,15 @@ public synchronized void initInternal(Configuration conf) throws Exception { zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl); } } + + rmDTSecretManagerRoot = + getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT); + dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot, + RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME); + delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot, + RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME); + dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, + RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME); } @Override @@ -217,8 +260,11 @@ public synchronized void startInternal() throws Exception { if (HAUtil.isHAEnabled(getConfig())){ fence(); } - createRootDir(rmDTSecretManagerRoot); createRootDir(rmAppRoot); + createRootDir(rmDTSecretManagerRoot); + createRootDir(dtMasterKeysRootPath); + createRootDir(delegationTokensRootPath); + createRootDir(dtSequenceNumberPath); } private void createRootDir(final String rootPath) throws Exception { @@ -350,26 +396,69 @@ public synchronized RMState loadState() throws Exception { private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { - List childNodes = - getChildrenWithRetries(rmDTSecretManagerRoot, true); + loadRMDelegationKeyState(rmState); + loadRMSequentialNumberState(rmState); + loadRMDelegationTokenState(rmState); + } + private void loadRMDelegationKeyState(RMState rmState) throws Exception { + List childNodes = + getChildrenWithRetries(dtMasterKeysRootPath, true); for (String childNodeName : childNodes) { - if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { - rmState.rmSecretManagerState.dtSequenceNumber = - Integer.parseInt(childNodeName.split("_")[1]); + String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); continue; } - String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName); - byte[] childData = getDataWithRetries(childNodePath, true); ByteArrayInputStream is = new ByteArrayInputStream(childData); DataInputStream fsIn = new DataInputStream(is); + try { if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) { DelegationKey key = new DelegationKey(); key.readFields(fsIn); rmState.rmSecretManagerState.masterKeyState.add(key); - } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { + } + } finally { + is.close(); + } + } + } + + private void loadRMSequentialNumberState(RMState rmState) throws Exception { + byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false); + if (seqData != null) { + ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData); + DataInputStream seqIn = new DataInputStream(seqIs); + + try { + rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt(); + } finally { + seqIn.close(); + } + } + } + + private void loadRMDelegationTokenState(RMState rmState) throws Exception { + List childNodes = zkClient.getChildren(delegationTokensRootPath, true); + for (String childNodeName : childNodes) { + String childNodePath = + getNodePath(delegationTokensRootPath, childNodeName); + byte[] childData = getDataWithRetries(childNodePath, true); + + if (childData == null) { + LOG.warn("Content of " + childNodePath + " is broken."); + continue; + } + + ByteArrayInputStream is = new ByteArrayInputStream(childData); + DataInputStream fsIn = new DataInputStream(is); + + try { + if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) { RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(); identifier.readFields(fsIn); @@ -385,8 +474,6 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState) private synchronized void loadRMAppState(RMState rmState) throws Exception { List childNodes = getChildrenWithRetries(rmAppRoot, true); - List attempts = - new ArrayList(); for (String childNodeName : childNodes) { String childNodePath = getNodePath(rmAppRoot, childNodeName); byte[] childData = getDataWithRetries(childNodePath, true); @@ -411,17 +498,28 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { "from the application id"); } rmState.appState.put(appId, appState); - } else if (childNodeName - .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { - // attempt - if (LOG.isDebugEnabled()) { - LOG.debug("Loading application attempt from znode: " + childNodeName); - } + loadApplicationAttemptState(appState, appId); + } else { + LOG.info("Unknown child node with name: " + childNodeName); + } + } + } + + private void loadApplicationAttemptState(ApplicationState appState, + ApplicationId appId) + throws Exception { + String appPath = getNodePath(rmAppRoot, appId.toString()); + List attempts = getChildrenWithRetries(appPath, false); + for (String attemptIDStr : attempts) { + if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { + String attemptPath = getNodePath(appPath, attemptIDStr); + byte[] attemptData = getDataWithRetries(attemptPath, true); + ApplicationAttemptId attemptId = - ConverterUtils.toApplicationAttemptId(childNodeName); + ConverterUtils.toApplicationAttemptId(attemptIDStr); ApplicationAttemptStateDataPBImpl attemptStateData = new ApplicationAttemptStateDataPBImpl( - ApplicationAttemptStateDataProto.parseFrom(childData)); + ApplicationAttemptStateDataProto.parseFrom(attemptData)); Credentials credentials = null; if (attemptStateData.getAppAttemptTokens() != null) { credentials = new Credentials(); @@ -429,47 +527,26 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { dibb.reset(attemptStateData.getAppAttemptTokens()); credentials.readTokenStorageStream(dibb); } + ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), - attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus()); - if (!attemptId.equals(attemptState.getAttemptId())) { - throw new YarnRuntimeException("The child node name is different " + - "from the application attempt id"); - } - attempts.add(attemptState); - } else { - LOG.info("Unknown child node with name: " + childNodeName); - } - } + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), + attemptStateData.getState(), + attemptStateData.getFinalTrackingUrl(), + attemptStateData.getDiagnostics(), + attemptStateData.getFinalApplicationStatus()); - // go through all attempts and add them to their apps - for (ApplicationAttemptState attemptState : attempts) { - ApplicationId appId = attemptState.getAttemptId().getApplicationId(); - ApplicationState appState = rmState.appState.get(appId); - if (appState != null) { appState.attempts.put(attemptState.getAttemptId(), attemptState); - } else { - // the application znode may have been removed when the application - // completed but the RM might have stopped before it could remove the - // application attempt znodes - LOG.info("Application node not found for attempt: " - + attemptState.getAttemptId()); - deleteWithRetries( - getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1); } } LOG.info("Done Loading applications from ZK state store"); } @Override - public synchronized void storeApplicationStateInternal(String appId, + public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, appId); + String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath); @@ -481,25 +558,29 @@ public synchronized void storeApplicationStateInternal(String appId, } @Override - public synchronized void updateApplicationStateInternal(String appId, + public synchronized void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, appId); + String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { LOG.debug("Storing final state info for app: " + appId + " at: " - + nodeCreatePath); + + nodeUpdatePath); } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - setDataWithRetries(nodeCreatePath, appStateData, 0); + setDataWithRetries(nodeUpdatePath, appStateData, 0); } @Override public synchronized void storeApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + String appDirPath = getNodePath(rmAppRoot, + appAttemptId.getApplicationId().toString()); + String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); + if (LOG.isDebugEnabled()) { - LOG.debug("Storing info for attempt: " + attemptId + " at: " + LOG.debug("Storing info for attempt: " + appAttemptId + " at: " + nodeCreatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); @@ -509,31 +590,36 @@ public synchronized void storeApplicationAttemptStateInternal( @Override public synchronized void updateApplicationAttemptStateInternal( - String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptId appAttemptId, + ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception { - String nodeCreatePath = getNodePath(rmAppRoot, attemptId); + String appIdStr = appAttemptId.getApplicationId().toString(); + String appAttemptIdStr = appAttemptId.toString(); + String appDirPath = getNodePath(rmAppRoot, appIdStr); + String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr); if (LOG.isDebugEnabled()) { - LOG.debug("Storing final state info for attempt: " + attemptId + " at: " - + nodeCreatePath); + LOG.debug("Storing final state info for attempt: " + appAttemptIdStr + + " at: " + nodeUpdatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - setDataWithRetries(nodeCreatePath, attemptStateData, 0); + setDataWithRetries(nodeUpdatePath, attemptStateData, 0); } @Override public synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); - String nodeRemovePath = getNodePath(rmAppRoot, appId); + String appIdRemovePath = getNodePath(rmAppRoot, appId); ArrayList opList = new ArrayList(); - opList.add(Op.delete(nodeRemovePath, -1)); for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { - String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString()); + String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString()); opList.add(Op.delete(attemptRemovePath, -1)); } + opList.add(Op.delete(appIdRemovePath, -1)); + if (LOG.isDebugEnabled()) { - LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath + LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath + " and its attempts."); } doMultiWithRetries(opList); @@ -546,38 +632,37 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( ArrayList opList = new ArrayList(); // store RM delegation token String nodeCreatePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream fsOut = new DataOutputStream(os); + ByteArrayOutputStream tokenOs = new ByteArrayOutputStream(); + DataOutputStream tokenOut = new DataOutputStream(tokenOs); + ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); + DataOutputStream seqOut = new DataOutputStream(seqOs); + try { - rmDTIdentifier.write(fsOut); - fsOut.writeLong(renewDate); + rmDTIdentifier.write(tokenOut); + tokenOut.writeLong(renewDate); if (LOG.isDebugEnabled()) { LOG.debug("Storing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl, + + opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl, CreateMode.PERSISTENT)); + + + seqOut.writeInt(latestSequenceNumber); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + dtSequenceNumberPath + + ". SequenceNumber: " + latestSequenceNumber); + } + + opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1)); } finally { - os.close(); + tokenOs.close(); + seqOs.close(); } - // store sequence number - String latestSequenceNumberPath = - getNodePath(rmDTSecretManagerRoot, - DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + - latestSequenceNumber); - } - - if (dtSequenceNumberPath != null) { - opList.add(Op.delete(dtSequenceNumberPath, -1)); - } - opList.add(Op.create(latestSequenceNumberPath, null, zkAcl, - CreateMode.PERSISTENT)); - dtSequenceNumberPath = latestSequenceNumberPath; doMultiWithRetries(opList); } @@ -585,7 +670,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( protected synchronized void removeRMDelegationTokenState( RMDelegationTokenIdentifier rmDTIdentifier) throws Exception { String nodeRemovePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber()); if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationToken_" @@ -598,7 +683,7 @@ protected synchronized void removeRMDelegationTokenState( protected synchronized void storeRMDTMasterKeyState( DelegationKey delegationKey) throws Exception { String nodeCreatePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX + delegationKey.getKeyId()); ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream fsOut = new DataOutputStream(os); @@ -618,7 +703,7 @@ protected synchronized void storeRMDTMasterKeyState( protected synchronized void removeRMDTMasterKeyState( DelegationKey delegationKey) throws Exception { String nodeRemovePath = - getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX + delegationKey.getKeyId()); if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); @@ -757,8 +842,7 @@ public byte[] getDataWithRetries(final String path, final boolean watch) return new ZKAction() { @Override public byte[] run() throws KeeperException, InterruptedException { - Stat stat = new Stat(); - return zkClient.getData(path, watch, stat); + return zkClient.getData(path, watch, null); } }.runWithRetries(); } @@ -865,4 +949,5 @@ protected synchronized ZooKeeper getNewZooKeeper() zk.register(new ForwardingWatcher()); return zk; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index d273cab3e6..fe220c07eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -683,14 +683,14 @@ public void testRMRestartKilledAppWithNoAttempts() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore() { @Override public synchronized void storeApplicationAttemptStateInternal( - String attemptIdStr, + ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { // ignore attempt saving request. } @Override public synchronized void updateApplicationAttemptStateInternal( - String attemptIdStr, + ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { // ignore attempt saving request. } @@ -1540,7 +1540,7 @@ public class TestMemoryRMStateStore extends MemoryRMStateStore { public int updateAttempt = 0; @Override - public void updateApplicationStateInternal(String appId, + public void updateApplicationStateInternal(ApplicationId appId, ApplicationStateDataPBImpl appStateData) throws Exception { updateApp = ++count; super.updateApplicationStateInternal(appId, appStateData); @@ -1548,11 +1548,12 @@ public void updateApplicationStateInternal(String appId, @Override public synchronized void - updateApplicationAttemptStateInternal(String attemptIdStr, + updateApplicationAttemptStateInternal( + ApplicationAttemptId attemptId, ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { updateAttempt = ++count; - super.updateApplicationAttemptStateInternal(attemptIdStr, + super.updateApplicationAttemptStateInternal(attemptId, attemptStateData); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index ff110b31f2..30cdbc157f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -234,6 +234,12 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) attempts.put(attemptIdRemoved, mockRemovedAttempt); store.removeApplication(mockRemovedApp); + // remove application directory recursively. + storeApp(store, appIdRemoved, submitTime, startTime); + storeAttempt(store, attemptIdRemoved, + "container_1352994193343_0002_01_000001", null, null, dispatcher); + store.removeApplication(mockRemovedApp); + // let things settle down Thread.sleep(1000); store.close(); @@ -373,7 +379,30 @@ public void testRMDTSecretManagerStateStore( Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + + // check to delete delegationKey + store.removeRMDTMasterKey(key); + keySet.clear(); + RMDTSecretManagerState noKeySecretManagerState = + store.loadState().getRMDTSecretManagerState(); + Assert.assertEquals(token1, noKeySecretManagerState.getTokenState()); + Assert.assertEquals(keySet, noKeySecretManagerState.getMasterKeyState()); + Assert.assertEquals(sequenceNumber, + noKeySecretManagerState.getDTSequenceNumber()); + + // check to delete delegationToken + store.removeRMDelegationToken(dtId1, sequenceNumber); + RMDTSecretManagerState noKeyAndTokenSecretManagerState = + store.loadState().getRMDTSecretManagerState(); + token1.clear(); + Assert.assertEquals(token1, + noKeyAndTokenSecretManagerState.getTokenState()); + Assert.assertEquals(keySet, + noKeyAndTokenSecretManagerState.getMasterKeyState()); + Assert.assertEquals(sequenceNumber, + noKeySecretManagerState.getDTSequenceNumber()); store.close(); + } private Token generateAMRMToken( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 27e8411cc1..8427552318 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; @@ -178,10 +179,11 @@ public void testFSRMStateStoreClientRetry() throws Exception { @Override public void run() { try { - store.storeApplicationStateInternal("application1", - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(111, 111, "user", null, - RMAppState.ACCEPTED, "diagnostics", 333)); + store.storeApplicationStateInternal( + ApplicationId.newInstance(100L, 1), + (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl + .newApplicationStateData(111, 111, "user", null, + RMAppState.ACCEPTED, "diagnostics", 333)); } catch (Exception e) { // TODO 0 datanode exception will not be retried by dfs client, fix // that separately.