YARN-534. Change RM restart recovery to also account for AM max-attempts configuration after the restart. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466208 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-09 20:16:19 +00:00
parent 3a54a5653b
commit 7d00d3d20f
4 changed files with 122 additions and 9 deletions

View File

@ -211,6 +211,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-112. Fixed a race condition during localization that fails containers. YARN-112. Fixed a race condition during localization that fails containers.
(Omkar Vinit Joshi via vinodkv) (Omkar Vinit Joshi via vinodkv)
YARN-534. Change RM restart recovery to also account for AM max-attempts
configuration after the restart. (Jian He via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -57,6 +57,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private static final Log LOG = LogFactory.getLog(RMAppManager.class); private static final Log LOG = LogFactory.getLog(RMAppManager.class);
private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
private int globalMaxAppAttempts;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>(); private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext; private final RMContext rmContext;
@ -76,6 +77,8 @@ public RMAppManager(RMContext context,
setCompletedAppsMax(conf.getInt( setCompletedAppsMax(conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS)); YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
} }
/** /**
@ -308,6 +311,7 @@ public void recover(RMState state) throws Exception {
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState(); Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications"); LOG.info("Recovering " + appStates.size() + " applications");
for(ApplicationState appState : appStates.values()) { for(ApplicationState appState : appStates.values()) {
boolean shouldRecover = true;
// re-submit the application // re-submit the application
// this is going to send an app start event but since the async dispatcher // this is going to send an app start event but since the async dispatcher
// has not started that event will be queued until we have completed re // has not started that event will be queued until we have completed re
@ -318,16 +322,39 @@ public void recover(RMState state) throws Exception {
// This will need to be changed in work preserving recovery in which // This will need to be changed in work preserving recovery in which
// RM will re-connect with the running AM's instead of restarting them // RM will re-connect with the running AM's instead of restarting them
LOG.info("Not recovering unmanaged application " + appState.getAppId()); LOG.info("Not recovering unmanaged application " + appState.getAppId());
store.removeApplication(appState); shouldRecover = false;
}
int individualMaxAppAttempts = appState.getApplicationSubmissionContext()
.getMaxAppAttempts();
int maxAppAttempts;
if (individualMaxAppAttempts <= 0 ||
individualMaxAppAttempts > globalMaxAppAttempts) {
maxAppAttempts = globalMaxAppAttempts;
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ " for application: " + appState.getAppId()
+ " is invalid, because it is out of the range [1, "
+ globalMaxAppAttempts + "]. Use the global max attempts instead.");
} else { } else {
maxAppAttempts = individualMaxAppAttempts;
}
if(appState.getAttemptCount() >= maxAppAttempts) {
LOG.info("Not recovering application " + appState.getAppId() +
" due to recovering attempt is beyond maxAppAttempt limit");
shouldRecover = false;
}
if(shouldRecover) {
LOG.info("Recovering application " + appState.getAppId()); LOG.info("Recovering application " + appState.getAppId());
submitApplication(appState.getApplicationSubmissionContext(), submitApplication(appState.getApplicationSubmissionContext(),
appState.getSubmitTime()); appState.getSubmitTime());
// re-populate attempt information in application // re-populate attempt information in application
RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get( RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
appState.getAppId()); appState.getAppId());
appImpl.recover(state); appImpl.recover(state);
} }
else {
store.removeApplication(appState);
}
} }
} }

View File

@ -128,21 +128,28 @@ public RMApp submitApp(int masterMemory) throws Exception {
// client // client
public RMApp submitApp(int masterMemory, String name, String user) throws Exception { public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
return submitApp(masterMemory, name, user, null, false, null); return submitApp(masterMemory, name, user, null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls) throws Exception { Map<ApplicationAccessType, String> acls) throws Exception {
return submitApp(masterMemory, name, user, acls, false, null); return submitApp(masterMemory, name, user, acls, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue) throws Exception { Map<ApplicationAccessType, String> acls, String queue) throws Exception {
return submitApp(masterMemory, name, user, acls, false, queue); return submitApp(masterMemory, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
} }
public RMApp submitApp(int masterMemory, String name, String user, public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue) throws Exception { Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts) throws Exception {
ClientRMProtocol client = getClientRMService(); ClientRMProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class)); .newRecord(GetNewApplicationRequest.class));
@ -155,6 +162,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
sub.setApplicationId(appId); sub.setApplicationId(appId);
sub.setApplicationName(name); sub.setApplicationName(name);
sub.setUser(user); sub.setUser(user);
sub.setMaxAppAttempts(maxAppAttempts);
if(unmanaged) { if(unmanaged) {
sub.setUnmanagedAM(true); sub.setUnmanagedAM(true);
} }

View File

@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
@ -62,6 +64,7 @@ public void testRMRestart() throws Exception {
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
conf.set(YarnConfiguration.RM_SCHEDULER, conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
MemoryRMStateStore memStore = new MemoryRMStateStore(); MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf); memStore.init(conf);
@ -152,7 +155,9 @@ public void testRMRestart() throws Exception {
.getApplicationId()); .getApplicationId());
// create unmanaged app // create unmanaged app
RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null); RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true,
null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
ApplicationAttemptId unmanagedAttemptId = ApplicationAttemptId unmanagedAttemptId =
appUnmanaged.getCurrentAppAttempt().getAppAttemptId(); appUnmanaged.getCurrentAppAttempt().getAppAttemptId();
// assert appUnmanaged info is saved // assert appUnmanaged info is saved
@ -306,4 +311,74 @@ public void testRMRestart() throws Exception {
Assert.assertEquals(0, rmAppState.size()); Assert.assertEquals(0, rmAppState.size());
} }
@Test
public void testRMRestartOnMaxAppAttempts() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE,
"org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore");
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// submit an app with maxAppAttempts equals to 1
RMApp app1 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", 1);
// submit an app with maxAppAttempts equals to -1
RMApp app2 = rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1);
// assert app1 info is saved
ApplicationState appState = rmAppState.get(app1.getApplicationId());
Assert.assertNotNull(appState);
Assert.assertEquals(0, appState.getAttemptCount());
Assert.assertEquals(appState.getApplicationSubmissionContext()
.getApplicationId(), app1.getApplicationSubmissionContext()
.getApplicationId());
// Allocate the AM
nm1.nodeHeartbeat(true);
RMAppAttempt attempt = app1.getCurrentAppAttempt();
ApplicationAttemptId attemptId1 = attempt.getAppAttemptId();
rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
Assert.assertEquals(1, appState.getAttemptCount());
ApplicationAttemptState attemptState =
appState.getAttempt(attemptId1);
Assert.assertNotNull(attemptState);
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
rm1.stop();
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// verify that maxAppAttempts is set to global value
Assert.assertEquals(2,
rm2.getRMContext().getRMApps().get(app2.getApplicationId())
.getMaxAppAttempts());
// verify that app2 exists app1 is removed
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
Assert.assertNotNull(rm2.getRMContext().getRMApps()
.get(app2.getApplicationId()));
Assert.assertNull(rm2.getRMContext().getRMApps()
.get(app1.getApplicationId()));
// stop the RM
rm2.stop();
}
} }