YARN-2416. InvalidStateTransitonException in ResourceManager if AMLauncher does not receive response for startContainers() call in time. Contributed by Jonathan Eagles

This commit is contained in:
Jason Lowe 2017-08-22 12:56:09 -05:00
parent 4ec5acc704
commit 3efcd51c3b
2 changed files with 41 additions and 16 deletions

View File

@ -184,7 +184,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new ExpiredTransition();
private static final AttemptFailedTransition FAILED_TRANSITION =
new AttemptFailedTransition();
private static final AMRegisteredTransition REGISTERED_TRANSITION =
new AMRegisteredTransition();
private static final AMLaunchedTransition LAUNCHED_TRANSITION =
new AMLaunchedTransition();
private RMAppAttemptEvent eventCausingFinalSaving;
private RMAppAttemptState targetedFinalState;
private RMAppAttemptState recoveredFinalState;
@ -314,7 +317,7 @@ RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
RMAppAttemptEventType.LAUNCHED, LAUNCHED_TRANSITION)
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.LAUNCH_FAILED,
new FinalSavingTransition(new LaunchFailedTransition(),
@ -328,6 +331,8 @@ RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
RMAppAttemptEventType.FAIL,
new FinalSavingTransition(FAILED_TRANSITION,
RMAppAttemptState.FAILED))
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
new FinalSavingTransition(
@ -335,7 +340,7 @@ RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
// Transitions from LAUNCHED State
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)
.addTransition(RMAppAttemptState.LAUNCHED,
EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
@ -357,6 +362,8 @@ RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
RMAppAttemptState.FAILED))
// Transitions from RUNNING State
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.LAUNCHED)
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
@ -421,6 +428,7 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptState.FAILED,
RMAppAttemptState.FAILED,
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL,
@ -438,6 +446,7 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
new FinalTransition(RMAppAttemptState.FINISHED))
.addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHING,
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
@ -451,6 +460,7 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptState.FINISHED,
RMAppAttemptState.FINISHED,
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
@ -1291,7 +1301,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
* 2) OR AMLivelinessMonitor expires this attempt (when am doesn't
* heart beat back).
*/
(new AMLaunchedTransition()).transition(appAttempt, event);
LAUNCHED_TRANSITION.transition(appAttempt, event);
return RMAppAttemptState.LAUNCHED;
}
}
@ -1516,7 +1526,8 @@ private static class AMLaunchedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
if (event.getType() == RMAppAttemptEventType.LAUNCHED) {
if (event.getType() == RMAppAttemptEventType.LAUNCHED
|| event.getType() == RMAppAttemptEventType.REGISTERED) {
appAttempt.launchAMEndTime = System.currentTimeMillis();
long delay = appAttempt.launchAMEndTime -
appAttempt.launchAMStartTime;
@ -1651,6 +1662,10 @@ private static final class AMRegisteredTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
if (!RMAppAttemptState.LAUNCHED.equals(appAttempt.getState())) {
// registered received before launch
LAUNCHED_TRANSITION.transition(appAttempt, event);
}
long delay = System.currentTimeMillis() - appAttempt.launchAMEndTime;
ClusterMetrics.getMetrics().addAMRegisterDelay(delay);
RMAppAttemptRegistrationEvent registrationEvent

View File

@ -526,12 +526,9 @@ private void testAppAttemptFailedState(Container container,
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
}
/**
* {@link RMAppAttemptState#LAUNCHED}
*/
private void testAppAttemptLaunchedState(Container container) {
assertEquals(RMAppAttemptState.LAUNCHED,
applicationAttempt.getAppAttemptState());
private void testAppAttemptLaunchedState(Container container,
RMAppAttemptState state) {
assertEquals(state, applicationAttempt.getAppAttemptState());
assertEquals(container, applicationAttempt.getMasterContainer());
if (UserGroupInformation.isSecurityEnabled()) {
// ClientTokenMasterKey has been registered in SecretManager, it's able to
@ -686,13 +683,18 @@ private Container allocateApplicationAttempt() {
}
private void launchApplicationAttempt(Container container) {
launchApplicationAttempt(container, RMAppAttemptState.LAUNCHED);
}
private void launchApplicationAttempt(Container container,
RMAppAttemptState state) {
applicationAttempt.handle(
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
testAppAttemptLaunchedState(container);
testAppAttemptLaunchedState(container, state);
}
private void runApplicationAttempt(Container container,
String host,
int rpcPort,
@ -723,7 +725,7 @@ private void testUnmanagedAMSuccess(String url) {
when(submissionContext.getUnmanagedAM()).thenReturn(true);
// submit AM and check it goes to LAUNCHED state
scheduleApplicationAttempt();
testAppAttemptLaunchedState(null);
testAppAttemptLaunchedState(null, RMAppAttemptState.LAUNCHED);
verify(amLivelinessMonitor, times(1)).register(
applicationAttempt.getAppAttemptId());
@ -930,7 +932,15 @@ public void testAllocatedToFailed() {
applicationAttempt.createApplicationAttemptState());
testAppAttemptFailedState(amContainer, diagnostics);
}
@Test(timeout = 10000)
public void testAllocatedToRunning() {
Container amContainer = allocateApplicationAttempt();
// Register attempt event arrives before launched attempt event
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
launchApplicationAttempt(amContainer, RMAppAttemptState.RUNNING);
}
@Test(timeout = 10000)
public void testCreateAppAttemptReport() {
RMAppAttemptState[] attemptStates = RMAppAttemptState.values();