YARN-614. Changed ResourceManager to not count disk failure, node loss and RM restart towards app failures. Contributed by Xuan Gong
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1606407 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
94a1462bd5
commit
b717d44b52
@ -195,6 +195,9 @@ Release 2.5.0 - UNRELEASED
|
||||
YARN-2171. Improved CapacityScheduling to not lock on nodemanager-count when
|
||||
AMs heartbeat in. (Jason Lowe via vinodkv)
|
||||
|
||||
YARN-614. Changed ResourceManager to not count disk failure, node loss and
|
||||
RM restart towards app failures. (Xuan Gong via jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -687,9 +687,10 @@ private void createNewAttempt() {
|
||||
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
|
||||
submissionContext, conf,
|
||||
// The newly created attempt maybe last attempt if (number of
|
||||
// previously NonPreempted attempts + 1) equal to the max-attempt
|
||||
// previously failed attempts(which should not include Preempted,
|
||||
// hardware error and NM resync) + 1) equal to the max-attempt
|
||||
// limit.
|
||||
maxAppAttempts == (getNumNonPreemptedAppAttempts() + 1));
|
||||
maxAppAttempts == (getNumFailedAppAttempts() + 1));
|
||||
attempts.put(appAttemptId, attempt);
|
||||
currentAttempt = attempt;
|
||||
}
|
||||
@ -797,7 +798,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|
||||
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
|
||||
|| (app.currentAttempt.getState() == RMAppAttemptState.FAILED
|
||||
&& app.getNumNonPreemptedAppAttempts() == app.maxAppAttempts))) {
|
||||
&& app.getNumFailedAppAttempts() == app.maxAppAttempts))) {
|
||||
return RMAppState.ACCEPTED;
|
||||
}
|
||||
|
||||
@ -888,7 +889,7 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
|
||||
msg = "Unmanaged application " + this.getApplicationId()
|
||||
+ " failed due to " + failedEvent.getDiagnostics()
|
||||
+ ". Failing the application.";
|
||||
} else if (getNumNonPreemptedAppAttempts() >= this.maxAppAttempts) {
|
||||
} else if (getNumFailedAppAttempts() >= this.maxAppAttempts) {
|
||||
msg = "Application " + this.getApplicationId() + " failed "
|
||||
+ this.maxAppAttempts + " times due to "
|
||||
+ failedEvent.getDiagnostics() + ". Failing the application.";
|
||||
@ -1105,11 +1106,12 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
||||
};
|
||||
}
|
||||
|
||||
private int getNumNonPreemptedAppAttempts() {
|
||||
private int getNumFailedAppAttempts() {
|
||||
int completedAttempts = 0;
|
||||
// Do not count AM preemption as attempt failure.
|
||||
// Do not count AM preemption, hardware failures or NM resync
|
||||
// as attempt failure.
|
||||
for (RMAppAttempt attempt : attempts.values()) {
|
||||
if (!attempt.isPreempted()) {
|
||||
if (attempt.shouldCountTowardsMaxAttemptRetry()) {
|
||||
completedAttempts++;
|
||||
}
|
||||
}
|
||||
@ -1129,7 +1131,7 @@ public AttemptFailedTransition(RMAppState initialState) {
|
||||
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
||||
|
||||
if (!app.submissionContext.getUnmanagedAM()
|
||||
&& app.getNumNonPreemptedAppAttempts() < app.maxAppAttempts) {
|
||||
&& app.getNumFailedAppAttempts() < app.maxAppAttempts) {
|
||||
boolean transferStateFromPreviousAttempt = false;
|
||||
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
|
||||
transferStateFromPreviousAttempt =
|
||||
|
@ -197,8 +197,14 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
|
||||
ApplicationAttemptReport createApplicationAttemptReport();
|
||||
|
||||
/**
|
||||
* Return the flag which indicates whether the attempt is preempted by the
|
||||
* scheduler.
|
||||
* Return the flag which indicates whether the attempt failure should be
|
||||
* counted to attempt retry count.
|
||||
* <ul>
|
||||
* There failure types should not be counted to attempt retry count:
|
||||
* <li>preempted by the scheduler.</li>
|
||||
* <li>hardware failures, such as NM failing, lost NM and NM disk errors.</li>
|
||||
* <li>killed by RM because of RM restart or failover.</li>
|
||||
* </ul>
|
||||
*/
|
||||
boolean isPreempted();
|
||||
boolean shouldCountTowardsMaxAttemptRetry();
|
||||
}
|
||||
|
@ -149,9 +149,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||
private int amContainerExitStatus = ContainerExitStatus.INVALID;
|
||||
|
||||
private Configuration conf;
|
||||
// Since AM preemption is not counted towards AM failure count,
|
||||
// even if this flag is true, a new attempt can still be re-created if this
|
||||
// attempt is eventually preempted. So this flag indicates that this may be
|
||||
// Since AM preemption, hardware error and NM resync are not counted towards
|
||||
// AM failure count, even if this flag is true, a new attempt can still be
|
||||
// re-created if this attempt is eventually failed because of preemption,
|
||||
// hardware error or NM resync. So this flag indicates that this may be
|
||||
// last attempt.
|
||||
private final boolean maybeLastAttempt;
|
||||
private static final ExpiredTransition EXPIRED_TRANSITION =
|
||||
@ -1087,12 +1088,13 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
.getKeepContainersAcrossApplicationAttempts()
|
||||
&& !appAttempt.submissionContext.getUnmanagedAM()) {
|
||||
// See if we should retain containers for non-unmanaged applications
|
||||
if (appAttempt.isPreempted()) {
|
||||
// Premption doesn't count towards app-failures and so we should
|
||||
// retain containers.
|
||||
if (!appAttempt.shouldCountTowardsMaxAttemptRetry()) {
|
||||
// Premption, hardware failures, NM resync doesn't count towards
|
||||
// app-failures and so we should retain containers.
|
||||
keepContainersAcrossAppAttempts = true;
|
||||
} else if (!appAttempt.maybeLastAttempt) {
|
||||
// Not preemption. Not last-attempt too - keep containers.
|
||||
// Not preemption, hardware failures or NM resync.
|
||||
// Not last-attempt too - keep containers.
|
||||
keepContainersAcrossAppAttempts = true;
|
||||
}
|
||||
}
|
||||
@ -1136,8 +1138,17 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPreempted() {
|
||||
return getAMContainerExitStatus() == ContainerExitStatus.PREEMPTED;
|
||||
public boolean shouldCountTowardsMaxAttemptRetry() {
|
||||
try {
|
||||
this.readLock.lock();
|
||||
int exitStatus = getAMContainerExitStatus();
|
||||
return !(exitStatus == ContainerExitStatus.PREEMPTED
|
||||
|| exitStatus == ContainerExitStatus.ABORTED
|
||||
|| exitStatus == ContainerExitStatus.DISKS_FAILED
|
||||
|| exitStatus == ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class UnmanagedAMAttemptSavedTransition
|
||||
|
@ -19,13 +19,16 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
@ -34,6 +37,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
@ -49,6 +53,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -347,15 +352,20 @@ public void testNMTokensRebindOnAMRestart() throws Exception {
|
||||
rm1.stop();
|
||||
}
|
||||
|
||||
// AM container preempted should not be counted towards AM max retry count.
|
||||
@Test(timeout = 20000)
|
||||
public void testAMPreemptedNotCountedForAMFailures() throws Exception {
|
||||
// AM container preempted, nm disk failure
|
||||
// should not be counted towards AM max retry count.
|
||||
@Test(timeout = 100000)
|
||||
public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
// explicitly set max-am-retry count as 1.
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
@ -371,8 +381,10 @@ public void testAMPreemptedNotCountedForAMFailures() throws Exception {
|
||||
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
||||
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
Assert.assertTrue(attempt1.isPreempted());
|
||||
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
ApplicationState appState =
|
||||
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
||||
// AM should be restarted even though max-am-attempt is 1.
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
||||
@ -384,20 +396,62 @@ public void testAMPreemptedNotCountedForAMFailures() throws Exception {
|
||||
scheduler.killContainer(scheduler.getRMContainer(amContainer2));
|
||||
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
Assert.assertTrue(attempt2.isPreempted());
|
||||
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
||||
Assert.assertTrue(((RMAppAttemptImpl) attempt3).mayBeLastAttempt());
|
||||
|
||||
// fail the AM normally
|
||||
nm1.nodeHeartbeat(am3.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
// mimic NM disk_failure
|
||||
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
|
||||
containerStatus.setContainerId(attempt3.getMasterContainer().getId());
|
||||
containerStatus.setDiagnostics("mimic NM disk_failure");
|
||||
containerStatus.setState(ContainerState.COMPLETE);
|
||||
containerStatus.setExitStatus(ContainerExitStatus.DISKS_FAILED);
|
||||
Map<ApplicationId, List<ContainerStatus>> conts =
|
||||
new HashMap<ApplicationId, List<ContainerStatus>>();
|
||||
conts.put(app1.getApplicationId(),
|
||||
Collections.singletonList(containerStatus));
|
||||
nm1.nodeHeartbeat(conts, true);
|
||||
|
||||
am3.waitForState(RMAppAttemptState.FAILED);
|
||||
Assert.assertFalse(attempt3.isPreempted());
|
||||
Assert.assertTrue(! attempt3.shouldCountTowardsMaxAttemptRetry());
|
||||
Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
|
||||
appState.getAttempt(am3.getApplicationAttemptId())
|
||||
.getAMContainerExitStatus());
|
||||
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
MockAM am4 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
RMAppAttempt attempt4 = app1.getCurrentAppAttempt();
|
||||
Assert.assertTrue(((RMAppAttemptImpl) attempt4).mayBeLastAttempt());
|
||||
|
||||
// create second NM, and register to rm1
|
||||
MockNM nm2 =
|
||||
new MockNM("127.0.0.1:2234", 8000, rm1.getResourceTrackerService());
|
||||
nm2.registerNode();
|
||||
// nm1 heartbeats to report unhealthy
|
||||
// This will mimic ContainerExitStatus.ABORT
|
||||
nm1.nodeHeartbeat(false);
|
||||
am4.waitForState(RMAppAttemptState.FAILED);
|
||||
Assert.assertTrue(! attempt4.shouldCountTowardsMaxAttemptRetry());
|
||||
Assert.assertEquals(ContainerExitStatus.ABORTED,
|
||||
appState.getAttempt(am4.getApplicationAttemptId())
|
||||
.getAMContainerExitStatus());
|
||||
// launch next AM in nm2
|
||||
nm2.nodeHeartbeat(true);
|
||||
MockAM am5 =
|
||||
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm2);
|
||||
RMAppAttempt attempt5 = app1.getCurrentAppAttempt();
|
||||
Assert.assertTrue(((RMAppAttemptImpl) attempt5).mayBeLastAttempt());
|
||||
// fail the AM normally
|
||||
nm2
|
||||
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am5.waitForState(RMAppAttemptState.FAILED);
|
||||
Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
||||
|
||||
// AM should not be restarted.
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
||||
Assert.assertEquals(3, app1.getAppAttempts().size());
|
||||
Assert.assertEquals(5, app1.getAppAttempts().size());
|
||||
rm1.stop();
|
||||
}
|
||||
|
||||
@ -433,7 +487,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
||||
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
||||
|
||||
am1.waitForState(RMAppAttemptState.FAILED);
|
||||
Assert.assertTrue(attempt1.isPreempted());
|
||||
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
|
||||
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// state store has 1 attempt stored.
|
||||
@ -457,11 +511,74 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
||||
RMAppAttempt attempt2 =
|
||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
||||
.getCurrentAppAttempt();
|
||||
Assert.assertFalse(attempt2.isPreempted());
|
||||
Assert.assertTrue(attempt2.shouldCountTowardsMaxAttemptRetry());
|
||||
Assert.assertEquals(ContainerExitStatus.INVALID,
|
||||
appState.getAttempt(am2.getApplicationAttemptId())
|
||||
.getAMContainerExitStatus());
|
||||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
// Test regular RM restart/failover, new RM should not count
|
||||
// AM failure towards the max-retry-account and should be able to
|
||||
// re-launch the AM.
|
||||
@Test(timeout = 50000)
|
||||
public void testRMRestartOrFailoverNotCountedForAMFailures()
|
||||
throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||
// explicitly set max-am-retry count as 1.
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
|
||||
MockRM rm1 = new MockRM(conf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
// AM should be restarted even though max-am-attempt is 1.
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
Assert.assertTrue(((RMAppAttemptImpl) attempt1).mayBeLastAttempt());
|
||||
|
||||
// Restart rm.
|
||||
MockRM rm2 = new MockRM(conf, memStore);
|
||||
rm2.start();
|
||||
ApplicationState appState =
|
||||
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
||||
// re-register the NM
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
NMContainerStatus status = Records.newRecord(NMContainerStatus.class);
|
||||
status
|
||||
.setContainerExitStatus(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER);
|
||||
status.setContainerId(attempt1.getMasterContainer().getId());
|
||||
status.setContainerState(ContainerState.COMPLETE);
|
||||
status.setDiagnostics("");
|
||||
nm1.registerNode(Collections.singletonList(status), null);
|
||||
|
||||
rm2.waitForState(attempt1.getAppAttemptId(), RMAppAttemptState.FAILED);
|
||||
Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
||||
appState.getAttempt(am1.getApplicationAttemptId())
|
||||
.getAMContainerExitStatus());
|
||||
// Will automatically start a new AppAttempt in rm2
|
||||
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
||||
MockAM am2 =
|
||||
rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1);
|
||||
MockRM.finishAMAndVerifyAppState(app1, rm2, nm1, am2);
|
||||
RMAppAttempt attempt3 =
|
||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
||||
.getCurrentAppAttempt();
|
||||
Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
||||
Assert.assertEquals(ContainerExitStatus.INVALID,
|
||||
appState.getAttempt(am2.getApplicationAttemptId())
|
||||
.getAMContainerExitStatus());
|
||||
|
||||
rm1.stop();
|
||||
rm2.stop();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user