YARN-6153. KeepContainer does not work when AM retry window is set. Contributed by kyungwan nam
This commit is contained in:
parent
e0bb867c3f
commit
235203dffd
@ -994,13 +994,7 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) {
|
|||||||
}
|
}
|
||||||
RMAppAttempt attempt =
|
RMAppAttempt attempt =
|
||||||
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
|
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
|
||||||
submissionContext, conf,
|
submissionContext, conf, amReq, this, currentAMBlacklistManager);
|
||||||
// The newly created attempt maybe last attempt if (number of
|
|
||||||
// previously failed attempts(which should not include Preempted,
|
|
||||||
// hardware error and NM resync) + 1) equal to the max-attempt
|
|
||||||
// limit.
|
|
||||||
maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
|
|
||||||
currentAMBlacklistManager);
|
|
||||||
attempts.put(appAttemptId, attempt);
|
attempts.put(appAttemptId, attempt);
|
||||||
currentAttempt = attempt;
|
currentAttempt = attempt;
|
||||||
}
|
}
|
||||||
@ -1498,18 +1492,13 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getNumFailedAppAttempts() {
|
public int getNumFailedAppAttempts() {
|
||||||
int completedAttempts = 0;
|
int completedAttempts = 0;
|
||||||
long endTime = this.systemClock.getTime();
|
|
||||||
// Do not count AM preemption, hardware failures or NM resync
|
// Do not count AM preemption, hardware failures or NM resync
|
||||||
// as attempt failure.
|
// as attempt failure.
|
||||||
for (RMAppAttempt attempt : attempts.values()) {
|
for (RMAppAttempt attempt : attempts.values()) {
|
||||||
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
|
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
|
||||||
if (this.attemptFailuresValidityInterval <= 0
|
completedAttempts++;
|
||||||
|| (attempt.getFinishTime() > endTime
|
|
||||||
- this.attemptFailuresValidityInterval)) {
|
|
||||||
completedAttempts++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return completedAttempts;
|
return completedAttempts;
|
||||||
|
@ -143,6 +143,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||||||
private final EventHandler eventHandler;
|
private final EventHandler eventHandler;
|
||||||
private final YarnScheduler scheduler;
|
private final YarnScheduler scheduler;
|
||||||
private final ApplicationMasterService masterService;
|
private final ApplicationMasterService masterService;
|
||||||
|
private final RMApp rmApp;
|
||||||
|
|
||||||
private final ReadLock readLock;
|
private final ReadLock readLock;
|
||||||
private final WriteLock writeLock;
|
private final WriteLock writeLock;
|
||||||
@ -179,12 +180,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
|||||||
private int amContainerExitStatus = ContainerExitStatus.INVALID;
|
private int amContainerExitStatus = ContainerExitStatus.INVALID;
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
// Since AM preemption, hardware error and NM resync are not counted towards
|
|
||||||
// AM failure count, even if this flag is true, a new attempt can still be
|
|
||||||
// re-created if this attempt is eventually failed because of preemption,
|
|
||||||
// hardware error or NM resync. So this flag indicates that this may be
|
|
||||||
// last attempt.
|
|
||||||
private final boolean maybeLastAttempt;
|
|
||||||
private static final ExpiredTransition EXPIRED_TRANSITION =
|
private static final ExpiredTransition EXPIRED_TRANSITION =
|
||||||
new ExpiredTransition();
|
new ExpiredTransition();
|
||||||
private static final AttemptFailedTransition FAILED_TRANSITION =
|
private static final AttemptFailedTransition FAILED_TRANSITION =
|
||||||
@ -490,16 +485,16 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
|||||||
RMContext rmContext, YarnScheduler scheduler,
|
RMContext rmContext, YarnScheduler scheduler,
|
||||||
ApplicationMasterService masterService,
|
ApplicationMasterService masterService,
|
||||||
ApplicationSubmissionContext submissionContext,
|
ApplicationSubmissionContext submissionContext,
|
||||||
Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) {
|
Configuration conf, ResourceRequest amReq, RMApp rmApp) {
|
||||||
this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
|
this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
|
||||||
conf, maybeLastAttempt, amReq, new DisabledBlacklistManager());
|
conf, amReq, rmApp, new DisabledBlacklistManager());
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
||||||
RMContext rmContext, YarnScheduler scheduler,
|
RMContext rmContext, YarnScheduler scheduler,
|
||||||
ApplicationMasterService masterService,
|
ApplicationMasterService masterService,
|
||||||
ApplicationSubmissionContext submissionContext,
|
ApplicationSubmissionContext submissionContext,
|
||||||
Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq,
|
Configuration conf, ResourceRequest amReq, RMApp rmApp,
|
||||||
BlacklistManager amBlacklistManager) {
|
BlacklistManager amBlacklistManager) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.applicationAttemptId = appAttemptId;
|
this.applicationAttemptId = appAttemptId;
|
||||||
@ -514,7 +509,6 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
|||||||
this.writeLock = lock.writeLock();
|
this.writeLock = lock.writeLock();
|
||||||
|
|
||||||
this.proxiedTrackingUrl = generateProxyUriWithScheme();
|
this.proxiedTrackingUrl = generateProxyUriWithScheme();
|
||||||
this.maybeLastAttempt = maybeLastAttempt;
|
|
||||||
this.stateMachine = stateMachineFactory.make(this);
|
this.stateMachine = stateMachineFactory.make(this);
|
||||||
|
|
||||||
this.attemptMetrics =
|
this.attemptMetrics =
|
||||||
@ -531,6 +525,7 @@ public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.diagnostics = new BoundedAppender(diagnosticsLimitKC * 1024);
|
this.diagnostics = new BoundedAppender(diagnosticsLimitKC * 1024);
|
||||||
|
this.rmApp = rmApp;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getDiagnosticsLimitKCOrThrow(final Configuration configuration) {
|
private int getDiagnosticsLimitKCOrThrow(final Configuration configuration) {
|
||||||
@ -1215,8 +1210,7 @@ private static class AttemptRecoveredTransition
|
|||||||
@Override
|
@Override
|
||||||
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
RMApp rmApp = appAttempt.rmContext.getRMApps().get(
|
RMApp rmApp = appAttempt.rmApp;
|
||||||
appAttempt.getAppAttemptId().getApplicationId());
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If last attempt recovered final state is null .. it means attempt was
|
* If last attempt recovered final state is null .. it means attempt was
|
||||||
@ -1462,14 +1456,9 @@ public void transition(RMAppAttemptImpl appAttempt,
|
|||||||
if (appAttempt.submissionContext
|
if (appAttempt.submissionContext
|
||||||
.getKeepContainersAcrossApplicationAttempts()
|
.getKeepContainersAcrossApplicationAttempts()
|
||||||
&& !appAttempt.submissionContext.getUnmanagedAM()) {
|
&& !appAttempt.submissionContext.getUnmanagedAM()) {
|
||||||
// See if we should retain containers for non-unmanaged applications
|
int numberOfFailure = ((RMAppImpl)appAttempt.rmApp)
|
||||||
if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) {
|
.getNumFailedAppAttempts();
|
||||||
// Premption, hardware failures, NM resync doesn't count towards
|
if (numberOfFailure < appAttempt.rmApp.getMaxAppAttempts()) {
|
||||||
// app-failures and so we should retain containers.
|
|
||||||
keepContainersAcrossAppAttempts = true;
|
|
||||||
} else if (!appAttempt.maybeLastAttempt) {
|
|
||||||
// Not preemption, hardware failures or NM resync.
|
|
||||||
// Not last-attempt too - keep containers.
|
|
||||||
keepContainersAcrossAppAttempts = true;
|
keepContainersAcrossAppAttempts = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1496,9 +1485,7 @@ public void transition(RMAppAttemptImpl appAttempt,
|
|||||||
.applicationAttemptFinished(appAttempt, finalAttemptState);
|
.applicationAttemptFinished(appAttempt, finalAttemptState);
|
||||||
appAttempt.rmContext.getSystemMetricsPublisher()
|
appAttempt.rmContext.getSystemMetricsPublisher()
|
||||||
.appAttemptFinished(appAttempt, finalAttemptState,
|
.appAttemptFinished(appAttempt, finalAttemptState,
|
||||||
appAttempt.rmContext.getRMApps().get(
|
appAttempt.rmApp, System.currentTimeMillis());
|
||||||
appAttempt.applicationAttemptId.getApplicationId()),
|
|
||||||
System.currentTimeMillis());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1545,6 +1532,14 @@ public void transition(RMAppAttemptImpl appAttempt,
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldCountTowardsMaxAttemptRetry() {
|
public boolean shouldCountTowardsMaxAttemptRetry() {
|
||||||
|
long attemptFailuresValidityInterval = this.submissionContext
|
||||||
|
.getAttemptFailuresValidityInterval();
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
if (attemptFailuresValidityInterval > 0
|
||||||
|
&& this.getFinishTime() > 0
|
||||||
|
&& this.getFinishTime() < (end - attemptFailuresValidityInterval)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
int exitStatus = getAMContainerExitStatus();
|
int exitStatus = getAMContainerExitStatus();
|
||||||
@ -2222,11 +2217,6 @@ public ApplicationAttemptReport createApplicationAttemptReport() {
|
|||||||
return attemptReport;
|
return attemptReport;
|
||||||
}
|
}
|
||||||
|
|
||||||
// for testing
|
|
||||||
public boolean mayBeLastAttempt() {
|
|
||||||
return maybeLastAttempt;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMAppAttemptMetrics getRMAppAttemptMetrics() {
|
public RMAppAttemptMetrics getRMAppAttemptMetrics() {
|
||||||
// didn't use read/write lock here because RMAppAttemptMetrics has its own
|
// didn't use read/write lock here because RMAppAttemptMetrics has its own
|
||||||
|
@ -618,15 +618,15 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
|||||||
false, null, 0, null, true, Priority.newInstance(0));
|
false, null, 0, null, true, Priority.newInstance(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
|
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval,
|
||||||
throws Exception {
|
boolean keepContainers) throws Exception {
|
||||||
Resource resource = Records.newRecord(Resource.class);
|
Resource resource = Records.newRecord(Resource.class);
|
||||||
resource.setMemorySize(masterMemory);
|
resource.setMemorySize(masterMemory);
|
||||||
Priority priority = Priority.newInstance(0);
|
Priority priority = Priority.newInstance(0);
|
||||||
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
||||||
.getShortUserName(), null, false, null,
|
.getShortUserName(), null, false, null,
|
||||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, keepContainers,
|
||||||
false, null, attemptFailuresValidityInterval, null, true, priority);
|
false, null, attemptFailuresValidityInterval, null, true, priority);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,7 +396,7 @@ public void handle(Event event) {
|
|||||||
mock(ApplicationSubmissionContext.class);
|
mock(ApplicationSubmissionContext.class);
|
||||||
YarnConfiguration config = new YarnConfiguration();
|
YarnConfiguration config = new YarnConfiguration();
|
||||||
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
|
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
|
||||||
rmContext, yarnScheduler, null, asContext, config, false, null);
|
rmContext, yarnScheduler, null, asContext, config, null, null);
|
||||||
ApplicationResourceUsageReport report = rmAppAttemptImpl
|
ApplicationResourceUsageReport report = rmAppAttemptImpl
|
||||||
.getApplicationResourceUsageReport();
|
.getApplicationResourceUsageReport();
|
||||||
assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
|
assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
|
||||||
@ -1327,7 +1327,7 @@ public ApplicationReport createAndGetApplicationReport(
|
|||||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
||||||
ApplicationId.newInstance(123456, 1), 1);
|
ApplicationId.newInstance(123456, 1), 1);
|
||||||
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,
|
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,
|
||||||
rmContext, yarnScheduler, null, asContext, config, false, null));
|
rmContext, yarnScheduler, null, asContext, config, null, app));
|
||||||
Container container = Container.newInstance(
|
Container container = Container.newInstance(
|
||||||
ContainerId.newContainerId(attemptId, 1), null, "", null, null, null);
|
ContainerId.newContainerId(attemptId, 1), null, "", null, null, null);
|
||||||
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
|
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
|
||||||
|
@ -411,7 +411,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|||||||
MockAM am2 =
|
MockAM am2 =
|
||||||
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
||||||
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
|
|
||||||
|
|
||||||
// Preempt the second attempt.
|
// Preempt the second attempt.
|
||||||
ContainerId amContainer2 =
|
ContainerId amContainer2 =
|
||||||
@ -427,7 +426,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|||||||
MockAM am3 =
|
MockAM am3 =
|
||||||
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
|
||||||
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
|
|
||||||
|
|
||||||
// mimic NM disk_failure
|
// mimic NM disk_failure
|
||||||
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
|
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
|
||||||
@ -454,7 +452,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|||||||
MockAM am4 =
|
MockAM am4 =
|
||||||
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1);
|
||||||
RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
|
|
||||||
|
|
||||||
// create second NM, and register to rm1
|
// create second NM, and register to rm1
|
||||||
MockNM nm2 =
|
MockNM nm2 =
|
||||||
@ -475,7 +472,6 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|||||||
MockAM am5 =
|
MockAM am5 =
|
||||||
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
|
||||||
RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
|
|
||||||
// fail the AM normally
|
// fail the AM normally
|
||||||
nm2
|
nm2
|
||||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
@ -584,7 +580,6 @@ public void testRMRestartOrFailoverNotCountedForAMFailures()
|
|||||||
// AM should be restarted even though max-am-attempt is 1.
|
// AM should be restarted even though max-am-attempt is 1.
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
|
|
||||||
|
|
||||||
// Restart rm.
|
// Restart rm.
|
||||||
MockRM rm2 = new MockRM(conf, memStore);
|
MockRM rm2 = new MockRM(conf, memStore);
|
||||||
@ -645,7 +640,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
// set window size to a larger number : 60s
|
// set window size to a larger number : 60s
|
||||||
// we will verify the app should be failed if
|
// we will verify the app should be failed if
|
||||||
// two continuous attempts failed in 60s.
|
// two continuous attempts failed in 60s.
|
||||||
RMApp app = rm1.submitApp(200, 60000);
|
RMApp app = rm1.submitApp(200, 60000, false);
|
||||||
|
|
||||||
MockAM am = MockRM.launchAM(app, rm1, nm1);
|
MockAM am = MockRM.launchAM(app, rm1, nm1);
|
||||||
// Fail current attempt normally
|
// Fail current attempt normally
|
||||||
@ -655,8 +650,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
// launch the second attempt
|
// launch the second attempt
|
||||||
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
Assert.assertEquals(2, app.getAppAttempts().size());
|
Assert.assertEquals(2, app.getAppAttempts().size());
|
||||||
Assert.assertTrue(((RMAppAttemptImpl) app.getCurrentAppAttempt())
|
|
||||||
.mayBeLastAttempt());
|
|
||||||
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
||||||
rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||||
nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
|
nm1.nodeHeartbeat(am_2.getApplicationAttemptId(),
|
||||||
@ -667,7 +661,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
|
|
||||||
ControlledClock clock = new ControlledClock();
|
ControlledClock clock = new ControlledClock();
|
||||||
// set window size to 10s
|
// set window size to 10s
|
||||||
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);
|
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, false);
|
||||||
app1.setSystemClock(clock);
|
app1.setSystemClock(clock);
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
@ -684,7 +678,6 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|||||||
Assert.assertEquals(2, app1.getAppAttempts().size());
|
Assert.assertEquals(2, app1.getAppAttempts().size());
|
||||||
|
|
||||||
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||||
Assert.assertTrue(((RMAppAttemptImpl) attempt2).mayBeLastAttempt());
|
|
||||||
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||||
|
|
||||||
@ -863,4 +856,75 @@ public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
|
|||||||
|
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test restarting AM launched with the KeepContainers and AM reset window.
|
||||||
|
// after AM reset window, even if AM who was the last is failed,
|
||||||
|
// all containers are launched by previous AM should be kept.
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
|
||||||
|
throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
// explicitly set max-am-retry count as 2.
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
// set window size to 10s and enable keepContainers
|
||||||
|
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000, true);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
int NUM_CONTAINERS = 2;
|
||||||
|
allocateContainers(nm1, am1, NUM_CONTAINERS);
|
||||||
|
|
||||||
|
// launch the 2nd container, for testing running container transferred.
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
||||||
|
ContainerId containerId2 =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||||
|
|
||||||
|
// Fail attempt1 normally
|
||||||
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(),
|
||||||
|
1, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
|
||||||
|
// launch the second attempt
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
Assert.assertEquals(2, app1.getAppAttempts().size());
|
||||||
|
|
||||||
|
// It will be the last attempt.
|
||||||
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
||||||
|
|
||||||
|
// wait for 10 seconds to reset AM failure count
|
||||||
|
Thread.sleep(10 * 1000);
|
||||||
|
|
||||||
|
// Fail attempt2 normally
|
||||||
|
nm1.nodeHeartbeat(am2.getApplicationAttemptId(),
|
||||||
|
1, ContainerState.COMPLETE);
|
||||||
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
||||||
|
|
||||||
|
// can launch the third attempt successfully
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||||
|
Assert.assertEquals(3, app1.getAppAttempts().size());
|
||||||
|
MockAM am3 = rm1.launchAM(app1, rm1, nm1);
|
||||||
|
RegisterApplicationMasterResponse registerResponse =
|
||||||
|
am3.registerAppAttempt();
|
||||||
|
|
||||||
|
// keepContainers is applied, even if attempt2 was the last attempt.
|
||||||
|
Assert.assertEquals(1, registerResponse.getContainersFromPreviousAttempts()
|
||||||
|
.size());
|
||||||
|
boolean containerId2Exists = false;
|
||||||
|
Container container = registerResponse.getContainersFromPreviousAttempts().get(0);
|
||||||
|
if (container.getId().equals(containerId2)) {
|
||||||
|
containerId2Exists = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue(containerId2Exists);
|
||||||
|
|
||||||
|
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -106,6 +106,6 @@ private RMAppAttemptImpl createRMAppAttemptImpl(
|
|||||||
when(mockRMContext.getDispatcher()).thenReturn(mockDispatcher);
|
when(mockRMContext.getDispatcher()).thenReturn(mockDispatcher);
|
||||||
|
|
||||||
return new RMAppAttemptImpl(mockApplicationAttemptId, mockRMContext, null,
|
return new RMAppAttemptImpl(mockApplicationAttemptId, mockRMContext, null,
|
||||||
null, null, configuration, false, null);
|
null, null, configuration, null, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -327,10 +327,10 @@ public void setUp() throws Exception {
|
|||||||
application = mock(RMAppImpl.class);
|
application = mock(RMAppImpl.class);
|
||||||
applicationAttempt =
|
applicationAttempt =
|
||||||
new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler,
|
new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler,
|
||||||
masterService, submissionContext, new Configuration(), false,
|
masterService, submissionContext, new Configuration(),
|
||||||
BuilderUtils.newResourceRequest(
|
BuilderUtils.newResourceRequest(
|
||||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||||
submissionContext.getResource(), 1));
|
submissionContext.getResource(), 1), application);
|
||||||
|
|
||||||
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
|
when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
|
||||||
when(application.getApplicationId()).thenReturn(applicationId);
|
when(application.getApplicationId()).thenReturn(applicationId);
|
||||||
@ -1107,10 +1107,10 @@ public void testLaunchedFailWhileAHSEnabled() {
|
|||||||
RMAppAttempt myApplicationAttempt =
|
RMAppAttempt myApplicationAttempt =
|
||||||
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
|
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
|
||||||
spyRMContext, scheduler,masterService,
|
spyRMContext, scheduler,masterService,
|
||||||
submissionContext, myConf, false,
|
submissionContext, myConf,
|
||||||
BuilderUtils.newResourceRequest(
|
BuilderUtils.newResourceRequest(
|
||||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||||
submissionContext.getResource(), 1));
|
submissionContext.getResource(), 1), application);
|
||||||
|
|
||||||
//submit, schedule and allocate app attempt
|
//submit, schedule and allocate app attempt
|
||||||
myApplicationAttempt.handle(
|
myApplicationAttempt.handle(
|
||||||
@ -1536,6 +1536,9 @@ public void testFailedToFailed() {
|
|||||||
// create a failed attempt.
|
// create a failed attempt.
|
||||||
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
|
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
|
||||||
.thenReturn(true);
|
.thenReturn(true);
|
||||||
|
when(application.getMaxAppAttempts()).thenReturn(2);
|
||||||
|
when(application.getNumFailedAppAttempts()).thenReturn(1);
|
||||||
|
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||||
@ -1581,9 +1584,9 @@ public void testContainersCleanupForLastAttempt() {
|
|||||||
applicationAttempt =
|
applicationAttempt =
|
||||||
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
|
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
|
||||||
scheduler, masterService, submissionContext, new Configuration(),
|
scheduler, masterService, submissionContext, new Configuration(),
|
||||||
true, BuilderUtils.newResourceRequest(
|
BuilderUtils.newResourceRequest(
|
||||||
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
|
||||||
submissionContext.getResource(), 1));
|
submissionContext.getResource(), 1), application);
|
||||||
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
|
when(submissionContext.getKeepContainersAcrossApplicationAttempts())
|
||||||
.thenReturn(true);
|
.thenReturn(true);
|
||||||
when(submissionContext.getMaxAppAttempts()).thenReturn(1);
|
when(submissionContext.getMaxAppAttempts()).thenReturn(1);
|
||||||
@ -1642,9 +1645,9 @@ public Allocation answer(InvocationOnMock invocation)
|
|||||||
applicationAttempt =
|
applicationAttempt =
|
||||||
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
|
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
|
||||||
spyRMContext, scheduler, masterService, submissionContext,
|
spyRMContext, scheduler, masterService, submissionContext,
|
||||||
new Configuration(), true, ResourceRequest.newInstance(
|
new Configuration(), ResourceRequest.newInstance(
|
||||||
Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3,
|
Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3,
|
||||||
false, "label-expression"));
|
false, "label-expression"), application);
|
||||||
new RMAppAttemptImpl.ScheduleTransition().transition(
|
new RMAppAttemptImpl.ScheduleTransition().transition(
|
||||||
(RMAppAttemptImpl) applicationAttempt, null);
|
(RMAppAttemptImpl) applicationAttempt, null);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user