YARN-1307. Redesign znode structure for Zookeeper based RM state-store for better organization and scalability. Contributed by Tsuyoshi OZAWA.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1552209 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-12-19 02:33:05 +00:00
parent a9d80ae59d
commit 93907baa0b
9 changed files with 277 additions and 156 deletions

View File

@ -184,6 +184,9 @@ Release 2.4.0 - UNRELEASED
YARN-1446. Changed client API to retry killing application till RM
acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv)
YARN-1307. Redesign znode structure for Zookeeper based RM state-store for
better organization and scalability. (Tsuyoshi OZAWA via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -287,11 +287,12 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
}
@Override
public synchronized void storeApplicationStateInternal(String appId,
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
Path appDirPath = getAppDir(rmAppRoot, appId);
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
fs.mkdirs(appDirPath);
Path nodeCreatePath = getNodePath(appDirPath, appId);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -306,10 +307,11 @@ public synchronized void storeApplicationStateInternal(String appId,
}
@Override
public synchronized void updateApplicationStateInternal(String appId,
public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
Path appDirPath = getAppDir(rmAppRoot, appId);
Path nodeCreatePath = getNodePath(appDirPath, appId);
String appIdStr = appId.toString();
Path appDirPath = getAppDir(rmAppRoot, appIdStr);
Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@ -325,14 +327,13 @@ public synchronized void updateApplicationStateInternal(String appId,
@Override
public synchronized void storeApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
ApplicationAttemptId appAttemptId =
ConverterUtils.toApplicationAttemptId(attemptId);
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
Path nodeCreatePath = getNodePath(appDirPath, attemptId);
LOG.info("Storing info for attempt: " + attemptId + " at: "
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
try {
@ -340,21 +341,20 @@ public synchronized void storeApplicationAttemptStateInternal(
// based on whether we have lost the right to write to FS
writeFile(nodeCreatePath, attemptStateData);
} catch (Exception e) {
LOG.info("Error storing info for attempt: " + attemptId, e);
LOG.info("Error storing info for attempt: " + appAttemptId, e);
throw e;
}
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
ApplicationAttemptId appAttemptId =
ConverterUtils.toApplicationAttemptId(attemptId);
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
Path nodeCreatePath = getNodePath(appDirPath, attemptId);
LOG.info("Updating info for attempt: " + attemptId + " at: "
Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
LOG.info("Updating info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
try {
@ -362,7 +362,7 @@ public synchronized void updateApplicationAttemptStateInternal(
// based on whether we have lost the right to write to FS
updateFile(nodeCreatePath, attemptStateData);
} catch (Exception e) {
LOG.info("Error updating info for attempt: " + attemptId, e);
LOG.info("Error updating info for attempt: " + appAttemptId, e);
throw e;
}
}

View File

@ -80,7 +80,7 @@ protected synchronized void closeInternal() throws Exception {
}
@Override
public void storeApplicationStateInternal(String appId,
public void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData)
throws Exception {
ApplicationState appState =
@ -88,11 +88,11 @@ public void storeApplicationStateInternal(String appId,
appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser());
state.appState.put(appState.getAppId(), appState);
state.appState.put(appId, appState);
}
@Override
public void updateApplicationStateInternal(String appId,
public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
ApplicationState updatedAppState =
new ApplicationState(appStateData.getSubmitTime(),
@ -102,21 +102,19 @@ public void updateApplicationStateInternal(String appId,
appStateData.getDiagnostics(), appStateData.getFinishTime());
LOG.info("Updating final state " + appStateData.getState() + " for app: "
+ appId);
ApplicationId applicationId = updatedAppState.getAppId();
if (state.appState.get(applicationId) != null) {
if (state.appState.get(appId) != null) {
// add the earlier attempts back
updatedAppState.attempts
.putAll(state.appState.get(applicationId).attempts);
.putAll(state.appState.get(appId).attempts);
}
state.appState.put(applicationId, updatedAppState);
state.appState.put(appId, updatedAppState);
}
@Override
public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
ApplicationAttemptId attemptId = ConverterUtils
.toApplicationAttemptId(attemptIdStr);
public synchronized void storeApplicationAttemptStateInternal(
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
DataInputByteBuffer dibb = new DataInputByteBuffer();
@ -125,7 +123,7 @@ public synchronized void storeApplicationAttemptStateInternal(String attemptIdSt
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime());
@ -139,10 +137,9 @@ public synchronized void storeApplicationAttemptStateInternal(String attemptIdSt
@Override
public synchronized void updateApplicationAttemptStateInternal(
String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData)
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(attemptIdStr);
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
DataInputByteBuffer dibb = new DataInputByteBuffer();
@ -151,7 +148,7 @@ public synchronized void updateApplicationAttemptStateInternal(
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState updatedAttemptState =
new ApplicationAttemptState(attemptId,
new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
@ -51,13 +53,13 @@ public RMState loadState() throws Exception {
}
@Override
protected void storeApplicationStateInternal(String appId,
protected void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
// Do nothing
}
@Override
protected void storeApplicationAttemptStateInternal(String attemptId,
protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// Do nothing
}
@ -92,13 +94,13 @@ public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Excepti
}
@Override
protected void updateApplicationStateInternal(String appId,
protected void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
// Do nothing
}
@Override
protected void updateApplicationAttemptStateInternal(String attemptId,
protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
}

View File

@ -387,10 +387,10 @@ public synchronized void updateApplicationState(ApplicationState appState) {
* Derived classes must implement this method to store the state of an
* application.
*/
protected abstract void storeApplicationStateInternal(String appId,
protected abstract void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception;
protected abstract void updateApplicationStateInternal(String appId,
protected abstract void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception;
@SuppressWarnings("unchecked")
@ -424,10 +424,12 @@ public synchronized void updateApplicationAttemptState(
* Derived classes must implement this method to store the state of an
* application attempt
*/
protected abstract void storeApplicationAttemptStateInternal(String attemptId,
protected abstract void storeApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
protected abstract void updateApplicationAttemptStateInternal(String attemptId,
protected abstract void updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
/**
@ -592,11 +594,11 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
LOG.info("Storing info for app: " + appId);
try {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
storeApplicationStateInternal(appId.toString(), appStateData);
storeApplicationStateInternal(appId, appStateData);
notifyDoneStoringApplication(appId, storedException);
} else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
updateApplicationStateInternal(appId.toString(), appStateData);
updateApplicationStateInternal(appId, appStateData);
notifyDoneUpdatingApplication(appId, storedException);
}
} catch (Exception e) {
@ -637,15 +639,15 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
storeApplicationAttemptStateInternal(attemptState.getAttemptId()
.toString(), attemptStateData);
storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
storedException);
} else {
assert event.getType().equals(
RMStateStoreEventType.UPDATE_APP_ATTEMPT);
updateApplicationAttemptStateInternal(attemptState.getAttemptId()
.toString(), attemptStateData);
updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
attemptStateData);
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
storedException);
}

View File

@ -78,16 +78,51 @@ public class ZKRMStateStore extends RMStateStore {
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
.newInstance(1, 0);
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
"RMDelegationTokensRoot";
private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
"RMDTSequentialNumber";
private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
"RMDTMasterKeysRoot";
private int numRetries;
private String zkHostPort = null;
private int zkSessionTimeout;
private long zkRetryInterval;
private List<ACL> zkAcl;
/**
*
* ROOT_DIR_PATH
* |--- VERSION_INFO
* |--- RM_ZK_FENCING_LOCK
* |--- RM_APP_ROOT
* | |----- (#ApplicationId1)
* | | |----- (#ApplicationAttemptIds)
* | |
* | |----- (#ApplicationId2)
* | | |----- (#ApplicationAttemptIds)
* | ....
* |
* |--- RM_DT_SECRET_MANAGER_ROOT
* |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
* |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
* | |----- Token_1
* | |----- Token_2
* | ....
* |
* |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
* | |----- Key_1
* | |----- Key_2
* ....
*
*/
private String zkRootNodePath;
private String rmDTSecretManagerRoot;
private String rmAppRoot;
private String dtSequenceNumberPath = null;
private String rmDTSecretManagerRoot;
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
@VisibleForTesting
protected String znodeWorkingPath;
@ -178,12 +213,11 @@ public synchronized void initInternal(Configuration conf) throws Exception {
throw bafe;
}
zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
/* Initialize fencing related paths, acls, and ops */
fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
CreateMode.PERSISTENT);
deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
@ -204,6 +238,15 @@ public synchronized void initInternal(Configuration conf) throws Exception {
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
}
}
rmDTSecretManagerRoot =
getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
}
@Override
@ -217,8 +260,11 @@ public synchronized void startInternal() throws Exception {
if (HAUtil.isHAEnabled(getConfig())){
fence();
}
createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot);
createRootDir(rmDTSecretManagerRoot);
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
}
private void createRootDir(final String rootPath) throws Exception {
@ -350,26 +396,69 @@ public synchronized RMState loadState() throws Exception {
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
List<String> childNodes =
getChildrenWithRetries(rmDTSecretManagerRoot, true);
loadRMDelegationKeyState(rmState);
loadRMSequentialNumberState(rmState);
loadRMDelegationTokenState(rmState);
}
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
List<String> childNodes =
getChildrenWithRetries(dtMasterKeysRootPath, true);
for (String childNodeName : childNodes) {
if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
continue;
}
String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
try {
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
DelegationKey key = new DelegationKey();
key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key);
} else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
}
} finally {
is.close();
}
}
}
private void loadRMSequentialNumberState(RMState rmState) throws Exception {
byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
if (seqData != null) {
ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
DataInputStream seqIn = new DataInputStream(seqIs);
try {
rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
} finally {
seqIn.close();
}
}
}
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true);
for (String childNodeName : childNodes) {
String childNodePath =
getNodePath(delegationTokensRootPath, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
if (childData == null) {
LOG.warn("Content of " + childNodePath + " is broken.");
continue;
}
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
try {
if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier();
identifier.readFields(fsIn);
@ -385,8 +474,6 @@ private synchronized void loadRMDTSecretManagerState(RMState rmState)
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
@ -411,17 +498,28 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
"from the application id");
}
rmState.appState.put(appId, appState);
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application attempt from znode: " + childNodeName);
}
loadApplicationAttemptState(appState, appId);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
}
}
private void loadApplicationAttemptState(ApplicationState appState,
ApplicationId appId)
throws Exception {
String appPath = getNodePath(rmAppRoot, appId.toString());
List<String> attempts = getChildrenWithRetries(appPath, false);
for (String attemptIDStr : attempts) {
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
String attemptPath = getNodePath(appPath, attemptIDStr);
byte[] attemptData = getDataWithRetries(attemptPath, true);
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ConverterUtils.toApplicationAttemptId(attemptIDStr);
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
ApplicationAttemptStateDataProto.parseFrom(attemptData));
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
credentials = new Credentials();
@ -429,47 +527,26 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(),
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus());
if (!attemptId.equals(attemptState.getAttemptId())) {
throw new YarnRuntimeException("The child node name is different " +
"from the application attempt id");
}
attempts.add(attemptState);
} else {
LOG.info("Unknown child node with name: " + childNodeName);
}
}
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(),
attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
attemptStateData.getDiagnostics(),
attemptStateData.getFinalApplicationStatus());
// go through all attempts and add them to their apps
for (ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId);
if (appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else {
// the application znode may have been removed when the application
// completed but the RM might have stopped before it could remove the
// application attempt znodes
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteWithRetries(
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
}
}
LOG.info("Done Loading applications from ZK state store");
}
@Override
public synchronized void storeApplicationStateInternal(String appId,
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId);
String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@ -481,25 +558,29 @@ public synchronized void storeApplicationStateInternal(String appId,
}
@Override
public synchronized void updateApplicationStateInternal(String appId,
public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, appId);
String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: "
+ nodeCreatePath);
+ nodeUpdatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
setDataWithRetries(nodeCreatePath, appStateData, 0);
setDataWithRetries(nodeUpdatePath, appStateData, 0);
}
@Override
public synchronized void storeApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
String appDirPath = getNodePath(rmAppRoot,
appAttemptId.getApplicationId().toString());
String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for attempt: " + attemptId + " at: "
LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@ -509,31 +590,36 @@ public synchronized void storeApplicationAttemptStateInternal(
@Override
public synchronized void updateApplicationAttemptStateInternal(
String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
ApplicationAttemptId appAttemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
String appIdStr = appAttemptId.getApplicationId().toString();
String appAttemptIdStr = appAttemptId.toString();
String appDirPath = getNodePath(rmAppRoot, appIdStr);
String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for attempt: " + attemptId + " at: "
+ nodeCreatePath);
LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+ " at: " + nodeUpdatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
setDataWithRetries(nodeCreatePath, attemptStateData, 0);
setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
}
@Override
public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
String nodeRemovePath = getNodePath(rmAppRoot, appId);
String appIdRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();
opList.add(Op.delete(nodeRemovePath, -1));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
opList.add(Op.delete(attemptRemovePath, -1));
}
opList.add(Op.delete(appIdRemovePath, -1));
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts.");
}
doMultiWithRetries(opList);
@ -546,38 +632,37 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
ArrayList<Op> opList = new ArrayList<Op>();
// store RM delegation token
String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
DataOutputStream tokenOut = new DataOutputStream(tokenOs);
ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
DataOutputStream seqOut = new DataOutputStream(seqOs);
try {
rmDTIdentifier.write(fsOut);
fsOut.writeLong(renewDate);
rmDTIdentifier.write(tokenOut);
tokenOut.writeLong(renewDate);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationToken_" +
rmDTIdentifier.getSequenceNumber());
}
opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
CreateMode.PERSISTENT));
seqOut.writeInt(latestSequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + dtSequenceNumberPath +
". SequenceNumber: " + latestSequenceNumber);
}
opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
} finally {
os.close();
tokenOs.close();
seqOs.close();
}
// store sequence number
String latestSequenceNumberPath =
getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
latestSequenceNumber);
}
if (dtSequenceNumberPath != null) {
opList.add(Op.delete(dtSequenceNumberPath, -1));
}
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
CreateMode.PERSISTENT));
dtSequenceNumberPath = latestSequenceNumberPath;
doMultiWithRetries(opList);
}
@ -585,7 +670,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
@ -598,7 +683,7 @@ protected synchronized void removeRMDelegationTokenState(
protected synchronized void storeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
@ -618,7 +703,7 @@ protected synchronized void storeRMDTMasterKeyState(
protected synchronized void removeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeRemovePath =
getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
@ -757,8 +842,7 @@ public byte[] getDataWithRetries(final String path, final boolean watch)
return new ZKAction<byte[]>() {
@Override
public byte[] run() throws KeeperException, InterruptedException {
Stat stat = new Stat();
return zkClient.getData(path, watch, stat);
return zkClient.getData(path, watch, null);
}
}.runWithRetries();
}
@ -865,4 +949,5 @@ protected synchronized ZooKeeper getNewZooKeeper()
zk.register(new ForwardingWatcher());
return zk;
}
}

