YARN-1924. Made ZKRMStateStore updateApplication(Attempt)StateInternal work when Application(Attempt) state hasn't been stored before. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1586547 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-04-11 03:36:36 +00:00
parent ed78328d50
commit dc84800184
5 changed files with 54 additions and 4 deletions

View File

@ -107,6 +107,10 @@ Release 2.4.1 - UNRELEASED
YARN-1903. Set exit code and diagnostics when container is killed at
NEW/LOCALIZING state. (Zhijie Shen via jianhe)
YARN-1924. Made ZKRMStateStore updateApplication(Attempt)StateInternal work
when Application(Attempt) state hasn't been stored before. (Jian He via
zjshen)
Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES

View File

@ -538,6 +538,8 @@ public class FileSystemRMStateStore extends RMStateStore {
protected void replaceFile(Path srcPath, Path dstPath) throws Exception { protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
if (fs.exists(dstPath)) { if (fs.exists(dstPath)) {
deleteFile(dstPath); deleteFile(dstPath);
} else {
LOG.info("File doesn't exist. Skip deleting the file " + dstPath);
} }
fs.rename(srcPath, dstPath); fs.rename(srcPath, dstPath);
} }

View File

@ -628,7 +628,7 @@ public abstract class RMStateStore extends AbstractService {
notifyDoneUpdatingApplication(appId, storedException); notifyDoneUpdatingApplication(appId, storedException);
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error storing app: " + appId, e); LOG.error("Error storing/updating app: " + appId, e);
notifyStoreOperationFailed(e); notifyStoreOperationFailed(e);
} }
} else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)
@ -679,7 +679,7 @@ public abstract class RMStateStore extends AbstractService {
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error( LOG.error(
"Error storing appAttempt: " + attemptState.getAttemptId(), e); "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e);
notifyStoreOperationFailed(e); notifyStoreOperationFailed(e);
} }
} else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) {

View File

@ -566,7 +566,15 @@ public class ZKRMStateStore extends RMStateStore {
+ nodeUpdatePath); + nodeUpdatePath);
} }
byte[] appStateData = appStateDataPB.getProto().toByteArray(); byte[] appStateData = appStateDataPB.getProto().toByteArray();
setDataWithRetries(nodeUpdatePath, appStateData, 0);
if (zkClient.exists(nodeUpdatePath, true) != null) {
setDataWithRetries(nodeUpdatePath, appStateData, -1);
} else {
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
CreateMode.PERSISTENT);
LOG.info(appId + " znode didn't exist. Created a new znode to"
+ " update the application state.");
}
} }
@Override @Override
@ -601,7 +609,15 @@ public class ZKRMStateStore extends RMStateStore {
+ " at: " + nodeUpdatePath); + " at: " + nodeUpdatePath);
} }
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
if (zkClient.exists(nodeUpdatePath, true) != null) {
setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
} else {
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
CreateMode.PERSISTENT);
LOG.info(appAttemptId + " znode didn't exist. Created a new znode to"
+ " update the application attempt state.");
}
} }
@Override @Override
@ -961,6 +977,7 @@ public class ZKRMStateStore extends RMStateStore {
Thread.sleep(zkRetryInterval); Thread.sleep(zkRetryInterval);
continue; continue;
} }
LOG.error("Error while doing ZK operation.", ke);
throw ke; throw ke;
} }
} }

View File

@ -310,6 +310,30 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
"myTrackingUrl", "attemptDiagnostics", "myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED); FinalApplicationStatus.SUCCEEDED);
store.updateApplicationAttemptState(newAttemptState); store.updateApplicationAttemptState(newAttemptState);
// test updating the state of an app/attempt whose initial state was not
// saved.
ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10);
ApplicationSubmissionContext dummyContext =
new ApplicationSubmissionContextPBImpl();
dummyContext.setApplicationId(dummyAppId);
ApplicationState dummyApp =
new ApplicationState(appState.submitTime, appState.startTime,
dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics",
1234);
store.updateApplicationState(dummyApp);
ApplicationAttemptId dummyAttemptId =
ApplicationAttemptId.newInstance(dummyAppId, 6);
ApplicationAttemptState dummyAttempt =
new ApplicationAttemptState(dummyAttemptId,
oldAttemptState.getMasterContainer(),
oldAttemptState.getAppAttemptCredentials(),
oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED,
"myTrackingUrl", "attemptDiagnostics",
FinalApplicationStatus.SUCCEEDED);
store.updateApplicationAttemptState(dummyAttempt);
// let things settle down // let things settle down
Thread.sleep(1000); Thread.sleep(1000);
store.close(); store.close();
@ -320,6 +344,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
RMState newRMState = store.loadState(); RMState newRMState = store.loadState();
Map<ApplicationId, ApplicationState> newRMAppState = Map<ApplicationId, ApplicationState> newRMAppState =
newRMState.getApplicationState(); newRMState.getApplicationState();
assertNotNull(newRMAppState.get(dummyApp.getAppId()));
ApplicationState updatedAppState = newRMAppState.get(appId1); ApplicationState updatedAppState = newRMAppState.get(appId1);
assertEquals(appState.getAppId(),updatedAppState.getAppId()); assertEquals(appState.getAppId(),updatedAppState.getAppId());
assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime()); assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime());
@ -331,6 +356,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
assertEquals(1234, updatedAppState.getFinishTime()); assertEquals(1234, updatedAppState.getFinishTime());
// check updated attempt state // check updated attempt state
assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt(
dummyAttemptId));
ApplicationAttemptState updatedAttemptState = ApplicationAttemptState updatedAttemptState =
updatedAppState.getAttempt(newAttemptState.getAttemptId()); updatedAppState.getAttempt(newAttemptState.getAttemptId());
assertEquals(oldAttemptState.getAttemptId(), assertEquals(oldAttemptState.getAttemptId(),