YARN-1378. Implemented a cleaner of old finished applications from the RM state-store. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1548990 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-12-08 04:17:51 +00:00
parent 155020a6f4
commit 305ae48136
18 changed files with 395 additions and 154 deletions

View File

@ -148,6 +148,9 @@ Release 2.4.0 - UNRELEASED
YARN-807. When querying apps by queue, iterating over all apps is
inefficient and limiting (Sandy Ryza)
YARN-1378. Implemented a cleaner of old finished applications from the RM
state-store. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -342,7 +342,16 @@ public class YarnConfiguration extends Configuration {
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
/**
* The maximum number of completed applications RM state store keeps, by
* default equals to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS
*/
public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "state-store.max-completed-applications";
public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS =
DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
/** Default application name */
public static final String DEFAULT_APPLICATION_NAME = "N/A";

View File

@ -275,6 +275,21 @@
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
</property>
<property>
<description>The maximum number of completed applications RM state
store keeps, less than or equals to ${yarn.resourcemanager.max-completed-applications}.
By default, it equals to ${yarn.resourcemanager.max-completed-applications}.
This ensures that the applications kept in the state store are consistent with
the applications remembered in RM memory.
Any values larger than ${yarn.resourcemanager.max-completed-applications} will
be reset to ${yarn.resourcemanager.max-completed-applications}.
Note that this value impacts the RM recovery performance.Typically,
a smaller value indicates better performance on RM recovery.
</description>
<name>yarn.resourcemanager.state-store.max-completed-applications</name>
<value>${yarn.resourcemanager.max-completed-applications}</value>
</property>
<property>
<description>Host:Port of the ZooKeeper server where RM state will
be stored. This must be supplied when using

View File

@ -65,7 +65,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
private int maxCompletedAppsInMemory;
private int maxCompletedAppsInStateStore;
protected int completedAppsInStateStore = 0;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext;
@ -82,9 +84,16 @@ public RMAppManager(RMContext context,
this.masterService = masterService;
this.applicationACLsManager = applicationACLsManager;
this.conf = conf;
setCompletedAppsMax(conf.getInt(
this.maxCompletedAppsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
this.maxCompletedAppsInStateStore =
conf.getInt(
YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);
if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
}
}
/**
@ -173,10 +182,6 @@ public void logApplicationSummary(ApplicationId appId) {
ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
}
protected synchronized void setCompletedAppsMax(int max) {
this.completedAppsMax = max;
}
protected synchronized int getCompletedAppsListSize() {
return this.completedApps.size();
}
@ -190,7 +195,8 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
}
completedApps.add(applicationId);
completedApps.add(applicationId);
completedAppsInStateStore++;
writeAuditLog(applicationId);
}
}
@ -229,15 +235,31 @@ protected void writeAuditLog(ApplicationId appId) {
* check to see if hit the limit for max # completed apps kept
*/
protected synchronized void checkAppNumCompletedLimit() {
while (completedApps.size() > this.completedAppsMax) {
ApplicationId removeId = completedApps.remove();
LOG.info("Application should be expired, max # apps"
+ " met. Removing app: " + removeId);
// check apps kept in state store.
while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
ApplicationId removeId =
completedApps.get(completedApps.size() - completedAppsInStateStore);
RMApp removeApp = rmContext.getRMApps().get(removeId);
LOG.info("Max number of completed apps kept in state store met:"
+ " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
+ ", removing app " + removeApp.getApplicationId()
+ " from state store.");
rmContext.getStateStore().removeApplication(removeApp);
completedAppsInStateStore--;
}
// check apps kept in memorty.
while (completedApps.size() > this.maxCompletedAppsInMemory) {
ApplicationId removeId = completedApps.remove();
LOG.info("Application should be expired, max number of completed apps"
+ " kept in memory met: maxCompletedAppsInMemory = "
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
+ " from memory: ");
rmContext.getRMApps().remove(removeId);
this.applicationACLsManager.removeApplication(removeId);
}
}
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
@ -380,8 +402,6 @@ public void recover(RMState state) throws Exception {
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(),
appState.getSubmitTime(), appState.getUser(), true, state);
}

View File

@ -167,7 +167,9 @@ private void loadRMAppState(RMState rmState) throws Exception {
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
LOG.info("Loading application from node: " + childNodeName);
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from node: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl(
@ -185,7 +187,10 @@ private void loadRMAppState(RMState rmState) throws Exception {
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
LOG.info("Loading application attempt from node: " + childNodeName);
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application attempt from node: "
+ childNodeName);
}
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ApplicationAttemptStateDataPBImpl attemptStateData =
@ -225,6 +230,7 @@ private void loadRMAppState(RMState rmState) throws Exception {
assert appState != null;
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
LOG.info("Done Loading applications from FS state store");
} catch (Exception e) {
LOG.error("Failed to load state.", e);
throw e;
@ -362,7 +368,7 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
public synchronized void removeApplicationState(ApplicationState appState)
public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
Path nodeRemovePath = getAppDir(rmAppRoot, appId);

View File

@ -171,8 +171,8 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
public synchronized void removeApplicationState(ApplicationState appState)
throws Exception {
public synchronized void removeApplicationStateInternal(
ApplicationState appState) throws Exception {
ApplicationId appId = appState.getAppId();
ApplicationState removed = state.appState.remove(appId);
if (removed == null) {

View File

@ -63,7 +63,7 @@ protected void storeApplicationAttemptStateInternal(String attemptId,
}
@Override
protected void removeApplicationState(ApplicationState appState)
protected void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
// Do nothing
}

View File

@ -53,7 +53,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -519,6 +518,7 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
* This does not block the dispatcher threads
* There is no notification of completion for this operation.
*/
@SuppressWarnings("unchecked")
public synchronized void removeApplication(RMApp app) {
ApplicationState appState = new ApplicationState(
app.getSubmitTime(), app.getStartTime(),
@ -532,14 +532,6 @@ public synchronized void removeApplication(RMApp app) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
removeApplication(appState);
}
@SuppressWarnings("unchecked")
/**
* Non-Blocking API
*/
public synchronized void removeApplication(ApplicationState appState) {
dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState));
}
@ -548,8 +540,8 @@ public synchronized void removeApplication(ApplicationState appState) {
* Derived classes must implement this method to remove the state of an
* application and its attempts
*/
protected abstract void removeApplicationState(ApplicationState appState)
throws Exception;
protected abstract void removeApplicationStateInternal(
ApplicationState appState) throws Exception;
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-986
@ -666,11 +658,9 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
ApplicationState appState =
((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
Exception removedException = null;
LOG.info("Removing info for app: " + appId);
try {
removeApplicationState(appState);
notifyDoneRemovingApplcation(appId, removedException);
removeApplicationStateInternal(appState);
} catch (Exception e) {
LOG.error("Error removing app: " + appId, e);
notifyStoreOperationFailed(e);
@ -738,17 +728,6 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId
new RMAppAttemptUpdateSavedEvent(attemptId, updatedException));
}
@SuppressWarnings("unchecked")
/**
* This is to notify RMApp that this application has been removed from
* RMStateStore
*/
private void notifyDoneRemovingApplcation(ApplicationId appId,
Exception removedException) {
rmDispatcher.getEventHandler().handle(
new RMAppRemovedEvent(appId, removedException));
}
/**
* EventHandler implementation which forward events to the FSRMStateStore
* This hides the EventHandle methods of the store from its public interface

View File

@ -392,7 +392,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
byte[] childData = getDataWithRetries(childNodePath, true);
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
// application
LOG.info("Loading application from znode: " + childNodeName);
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application from znode: " + childNodeName);
}
ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
ApplicationStateDataPBImpl appStateData =
new ApplicationStateDataPBImpl(
@ -412,7 +414,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
} else if (childNodeName
.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
LOG.info("Loading application attempt from znode: " + childNodeName);
if (LOG.isDebugEnabled()) {
LOG.debug("Loading application attempt from znode: " + childNodeName);
}
ApplicationAttemptId attemptId =
ConverterUtils.toApplicationAttemptId(childNodeName);
ApplicationAttemptStateDataPBImpl attemptStateData =
@ -456,10 +460,10 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception {
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
deleteWithRetries(
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
0);
getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
}
}
LOG.info("Done Loading applications from ZK state store");
}
@Override
@ -517,16 +521,16 @@ public synchronized void updateApplicationAttemptStateInternal(
}
@Override
public synchronized void removeApplicationState(ApplicationState appState)
public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
String nodeRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();
opList.add(Op.delete(nodeRemovePath, 0));
opList.add(Op.delete(nodeRemovePath, -1));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
opList.add(Op.delete(attemptRemovePath, 0));
opList.add(Op.delete(attemptRemovePath, -1));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
@ -569,7 +573,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
}
if (dtSequenceNumberPath != null) {
opList.add(Op.delete(dtSequenceNumberPath, 0));
opList.add(Op.delete(dtSequenceNumberPath, -1));
}
opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
CreateMode.PERSISTENT));
@ -587,7 +591,7 @@ protected synchronized void removeRMDelegationTokenState(
LOG.debug("Removing RMDelegationToken_"
+ rmDTIdentifier.getSequenceNumber());
}
deleteWithRetries(nodeRemovePath, 0);
deleteWithRetries(nodeRemovePath, -1);
}
@Override
@ -619,7 +623,7 @@ protected synchronized void removeRMDTMasterKeyState(
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
}
deleteWithRetries(nodeRemovePath, 0);
deleteWithRetries(nodeRemovePath, -1);
}
// ZK related code

View File

@ -660,32 +660,34 @@ public void transition(RMAppImpl app, RMAppEvent event) {
@SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (app.attempts.isEmpty()) {
// Saved application was not running any attempts.
app.createNewAttempt(true);
return RMAppState.SUBMITTED;
} else {
/*
* If last attempt recovered final state is null .. it means attempt
* was started but AM container may or may not have started / finished.
* Therefore we should wait for it to finish.
*/
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
app.dispatcher.getEventHandler().handle(
new RMAppAttemptEvent(attempt.getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
if (app.recoveredFinalState != null) {
FINAL_TRANSITION.transition(app, event);
return app.recoveredFinalState;
} else {
return RMAppState.RUNNING;
}
/*
* If last attempt recovered final state is null .. it means attempt was
* started but AM container may or may not have started / finished.
* Therefore we should wait for it to finish.
*/
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
app.dispatcher.getEventHandler().handle(
new RMAppAttemptEvent(attempt.getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
// The app has completed.
if (app.recoveredFinalState != null) {
FINAL_TRANSITION.transition(app, event);
return app.recoveredFinalState;
}
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved
if (app.attempts.isEmpty()) {
app.createNewAttempt(true);
return RMAppState.SUBMITTED;
}
return RMAppState.RUNNING;
}
}

View File

@ -1,36 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class RMAppRemovedEvent extends RMAppEvent {
private final Exception removedException;
public RMAppRemovedEvent(ApplicationId appId, Exception removedException) {
super(appId, RMAppEventType.APP_REMOVED);
this.removedException = removedException;
}
public Exception getRemovedException() {
return removedException;
}
}

View File

@ -76,14 +76,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -675,9 +674,8 @@ public void recover(RMState state) throws Exception {
ApplicationAttemptState attemptState =
appState.getAttempt(getAppAttemptId());
assert attemptState != null;
LOG.info("Recovered attempt: AppId: "
+ getAppAttemptId().getApplicationId() + " AttemptId: "
+ getAppAttemptId() + " MasterContainer: " + masterContainer);
LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: "
+ attemptState.getState());
diagnostics.append("Attempt recovered after RM restart");
diagnostics.append(attemptState.getDiagnostics());
setMasterContainer(attemptState.getMasterContainer());
@ -856,8 +854,6 @@ private static class AttemptRecoveredTransition
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
LOG.info("Recovering attempt : recoverdFinalState :"
+ appAttempt.recoveredFinalState);
if (appAttempt.recoveredFinalState != null) {
appAttempt.progress = 1.0f;
RMApp rmApp =appAttempt.rmContext.getRMApps().get(

View File

@ -19,8 +19,12 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.util.HashMap;
import java.util.List;
@ -43,6 +47,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -99,7 +104,7 @@ public static RMContext mockRMContext(int n, long time) {
rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
rmDispatcher);
return new RMContextImpl(rmDispatcher,
RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null) {
@Override
@ -107,6 +112,8 @@ public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return map;
}
};
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
return context;
}
public class TestAppManagerDispatcher implements
@ -142,7 +149,6 @@ public class TestRMAppManager extends RMAppManager {
public TestRMAppManager(RMContext context, Configuration conf) {
super(context, null, null, new ApplicationACLsManager(conf), conf);
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
}
public TestRMAppManager(RMContext context,
@ -150,7 +156,6 @@ public TestRMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
}
public void checkAppNumCompletedLimit() {
@ -164,9 +169,8 @@ public void finishApplication(ApplicationId appId) {
public int getCompletedAppsListSize() {
return super.getCompletedAppsListSize();
}
public void setCompletedAppsMax(int max) {
super.setCompletedAppsMax(max);
public int getCompletedAppsInStateStore() {
return this.completedAppsInStateStore;
}
public void submitApplication(
ApplicationSubmissionContext submissionContext, String user)
@ -227,9 +231,9 @@ public void testRMAppRetireNone() throws Exception {
// Create such that none of the applications will retire since
// haven't hit max #
RMContext rmContext = mockRMContext(10, now - 10);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
appMonitor.setCompletedAppsMax(10);
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 10);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext,conf);
Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit",
10, rmContext.getRMApps().size());
@ -243,6 +247,8 @@ public void testRMAppRetireNone() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 10,
appMonitor.getCompletedAppsListSize());
verify(rmContext.getStateStore(), never()).removeApplication(
isA(RMApp.class));
}
@Test
@ -250,9 +256,10 @@ public void testRMAppRetireSome() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
appMonitor.setCompletedAppsMax(3);
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 3);
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 3);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
.getRMApps().size());
@ -266,6 +273,8 @@ public void testRMAppRetireSome() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 3,
appMonitor.getCompletedAppsListSize());
verify(rmContext.getStateStore(), times(7)).removeApplication(
isA(RMApp.class));
}
@Test
@ -274,14 +283,17 @@ public void testRMAppRetireSomeDifferentStates() throws Exception {
// these parameters don't matter, override applications below
RMContext rmContext = mockRMContext(10, now - 20000);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 2);
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
appMonitor.setCompletedAppsMax(2);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
// clear out applications map
rmContext.getRMApps().clear();
Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size());
// 6 applications are in final state, 4 are not in final state.
// / set with various finished states
RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED);
rmContext.getRMApps().put(app.getApplicationId(), app);
@ -318,7 +330,9 @@ public void testRMAppRetireSomeDifferentStates() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 2,
appMonitor.getCompletedAppsListSize());
// 6 applications in final state, 4 of them are removed
verify(rmContext.getStateStore(), times(4)).removeApplication(
isA(RMApp.class));
}
@Test
@ -342,14 +356,13 @@ public void testRMAppRetireZeroSetting() throws Exception {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration());
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 0);
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 0);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
Assert.assertEquals("Number of apps incorrect before", 10, rmContext
.getRMApps().size());
// test with 0
appMonitor.setCompletedAppsMax(0);
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", 10,
appMonitor.getCompletedAppsListSize());
@ -360,6 +373,64 @@ public void testRMAppRetireZeroSetting() throws Exception {
rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check", 0,
appMonitor.getCompletedAppsListSize());
verify(rmContext.getStateStore(), times(10)).removeApplication(
isA(RMApp.class));
}
@Test
public void testStateStoreAppLimitLessThanMemoryAppLimit() {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
Configuration conf = new YarnConfiguration();
int maxAppsInMemory = 8;
int maxAppsInStateStore = 4;
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
maxAppsInStateStore);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", 10,
appMonitor.getCompletedAppsListSize());
appMonitor.checkAppNumCompletedLimit();
Assert.assertEquals("Number of apps incorrect after # completed check",
maxAppsInMemory, rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check",
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
.removeApplication(isA(RMApp.class));
Assert.assertEquals(maxAppsInStateStore,
appMonitor.getCompletedAppsInStateStore());
}
@Test
public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
Configuration conf = new YarnConfiguration();
int maxAppsInMemory = 8;
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
// larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", 10,
appMonitor.getCompletedAppsListSize());
appMonitor.checkAppNumCompletedLimit();
int numRemoveApps = 10 - maxAppsInMemory;
Assert.assertEquals("Number of apps incorrect after # completed check",
maxAppsInMemory, rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check",
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
isA(RMApp.class));
Assert.assertEquals(maxAppsInMemory,
appMonitor.getCompletedAppsInStateStore());
}
protected void setupDispatcher(RMContext rmContext, Configuration conf) {

View File

@ -80,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -423,6 +424,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception {
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
rm1.stop();
rm2.stop();
}
@Test
@ -629,6 +632,8 @@ public void testRMRestartFailedApp() throws Exception {
.contains("Failing the application."));
// failed diagnostics from attempt is lost because the diagnostics from
// attempt is not yet available by the time app is saving the app state.
rm1.stop();
rm2.stop();
}
@Test
@ -675,6 +680,48 @@ public void testRMRestartKilledApp() throws Exception{
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
Assert.assertEquals(app0.getDiagnostics().toString(),
appReport.getDiagnostics());
rm1.stop();
rm2.stop();
}
@Test
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
public synchronized void storeApplicationAttemptStateInternal(
String attemptIdStr,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request.
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
String attemptIdStr,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request.
}
};
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
// create app
RMApp app0 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null, "MAPREDUCE", false);
// kill the app.
rm1.killApp(app0.getApplicationId());
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
// restart rm
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
RMApp loadedApp0 =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
rm2.waitForState(loadedApp0.getApplicationId(), RMAppState.KILLED);
Assert.assertTrue(loadedApp0.getAppAttempts().size() == 0);
}
@Test
@ -724,6 +771,9 @@ public void testRMRestartSucceededApp() throws Exception {
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
appReport.getFinalApplicationStatus());
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
rm1.stop();
rm2.stop();
}
@Test
@ -817,6 +867,9 @@ protected RMAppManager createRMAppManager() {
// check application summary is logged for the completed apps after RM restart.
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
isA(ApplicationId.class));
rm1.stop();
rm2.stop();
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
@ -1378,6 +1431,52 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
Assert.assertTrue(rmAppState.size() == NUM_APPS);
}
@Test
public void testFinishedAppRemovalAfterRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
memStore.init(conf);
RMState rmState = memStore.getState();
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// create an app and finish the app.
RMApp app0 = rm1.submitApp(200);
MockAM am0 = launchAM(app0, rm1, nm1);
finishApplicationMaster(app0, rm1, nm1, am0);
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
// app0 exits in both state store and rmContext
Assert.assertEquals(RMAppState.FINISHED,
rmAppState.get(app0.getApplicationId()).getState());
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
// create one more app and finish the app.
RMApp app1 = rm2.submitApp(200);
MockAM am1 = launchAM(app1, rm2, nm1);
finishApplicationMaster(app1, rm2, nm1, am1);
// the first app0 get kicked out from both rmContext and state store
Assert.assertNull(rm2.getRMContext().getRMApps()
.get(app0.getApplicationId()));
Assert.assertNull(rmAppState.get(app0.getApplicationId()));
rm1.stop();
rm2.stop();
}
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {

View File

@ -26,6 +26,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -109,6 +110,7 @@ interface RMStateStoreHelper {
boolean isFinalStateValid() throws Exception;
void writeVersion(RMStateVersion version) throws Exception;
RMStateVersion getCurrentVersion() throws Exception;
boolean appExists(RMApp app) throws Exception;
}
void waitNotify(TestDispatcher dispatcher) {
@ -128,7 +130,7 @@ void waitNotify(TestDispatcher dispatcher) {
dispatcher.notified = false;
}
void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
@ -141,6 +143,7 @@ void storeApp(RMStateStore store, ApplicationId appId, long submitTime,
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
when(mockApp.getUser()).thenReturn("test");
store.storeNewApplication(mockApp);
return mockApp;
}
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
@ -370,6 +373,7 @@ public void testRMDTSecretManagerStateStore(
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
store.close();
}
private Token<AMRMTokenIdentifier> generateAMRMToken(
@ -415,4 +419,43 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
Assert.assertTrue(t instanceof RMStateVersionIncompatibleException);
}
}
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(new TestDispatcher());
// create and store apps
ArrayList<RMApp> appList = new ArrayList<RMApp>();
int NUM_APPS = 5;
for (int i = 0; i < NUM_APPS; i++) {
ApplicationId appId = ApplicationId.newInstance(1383183338, i);
RMApp app = storeApp(store, appId, 123456789, 987654321);
appList.add(app);
}
Assert.assertEquals(NUM_APPS, appList.size());
for (RMApp app : appList) {
// wait for app to be stored.
while (true) {
if (stateStoreHelper.appExists(app)) {
break;
} else {
Thread.sleep(100);
}
}
}
for (RMApp app : appList) {
// remove the app
store.removeApplication(app);
// wait for app to be removed.
while (true) {
if (!stateStoreHelper.appExists(app)) {
break;
} else {
Thread.sleep(100);
}
}
}
}
}

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
@ -38,6 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@ -69,6 +71,13 @@ public Path getVersionNode() {
public RMStateVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
public Path getAppDir(String appId) {
Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME);
Path appRootDir = new Path(rootDir, RM_APP_ROOT);
Path appDir = new Path(appRootDir, appId);
return appDir;
}
}
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
@ -109,9 +118,16 @@ public void writeVersion(RMStateVersion version) throws Exception {
public RMStateVersion getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
public boolean appExists(RMApp app) throws IOException {
FileSystem fs = cluster.getFileSystem();
Path nodePath =
store.getAppDir(app.getApplicationId().toString());
return fs.exists(nodePath);
}
}
@Test
@Test(timeout = 60000)
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
@ -126,11 +142,8 @@ public void testFSRMStateStore() throws Exception {
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
ApplicationAttemptId attemptId3 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
Path rootDir =
new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
Path appRootDir = new Path(rootDir, "RMAppRoot");
Path appDir =
new Path(appRootDir, attemptId3.getApplicationId().toString());
fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
Path tempAppAttemptFile =
new Path(appDir, attemptId3.toString() + ".tmp");
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
@ -138,10 +151,11 @@ public void testFSRMStateStore() throws Exception {
fsOut.close();
testRMAppStateStore(fsTester);
Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
Assert.assertFalse(fsTester.workingDirPathURI
.getFileSystem(conf).exists(tempAppAttemptFile));
testRMDTSecretManagerStateStore(fsTester);
testCheckVersion(fsTester);
testAppDeletion(fsTester);
} finally {
cluster.shutdown();
}

View File

@ -46,7 +46,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase {
@ -57,6 +59,7 @@ class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
TestZKRMStateStoreInternal store;
String workingZnode;
class TestZKRMStateStoreInternal extends ZKRMStateStore {
@ -79,11 +82,16 @@ public String getVersionNode() {
public RMStateVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
public String getAppNode(String appId) {
return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/"
+ appId;
}
}
public RMStateStore getRMStateStore() throws Exception {
String workingZnode = "/Test";
Configuration conf = new YarnConfiguration();
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/Test";
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
@ -107,14 +115,22 @@ public void writeVersion(RMStateVersion version) throws Exception {
public RMStateVersion getCurrentVersion() throws Exception {
return store.getCurrentVersion();
}
public boolean appExists(RMApp app) throws Exception {
Stat node =
client.exists(store.getAppNode(app.getApplicationId().toString()),
false);
return node !=null;
}
}
@Test
@Test (timeout = 60000)
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
testCheckVersion(zkTester);
testAppDeletion(zkTester);
}
private Configuration createHARMConf(

View File

@ -120,7 +120,7 @@ public void testZKClientRetry() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
final String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000);
conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100);
final ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);