View File

@ -683,14 +683,14 @@ public void testRMRestartKilledAppWithNoAttempts() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
public synchronized void storeApplicationAttemptStateInternal(
String attemptIdStr,
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request.
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
String attemptIdStr,
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request.
}
@ -1540,7 +1540,7 @@ public class TestMemoryRMStateStore extends MemoryRMStateStore {
public int updateAttempt = 0;
@Override
public void updateApplicationStateInternal(String appId,
public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
updateApp = ++count;
super.updateApplicationStateInternal(appId, appStateData);
@ -1548,11 +1548,12 @@ public void updateApplicationStateInternal(String appId,
@Override
public synchronized void
updateApplicationAttemptStateInternal(String attemptIdStr,
updateApplicationAttemptStateInternal(
ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
updateAttempt = ++count;
super.updateApplicationAttemptStateInternal(attemptIdStr,
super.updateApplicationAttemptStateInternal(attemptId,
attemptStateData);
}
}

View File

@ -234,6 +234,12 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
attempts.put(attemptIdRemoved, mockRemovedAttempt);
store.removeApplication(mockRemovedApp);
// remove application directory recursively.
storeApp(store, appIdRemoved, submitTime, startTime);
storeAttempt(store, attemptIdRemoved,
"container_1352994193343_0002_01_000001", null, null, dispatcher);
store.removeApplication(mockRemovedApp);
// let things settle down
Thread.sleep(1000);
store.close();
@ -373,7 +379,30 @@ public void testRMDTSecretManagerStateStore(
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
// check to delete delegationKey
store.removeRMDTMasterKey(key);
keySet.clear();
RMDTSecretManagerState noKeySecretManagerState =
store.loadState().getRMDTSecretManagerState();
Assert.assertEquals(token1, noKeySecretManagerState.getTokenState());
Assert.assertEquals(keySet, noKeySecretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
noKeySecretManagerState.getDTSequenceNumber());
// check to delete delegationToken
store.removeRMDelegationToken(dtId1, sequenceNumber);
RMDTSecretManagerState noKeyAndTokenSecretManagerState =
store.loadState().getRMDTSecretManagerState();
token1.clear();
Assert.assertEquals(token1,
noKeyAndTokenSecretManagerState.getTokenState());
Assert.assertEquals(keySet,
noKeyAndTokenSecretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
noKeySecretManagerState.getDTSequenceNumber());
store.close();
}
private Token<AMRMTokenIdentifier> generateAMRMToken(

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -178,10 +179,11 @@ public void testFSRMStateStoreClientRetry() throws Exception {
@Override
public void run() {
try {
store.storeApplicationStateInternal("application1",
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
.newApplicationStateData(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333));
store.storeApplicationStateInternal(
ApplicationId.newInstance(100L, 1),
(ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
.newApplicationStateData(111, 111, "user", null,
RMAppState.ACCEPTED, "diagnostics", 333));
} catch (Exception e) {
// TODO 0 datanode exception will not be retried by dfs client, fix
// that separately.