From dc84800184a58a26d370d9cc3ef094c20e687211 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 11 Apr 2014 03:36:36 +0000 Subject: [PATCH] YARN-1924. Made ZKRMStateStore updateApplication(Attempt)StateInternal work when Application(Attempt) state hasn't been stored before. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1586547 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 +++ .../recovery/FileSystemRMStateStore.java | 2 ++ .../recovery/RMStateStore.java | 4 +-- .../recovery/ZKRMStateStore.java | 21 +++++++++++++-- .../recovery/RMStateStoreTestBase.java | 27 +++++++++++++++++++ 5 files changed, 54 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c77cb3392c..417d6852a7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -107,6 +107,10 @@ Release 2.4.1 - UNRELEASED YARN-1903. Set exit code and diagnostics when container is killed at NEW/LOCALIZING state. (Zhijie Shen via jianhe) + YARN-1924. Made ZKRMStateStore updateApplication(Attempt)StateInternal work + when Application(Attempt) state hasn't been stored before. (Jian He via + zjshen) + Release 2.4.0 - 2014-04-07 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index cc25be77d4..1f6e175ced 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -538,6 +538,8 @@ public class FileSystemRMStateStore extends RMStateStore { protected void replaceFile(Path srcPath, Path dstPath) throws Exception { if (fs.exists(dstPath)) { deleteFile(dstPath); + } else { + LOG.info("File doesn't exist. Skip deleting the file " + dstPath); } fs.rename(srcPath, dstPath); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 32a06c4953..fc4537c793 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -628,7 +628,7 @@ public abstract class RMStateStore extends AbstractService { notifyDoneUpdatingApplication(appId, storedException); } } catch (Exception e) { - LOG.error("Error storing app: " + appId, e); + LOG.error("Error storing/updating app: " + appId, e); notifyStoreOperationFailed(e); } } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) @@ -679,7 +679,7 @@ public abstract class RMStateStore extends AbstractService { } } catch (Exception e) { LOG.error( - "Error storing appAttempt: " + attemptState.getAttemptId(), e); + "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e); notifyStoreOperationFailed(e); } } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index fdfd6cd7b8..9f06b8572e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -566,7 +566,15 @@ public class ZKRMStateStore extends RMStateStore { + nodeUpdatePath); } byte[] appStateData = appStateDataPB.getProto().toByteArray(); - setDataWithRetries(nodeUpdatePath, appStateData, 0); + + if (zkClient.exists(nodeUpdatePath, true) != null) { + setDataWithRetries(nodeUpdatePath, appStateData, -1); + } else { + createWithRetries(nodeUpdatePath, appStateData, zkAcl, + CreateMode.PERSISTENT); + LOG.info(appId + " znode didn't exist. Created a new znode to" + + " update the application state."); + } } @Override @@ -601,7 +609,15 @@ public class ZKRMStateStore extends RMStateStore { + " at: " + nodeUpdatePath); } byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); - setDataWithRetries(nodeUpdatePath, attemptStateData, 0); + + if (zkClient.exists(nodeUpdatePath, true) != null) { + setDataWithRetries(nodeUpdatePath, attemptStateData, -1); + } else { + createWithRetries(nodeUpdatePath, attemptStateData, zkAcl, + CreateMode.PERSISTENT); + LOG.info(appAttemptId + " znode didn't exist. Created a new znode to" + + " update the application attempt state."); + } } @Override @@ -961,6 +977,7 @@ public class ZKRMStateStore extends RMStateStore { Thread.sleep(zkRetryInterval); continue; } + LOG.error("Error while doing ZK operation.", ke); throw ke; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 009e96e783..507e164eca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -310,6 +310,30 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ "myTrackingUrl", "attemptDiagnostics", FinalApplicationStatus.SUCCEEDED); store.updateApplicationAttemptState(newAttemptState); + + // test updating the state of an app/attempt whose initial state was not + // saved. + ApplicationId dummyAppId = ApplicationId.newInstance(1234, 10); + ApplicationSubmissionContext dummyContext = + new ApplicationSubmissionContextPBImpl(); + dummyContext.setApplicationId(dummyAppId); + ApplicationState dummyApp = + new ApplicationState(appState.submitTime, appState.startTime, + dummyContext, appState.user, RMAppState.FINISHED, "appDiagnostics", + 1234); + store.updateApplicationState(dummyApp); + + ApplicationAttemptId dummyAttemptId = + ApplicationAttemptId.newInstance(dummyAppId, 6); + ApplicationAttemptState dummyAttempt = + new ApplicationAttemptState(dummyAttemptId, + oldAttemptState.getMasterContainer(), + oldAttemptState.getAppAttemptCredentials(), + oldAttemptState.getStartTime(), RMAppAttemptState.FINISHED, + "myTrackingUrl", "attemptDiagnostics", + FinalApplicationStatus.SUCCEEDED); + store.updateApplicationAttemptState(dummyAttempt); + // let things settle down Thread.sleep(1000); store.close(); @@ -320,6 +344,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ RMState newRMState = store.loadState(); Map newRMAppState = newRMState.getApplicationState(); + assertNotNull(newRMAppState.get(dummyApp.getAppId())); ApplicationState updatedAppState = newRMAppState.get(appId1); assertEquals(appState.getAppId(),updatedAppState.getAppId()); assertEquals(appState.getSubmitTime(), updatedAppState.getSubmitTime()); @@ -331,6 +356,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ assertEquals(1234, updatedAppState.getFinishTime()); // check updated attempt state + assertNotNull(newRMAppState.get(dummyApp.getAppId()).getAttempt( + dummyAttemptId)); ApplicationAttemptState updatedAttemptState = updatedAppState.getAttempt(newAttemptState.getAttemptId()); assertEquals(oldAttemptState.getAttemptId(),