diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a81b95ac92..4dde899c87 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -148,6 +148,9 @@ Release 2.4.0 - UNRELEASED
YARN-807. When querying apps by queue, iterating over all apps is
inefficient and limiting (Sandy Ryza)
+ YARN-1378. Implemented a cleaner of old finished applications from the RM
+ state-store. (Jian He via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0480cab7d2..839765c04a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -342,7 +342,16 @@ public class YarnConfiguration extends Configuration {
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
-
+
+ /**
+ * The maximum number of completed applications RM state store keeps, by
+ * default equals to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS
+ */
+ public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
+ RM_PREFIX + "state-store.max-completed-applications";
+ public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
+ DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+
/** Default application name */
public static final String DEFAULT_APPLICATION_NAME = "N/A";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d798f4c976..c43dc1a444 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -275,6 +275,21 @@
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
+
+ The maximum number of completed applications RM state
+ store keeps, less than or equals to ${yarn.resourcemanager.max-completed-applications}.
+ By default, it equals to ${yarn.resourcemanager.max-completed-applications}.
+ This ensures that the applications kept in the state store are consistent with
+ the applications remembered in RM memory.
+ Any values larger than ${yarn.resourcemanager.max-completed-applications} will
+ be reset to ${yarn.resourcemanager.max-completed-applications}.
+ Note that this value impacts the RM recovery performance.Typically,
+ a smaller value indicates better performance on RM recovery.
+
+ yarn.resourcemanager.state-store.max-completed-applications
+ ${yarn.resourcemanager.max-completed-applications}
+
+
Host:Port of the ZooKeeper server where RM state will
be stored. This must be supplied when using
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 4dfa3ba6d2..7dbc4cc280 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -65,7 +65,9 @@ public class RMAppManager implements EventHandler,
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
- private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
+ private int maxCompletedAppsInMemory;
+ private int maxCompletedAppsInStateStore;
+ protected int completedAppsInStateStore = 0;
private LinkedList completedApps = new LinkedList();
private final RMContext rmContext;
@@ -82,9 +84,16 @@ public RMAppManager(RMContext context,
this.masterService = masterService;
this.applicationACLsManager = applicationACLsManager;
this.conf = conf;
- setCompletedAppsMax(conf.getInt(
+ this.maxCompletedAppsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
- YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
+ YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
+ this.maxCompletedAppsInStateStore =
+ conf.getInt(
+ YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
+ YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);
+ if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
+ this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
+ }
}
/**
@@ -173,10 +182,6 @@ public void logApplicationSummary(ApplicationId appId) {
ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
}
- protected synchronized void setCompletedAppsMax(int max) {
- this.completedAppsMax = max;
- }
-
protected synchronized int getCompletedAppsListSize() {
return this.completedApps.size();
}
@@ -190,7 +195,8 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
}
- completedApps.add(applicationId);
+ completedApps.add(applicationId);
+ completedAppsInStateStore++;
writeAuditLog(applicationId);
}
}
@@ -229,15 +235,31 @@ protected void writeAuditLog(ApplicationId appId) {
* check to see if hit the limit for max # completed apps kept
*/
protected synchronized void checkAppNumCompletedLimit() {
- while (completedApps.size() > this.completedAppsMax) {
- ApplicationId removeId = completedApps.remove();
- LOG.info("Application should be expired, max # apps"
- + " met. Removing app: " + removeId);
+ // check apps kept in state store.
+ while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
+ ApplicationId removeId =
+ completedApps.get(completedApps.size() - completedAppsInStateStore);
+ RMApp removeApp = rmContext.getRMApps().get(removeId);
+ LOG.info("Max number of completed apps kept in state store met:"
+ + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
+ + ", removing app " + removeApp.getApplicationId()
+ + " from state store.");
+ rmContext.getStateStore().removeApplication(removeApp);
+ completedAppsInStateStore--;
+ }
+
+ // check apps kept in memorty.
+ while (completedApps.size() > this.maxCompletedAppsInMemory) {
+ ApplicationId removeId = completedApps.remove();
+ LOG.info("Application should be expired, max number of completed apps"
+ + " kept in memory met: maxCompletedAppsInMemory = "
+ + this.maxCompletedAppsInMemory + ", removing app " + removeId
+ + " from memory: ");
rmContext.getRMApps().remove(removeId);
this.applicationACLsManager.removeApplication(removeId);
}
}
-
+
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
@@ -380,8 +402,6 @@ public void recover(RMState state) throws Exception {
Map appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
- LOG.info("Recovering application " + appState.getAppId());
-
submitApplication(appState.getApplicationSubmissionContext(),
appState.getSubmitTime(), appState.getUser(), true, state);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 23cefd334d..88b1a90bf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -167,7 +167,9 @@ private void loadRMAppState(RMState rmState) throws Exception {
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
- LOG.info("Loading application from node: " + childNodeName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application from node: " + childNodeName);
+ }
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl(
@@ -185,7 +187,10 @@ private void loadRMAppState(RMState rmState) throws Exception {
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
- LOG.info("Loading application attempt from node: " + childNodeName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application attempt from node: "
+ + childNodeName);
+ }
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ApplicationAttemptStateDataPBImpl attemptStateData =
@@ -225,6 +230,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
+ LOG.info("Done Loading applications from FS state store");
} catch (Exception e) {
LOG.error("Failed to load state.", e);
throw e;
@@ -362,7 +368,7 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
- public synchronized void removeApplicationState(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
Path nodeRemovePath = getAppDir(rmAppRoot, appId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index d5ff5ededd..961bec3165 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -171,8 +171,8 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
- public synchronized void removeApplicationState(ApplicationState appState)
- throws Exception {
+ public synchronized void removeApplicationStateInternal(
+ ApplicationState appState) throws Exception {
ApplicationId appId = appState.getAppId();
ApplicationState removed = state.appState.remove(appId);
if (removed == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
index c212c1fe85..3098b260b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
@@ -63,7 +63,7 @@ protected void storeApplicationAttemptStateInternal(String attemptId,
}
@Override
- protected void removeApplicationState(ApplicationState appState)
+ protected void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
// Do nothing
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 5e0e94429c..a8452642ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -53,7 +53,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -519,6 +518,7 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
* This does not block the dispatcher threads
* There is no notification of completion for this operation.
*/
+ @SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) {
ApplicationState appState = new ApplicationState(
app.getSubmitTime(), app.getStartTime(),
@@ -532,14 +532,6 @@ public synchronized void removeApplication(RMApp app) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
- removeApplication(appState);
- }
-
- @SuppressWarnings("unchecked")
- /**
- * Non-Blocking API
- */
- public synchronized void removeApplication(ApplicationState appState) {
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
}
@@ -548,8 +540,8 @@ public synchronized void removeApplication(ApplicationState appState) {
* Derived classes must implement this method to remove the state of an
* application and its attempts
*/
- protected abstract void removeApplicationState(ApplicationState appState)
- throws Exception;
+ protected abstract void removeApplicationStateInternal(
+ ApplicationState appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-986
@@ -666,11 +658,9 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
ApplicationState appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
- Exception removedException = null;
LOG.info("Removing info for app: " + appId);
try {
- removeApplicationState(appState);
- notifyDoneRemovingApplcation(appId, removedException);
+ removeApplicationStateInternal(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
notifyStoreOperationFailed(e);
@@ -738,17 +728,6 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId
new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
}
- @SuppressWarnings("unchecked")
- /**
- * This is to notify RMApp that this application has been removed from
- * RMStateStore
- */
- private void notifyDoneRemovingApplcation(ApplicationId appId,
- Exception removedException) {
- rmDispatcher.getEventHandler().handle(
- new RMAppRemovedEvent(appId, removedException));
- }
-
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index f419ff0988..87377814c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -392,7 +392,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
byte[] childData = getDataWithRetries(childNodePath, true);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
- LOG.info("Loading application from znode: " + childNodeName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application from znode: " + childNodeName);
+ }
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl(
@@ -412,7 +414,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
- LOG.info("Loading application attempt from znode: " + childNodeName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loading application attempt from znode: " + childNodeName);
+ }
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ApplicationAttemptStateDataPBImpl attemptStateData =
@@ -456,10 +460,10 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteWithRetries(
- getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
- 0);
+ getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
}
}
+ LOG.info("Done Loading applications from ZK state store");
}
@Override
@@ -517,16 +521,16 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
- public synchronized void removeApplicationState(ApplicationState appState)
+ public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
String nodeRemovePath = getNodePath(rmAppRoot, appId);
ArrayList opList = new ArrayList();
- opList.add(Op.delete(nodeRemovePath, 0));
+ opList.add(Op.delete(nodeRemovePath, -1));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
- opList.add(Op.delete(attemptRemovePath, 0));
+ opList.add(Op.delete(attemptRemovePath, -1));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
@@ -569,7 +573,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
}
if (dtSequenceNumberPath != null) {
- opList.add(Op.delete(dtSequenceNumberPath, 0));
+ opList.add(Op.delete(dtSequenceNumberPath, -1));
}
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
CreateMode.PERSISTENT));
@@ -587,7 +591,7 @@ protected synchronized void removeRMDelegationTokenState(
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
- deleteWithRetries(nodeRemovePath, 0);
+ deleteWithRetries(nodeRemovePath, -1);
}
@Override
@@ -619,7 +623,7 @@ protected synchronized void removeRMDTMasterKeyState(
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
- deleteWithRetries(nodeRemovePath, 0);
+ deleteWithRetries(nodeRemovePath, -1);
}
// ZK related code
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 76d59ec960..5a70cc2116 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -660,32 +660,34 @@ public void transition(RMAppImpl app, RMAppEvent event) {
@SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition {
-
+
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
-
- if (app.attempts.isEmpty()) {
- // Saved application was not running any attempts.
- app.createNewAttempt(true);
- return RMAppState.SUBMITTED;
- } else {
- /*
- * If last attempt recovered final state is null .. it means attempt
- * was started but AM container may or may not have started / finished.
- * Therefore we should wait for it to finish.
- */
- for (RMAppAttempt attempt : app.getAppAttempts().values()) {
- app.dispatcher.getEventHandler().handle(
- new RMAppAttemptEvent(attempt.getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
- }
- if (app.recoveredFinalState != null) {
- FINAL_TRANSITION.transition(app, event);
- return app.recoveredFinalState;
- } else {
- return RMAppState.RUNNING;
- }
+ /*
+ * If last attempt recovered final state is null .. it means attempt was
+ * started but AM container may or may not have started / finished.
+ * Therefore we should wait for it to finish.
+ */
+ for (RMAppAttempt attempt : app.getAppAttempts().values()) {
+ app.dispatcher.getEventHandler().handle(
+ new RMAppAttemptEvent(attempt.getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
}
+
+ // The app has completed.
+ if (app.recoveredFinalState != null) {
+ FINAL_TRANSITION.transition(app, event);
+ return app.recoveredFinalState;
+ }
+
+ // No existent attempts means the attempt associated with this app was not
+ // started or started but not yet saved。
+ if (app.attempts.isEmpty()) {
+ app.createNewAttempt(true);
+ return RMAppState.SUBMITTED;
+ }
+
+ return RMAppState.RUNNING;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java
deleted file mode 100644
index a030dbe18b..0000000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-public class RMAppRemovedEvent extends RMAppEvent {
-
- private final Exception removedException;
-
- public RMAppRemovedEvent(ApplicationId appId, Exception removedException) {
- super(appId, RMAppEventType.APP_REMOVED);
- this.removedException = removedException;
- }
-
- public Exception getRemovedException() {
- return removedException;
- }
-
-}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index ffa021f6b0..67a2e41458 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -76,14 +76,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -675,9 +674,8 @@ public void recover(RMState state) throws Exception {
ApplicationAttemptState attemptState =
appState.getAttempt(getAppAttemptId());
assert attemptState != null;
- LOG.info("Recovered attempt: AppId: "
- + getAppAttemptId().getApplicationId() + " AttemptId: "
- + getAppAttemptId() + " MasterContainer: " + masterContainer);
+ LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: "
+ + attemptState.getState());
diagnostics.append("Attempt recovered after RM restart");
diagnostics.append(attemptState.getDiagnostics());
setMasterContainer(attemptState.getMasterContainer());
@@ -856,8 +854,6 @@ private static class AttemptRecoveredTransition
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
- LOG.info("Recovering attempt : recoverdFinalState :"
- + appAttempt.recoveredFinalState);
if (appAttempt.recoveredFinalState != null) {
appAttempt.progress = 1.0f;
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 8fe8de4547..0a3738200e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -19,8 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
import java.util.HashMap;
import java.util.List;
@@ -43,6 +47,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -99,7 +104,7 @@ public static RMContext mockRMContext(int n, long time) {
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
- return new RMContextImpl(rmDispatcher,
+ RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null) {
@Override
@@ -107,6 +112,8 @@ public ConcurrentMap getRMApps() {
return map;
}
};
+ ((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
+ return context;
}
public class TestAppManagerDispatcher implements
@@ -142,7 +149,6 @@ public class TestRMAppManager extends RMAppManager {
public TestRMAppManager(RMContext context, Configuration conf) {
super(context, null, null, new ApplicationACLsManager(conf), conf);
- setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
}
public TestRMAppManager(RMContext context,
@@ -150,7 +156,6 @@ public TestRMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
- setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
}
public void checkAppNumCompletedLimit() {
@@ -164,9 +169,8 @@ public void finishApplication(ApplicationId appId) {
public int getCompletedAppsListSize() {
return super.getCompletedAppsListSize();
}
-
- public void setCompletedAppsMax(int max) {
- super.setCompletedAppsMax(max);
+ public int getCompletedAppsInStateStore() {
+ return this.completedAppsInStateStore;
}
public void submitApplication(
ApplicationSubmissionContext submissionContext, String user)
@@ -227,9 +231,9 @@ public void testRMAppRetireNone() throws Exception {
// Create such that none of the applications will retire since
// haven't hit max #
RMContext rmContext = mockRMContext(10, now - 10);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
- appMonitor.setCompletedAppsMax(10);
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 10);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext,conf);
Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
10, rmContext.getRMApps().size());
@@ -243,6 +247,8 @@ public void testRMAppRetireNone() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 10,
appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), never()).removeApplication(
+ isA(RMApp.class));
}
@Test
@@ -250,9 +256,10 @@ public void testRMAppRetireSome() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
- appMonitor.setCompletedAppsMax(3);
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 3);
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 3);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
.getRMApps().size());
@@ -266,6 +273,8 @@ public void testRMAppRetireSome() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 3,
appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), times(7)).removeApplication(
+ isA(RMApp.class));
}
@Test
@@ -274,14 +283,17 @@ public void testRMAppRetireSomeDifferentStates() throws Exception {
// these parameters don't matter, override applications below
RMContext rmContext = mockRMContext(10, now - 20000);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 2);
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
- appMonitor.setCompletedAppsMax(2);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
// clear out applications map
rmContext.getRMApps().clear();
Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
+ // 6 applications are in final state, 4 are not in final state.
// / set with various finished states
RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
rmContext.getRMApps().put(app.getApplicationId(), app);
@@ -318,7 +330,9 @@ public void testRMAppRetireSomeDifferentStates() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 2,
appMonitor.getCompletedAppsListSize());
-
+ // 6 applications in final state, 4 of them are removed
+ verify(rmContext.getStateStore(), times(4)).removeApplication(
+ isA(RMApp.class));
}
@Test
@@ -342,14 +356,13 @@ public void testRMAppRetireZeroSetting() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
- TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
-
+ Configuration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 0);
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 0);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
.getRMApps().size());
- // test with 0
- appMonitor.setCompletedAppsMax(0);
-
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", 10,
appMonitor.getCompletedAppsListSize());
@@ -360,6 +373,64 @@ public void testRMAppRetireZeroSetting() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 0,
appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), times(10)).removeApplication(
+ isA(RMApp.class));
+ }
+
+ @Test
+ public void testStateStoreAppLimitLessThanMemoryAppLimit() {
+ long now = System.currentTimeMillis();
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ Configuration conf = new YarnConfiguration();
+ int maxAppsInMemory = 8;
+ int maxAppsInStateStore = 4;
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
+ maxAppsInStateStore);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+ addToCompletedApps(appMonitor, rmContext);
+ Assert.assertEquals("Number of completed apps incorrect", 10,
+ appMonitor.getCompletedAppsListSize());
+ appMonitor.checkAppNumCompletedLimit();
+
+ Assert.assertEquals("Number of apps incorrect after # completed check",
+ maxAppsInMemory, rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check",
+ maxAppsInMemory, appMonitor.getCompletedAppsListSize());
+
+ int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
+ verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
+ .removeApplication(isA(RMApp.class));
+ Assert.assertEquals(maxAppsInStateStore,
+ appMonitor.getCompletedAppsInStateStore());
+ }
+
+ @Test
+ public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
+ long now = System.currentTimeMillis();
+ RMContext rmContext = mockRMContext(10, now - 20000);
+ Configuration conf = new YarnConfiguration();
+ int maxAppsInMemory = 8;
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
+ // larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+ addToCompletedApps(appMonitor, rmContext);
+ Assert.assertEquals("Number of completed apps incorrect", 10,
+ appMonitor.getCompletedAppsListSize());
+ appMonitor.checkAppNumCompletedLimit();
+
+ int numRemoveApps = 10 - maxAppsInMemory;
+ Assert.assertEquals("Number of apps incorrect after # completed check",
+ maxAppsInMemory, rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check",
+ maxAppsInMemory, appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
+ isA(RMApp.class));
+ Assert.assertEquals(maxAppsInMemory,
+ appMonitor.getCompletedAppsInStateStore());
}
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 38f8542bde..acedd6e9ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -80,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -423,6 +424,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception {
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -629,6 +632,8 @@ public void testRMRestartFailedApp() throws Exception {
.contains("Failing the application."));
// failed diagnostics from attempt is lost because the diagnostics from
// attempt is not yet available by the time app is saving the app state.
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -675,6 +680,48 @@ public void testRMRestartKilledApp() throws Exception{
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
Assert.assertEquals(app0.getDiagnostics().toString(),
appReport.getDiagnostics());
+ rm1.stop();
+ rm2.stop();
+ }
+
+ @Test
+ public void testRMRestartKilledAppWithNoAttempts() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore() {
+ @Override
+ public synchronized void storeApplicationAttemptStateInternal(
+ String attemptIdStr,
+ ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ // ignore attempt saving request.
+ }
+
+ @Override
+ public synchronized void updateApplicationAttemptStateInternal(
+ String attemptIdStr,
+ ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+ // ignore attempt saving request.
+ }
+ };
+ memStore.init(conf);
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ // create app
+ RMApp app0 =
+ rm1.submitApp(200, "name", "user",
+ new HashMap(), false, "default", -1,
+ null, "MAPREDUCE", false);
+ // kill the app.
+ rm1.killApp(app0.getApplicationId());
+ rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
+
+ // restart rm
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ RMApp loadedApp0 =
+ rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+ rm2.waitForState(loadedApp0.getApplicationId(), RMAppState.KILLED);
+ Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
}
@Test
@@ -724,6 +771,9 @@ public void testRMRestartSucceededApp() throws Exception {
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
appReport.getFinalApplicationStatus());
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
+
+ rm1.stop();
+ rm2.stop();
}
@Test
@@ -817,6 +867,9 @@ protected RMAppManager createRMAppManager() {
// check application summary is logged for the completed apps after RM restart.
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
isA(ApplicationId.class));
+
+ rm1.stop();
+ rm2.stop();
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@@ -1378,6 +1431,52 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
Assert.assertTrue(rmAppState.size() == NUM_APPS);
}
+ @Test
+ public void testFinishedAppRemovalAfterRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ // start RM
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // create an app and finish the app.
+ RMApp app0 = rm1.submitApp(200);
+ MockAM am0 = launchAM(app0, rm1, nm1);
+ finishApplicationMaster(app0, rm1, nm1, am0);
+
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
+
+ Map rmAppState =
+ rmState.getApplicationState();
+
+ // app0 exits in both state store and rmContext
+ Assert.assertEquals(RMAppState.FINISHED,
+ rmAppState.get(app0.getApplicationId()).getState());
+ rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
+
+ // create one more app and finish the app.
+ RMApp app1 = rm2.submitApp(200);
+ MockAM am1 = launchAM(app1, rm2, nm1);
+ finishApplicationMaster(app1, rm2, nm1, am1);
+
+ // the first app0 get kicked out from both rmContext and state store
+ Assert.assertNull(rm2.getRMContext().getRMApps()
+ .get(app0.getApplicationId()));
+ Assert.assertNull(rmAppState.get(app0.getApplicationId()));
+
+ rm1.stop();
+ rm2.stop();
+ }
+
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 417fdb147c..ff110b31f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -26,6 +26,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -109,6 +110,7 @@ interface RMStateStoreHelper {
boolean isFinalStateValid() throws Exception;
void writeVersion(RMStateVersion version) throws Exception;
RMStateVersion getCurrentVersion() throws Exception;
+ boolean appExists(RMApp app) throws Exception;
}
void waitNotify(TestDispatcher dispatcher) {
@@ -128,7 +130,7 @@ void waitNotify(TestDispatcher dispatcher) {
dispatcher.notified = false;
}
- void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
+ RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
@@ -141,6 +143,7 @@ void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockApp.getUser()).thenReturn("test");
store.storeNewApplication(mockApp);
+ return mockApp;
}
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
@@ -370,6 +373,7 @@ public void testRMDTSecretManagerStateStore(
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
+ store.close();
}
private Token generateAMRMToken(
@@ -415,4 +419,43 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
}
}
+
+ public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
+ throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ store.setRMDispatcher(new TestDispatcher());
+ // create and store apps
+ ArrayList appList = new ArrayList();
+ int NUM_APPS = 5;
+ for (int i = 0; i < NUM_APPS; i++) {
+ ApplicationId appId = ApplicationId.newInstance(1383183338, i);
+ RMApp app = storeApp(store, appId, 123456789, 987654321);
+ appList.add(app);
+ }
+
+ Assert.assertEquals(NUM_APPS, appList.size());
+ for (RMApp app : appList) {
+ // wait for app to be stored.
+ while (true) {
+ if (stateStoreHelper.appExists(app)) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ for (RMApp app : appList) {
+ // remove the app
+ store.removeApplication(app);
+ // wait for app to be removed.
+ while (true) {
+ if (!stateStoreHelper.appExists(app)) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index 4df1c3b0c8..27e8411cc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@@ -38,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -69,6 +71,13 @@ public Path getVersionNode() {
public RMStateVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
+
+ public Path getAppDir(String appId) {
+ Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME);
+ Path appRootDir = new Path(rootDir, RM_APP_ROOT);
+ Path appDir = new Path(appRootDir, appId);
+ return appDir;
+ }
}
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
@@ -109,9 +118,16 @@ public void writeVersion(RMStateVersion version) throws Exception {
public RMStateVersion getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
+
+ public boolean appExists(RMApp app) throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ Path nodePath =
+ store.getAppDir(app.getApplicationId().toString());
+ return fs.exists(nodePath);
+ }
}
- @Test
+ @Test(timeout = 60000)
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
@@ -126,11 +142,8 @@ public void testFSRMStateStore() throws Exception {
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
ApplicationAttemptId attemptId3 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
- Path rootDir =
- new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
- Path appRootDir = new Path(rootDir, "RMAppRoot");
Path appDir =
- new Path(appRootDir, attemptId3.getApplicationId().toString());
+ fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
Path tempAppAttemptFile =
new Path(appDir, attemptId3.toString() + ".tmp");
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
@@ -138,10 +151,11 @@ public void testFSRMStateStore() throws Exception {
fsOut.close();
testRMAppStateStore(fsTester);
- Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
+ Assert.assertFalse(fsTester.workingDirPathURI
.getFileSystem(conf).exists(tempAppAttemptFile));
testRMDTSecretManagerStateStore(fsTester);
testCheckVersion(fsTester);
+ testAppDeletion(fsTester);
} finally {
cluster.shutdown();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 19121d8973..924a30fd2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -46,7 +46,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase {
@@ -57,6 +59,7 @@ class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
TestZKRMStateStoreInternal store;
+ String workingZnode;
class TestZKRMStateStoreInternal extends ZKRMStateStore {
@@ -79,11 +82,16 @@ public String getVersionNode() {
public RMStateVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
+
+ public String getAppNode(String appId) {
+ return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
+ + appId;
+ }
}
public RMStateStore getRMStateStore() throws Exception {
- String workingZnode = "/Test";
- Configuration conf = new YarnConfiguration();
+ YarnConfiguration conf = new YarnConfiguration();
+ workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
@@ -107,14 +115,22 @@ public void writeVersion(RMStateVersion version) throws Exception {
public RMStateVersion getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
+
+ public boolean appExists(RMApp app) throws Exception {
+ Stat node =
+ client.exists(store.getAppNode(app.getApplicationId().toString()),
+ false);
+ return node !=null;
+ }
}
- @Test
+ @Test (timeout = 60000)
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
testCheckVersion(zkTester);
+ testAppDeletion(zkTester);
}
private Configuration createHARMConf(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
index 3def83c0ad..f271370f31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
@@ -120,7 +120,7 @@ public void testZKClientRetry() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
final String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+ conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000);
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
final ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);