diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 542b045498..27374b6757 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -299,6 +299,10 @@ Release 2.4.0 - UNRELEASED expand _HOST properly in their kerberos principles. (Mohammad Kamrul Islam va vinodkv) + YARN-1428. Fixed RM to write the final state of RMApp/RMAppAttempt to the + application history store in the transition to the final state. (Contributed + by Zhijie Shen) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java index ffc4a4fbdc..7aa8e81096 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java @@ -43,9 +43,12 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -219,12 +222,13 @@ public void applicationStarted(RMApp app) { } @SuppressWarnings("unchecked") - public void applicationFinished(RMApp app) { + public void applicationFinished(RMApp app, RMAppState finalState) { dispatcher.getEventHandler().handle( new WritingApplicationFinishEvent(app.getApplicationId(), ApplicationFinishData.newInstance(app.getApplicationId(), app.getFinishTime(), app.getDiagnostics().toString(), - app.getFinalApplicationStatus(), app.createApplicationState()))); + app.getFinalApplicationStatus(), + RMServerUtils.createApplicationState(finalState)))); } @SuppressWarnings("unchecked") @@ -239,15 +243,16 @@ public void applicationAttemptStarted(RMAppAttempt appAttempt) { } @SuppressWarnings("unchecked") - public void applicationAttemptFinished(RMAppAttempt appAttempt) { + public void applicationAttemptFinished(RMAppAttempt appAttempt, + RMAppAttemptState finalState) { if (historyServiceEnabled) { dispatcher.getEventHandler().handle( new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(), ApplicationAttemptFinishData.newInstance( appAttempt.getAppAttemptId(), appAttempt.getDiagnostics() .toString(), appAttempt.getTrackingUrl(), appAttempt - .getFinalApplicationStatus(), appAttempt - .createApplicationAttemptState()))); + .getFinalApplicationStatus(), + RMServerUtils.createApplicationAttemptState(finalState)))); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 196e89d32c..d84e3d3d59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -112,9 +112,7 @@ public class RMAppImpl implements RMApp, Recoverable { private long storedFinishTime = 0; private RMAppAttempt currentAttempt; private String queue; - @SuppressWarnings("rawtypes") private EventHandler handler; - private static final FinalTransition FINAL_TRANSITION = new FinalTransition(); private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); @@ -705,7 +703,6 @@ public void transition(RMAppImpl app, RMAppEvent event) { * either as an exception for failure or null for success, or the client will * be left waiting forever. */ - @SuppressWarnings("unchecked") private static final class RMAppMoveTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppMoveEvent moveEvent = (RMAppMoveEvent) event; @@ -723,7 +720,6 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - @SuppressWarnings("unchecked") private static final class RMAppRecoveredTransition implements MultipleArcTransition { @@ -742,7 +738,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // The app has completed. if (app.recoveredFinalState != null) { - FINAL_TRANSITION.transition(app, event); + new FinalTransition(app.recoveredFinalState).transition(app, event); return app.recoveredFinalState; } @@ -824,7 +820,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { LOG.info(msg); app.diagnostics.append(msg); // Inform the node for app-finish - FINAL_TRANSITION.transition(app, event); + new FinalTransition(RMAppState.FAILED).transition(app, event); } } @@ -937,6 +933,10 @@ public void transition(RMAppImpl app, RMAppEvent event) { } private static class AppFinishedTransition extends FinalTransition { + public AppFinishedTransition() { + super(RMAppState.FINISHED); + } + public void transition(RMAppImpl app, RMAppEvent event) { RMAppFinishedAttemptEvent finishedEvent = (RMAppFinishedAttemptEvent)event; @@ -980,6 +980,10 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static class AppKilledTransition extends FinalTransition { + public AppKilledTransition() { + super(RMAppState.KILLED); + } + @Override public void transition(RMAppImpl app, RMAppEvent event) { app.diagnostics.append(getAppKilledDiagnostics()); @@ -1002,6 +1006,10 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static final class AppRejectedTransition extends FinalTransition{ + public AppRejectedTransition() { + super(RMAppState.FAILED); + } + public void transition(RMAppImpl app, RMAppEvent event) { RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event; app.diagnostics.append(rejectedEvent.getMessage()); @@ -1011,6 +1019,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { private static class FinalTransition extends RMAppTransition { + private final RMAppState finalState; + + public FinalTransition(RMAppState finalState) { + this.finalState = finalState; + } + private Set getNodesOnWhichAttemptRan(RMAppImpl app) { Set nodes = new HashSet(); for (RMAppAttempt attempt : app.attempts.values()) { @@ -1035,10 +1049,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); - // TODO: We need to fix for the problem that RMApp enters the final state - // after RMAppAttempt in the killing case app.rmContext.getRMApplicationHistoryWriter() - .applicationFinished(app); + .applicationFinished(app, finalState); }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 88c9ba5591..8d69d08096 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1053,7 +1053,7 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.removeCredentials(appAttempt); appAttempt.rmContext.getRMApplicationHistoryWriter() - .applicationAttemptFinished(appAttempt); + .applicationAttemptFinished(appAttempt, finalAttemptState); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java index 6e063b584e..feaab3fe3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java @@ -137,8 +137,6 @@ private static RMApp createRMApp(ApplicationId appId) { new StringBuilder("test diagnostics info")); when(app.getFinalApplicationStatus()).thenReturn( FinalApplicationStatus.UNDEFINED); - when(app.createApplicationState()) - .thenReturn(YarnApplicationState.FINISHED); return app; } @@ -156,8 +154,6 @@ private static RMAppAttempt createRMAppAttempt( when(appAttempt.getTrackingUrl()).thenReturn("test url"); when(appAttempt.getFinalApplicationStatus()).thenReturn( FinalApplicationStatus.UNDEFINED); - when(appAttempt.createApplicationAttemptState()).thenReturn( - YarnApplicationAttemptState.FINISHED); return appAttempt; } @@ -200,7 +196,7 @@ public void testWriteApplication() throws Exception { Assert.assertEquals(0L, appHD.getSubmitTime()); Assert.assertEquals(1L, appHD.getStartTime()); - writer.applicationFinished(app); + writer.applicationFinished(app, RMAppState.FINISHED); for (int i = 0; i < MAX_RETRIES; ++i) { appHD = store.getApplication(ApplicationId.newInstance(0, 1)); if (appHD.getYarnApplicationState() != null) { @@ -241,7 +237,7 @@ public void testWriteApplicationAttempt() throws Exception { ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1), appAttemptHD.getMasterContainerId()); - writer.applicationAttemptFinished(appAttempt); + writer.applicationAttemptFinished(appAttempt, RMAppAttemptState.FINISHED); for (int i = 0; i < MAX_RETRIES; ++i) { appAttemptHD = store.getApplicationAttempt(ApplicationAttemptId.newInstance( @@ -326,9 +322,10 @@ public void testParallelWrite() throws Exception { writer.containerStarted(container); writer.containerFinished(container); } - writer.applicationAttemptFinished(appAttempt); + writer.applicationAttemptFinished( + appAttempt, RMAppAttemptState.FINISHED); } - writer.applicationFinished(app); + writer.applicationFinished(app, RMAppState.FINISHED); } for (int i = 0; i < MAX_RETRIES; ++i) { if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) { @@ -369,7 +366,7 @@ public void applicationStarted(RMApp app) { } @Override - public void applicationFinished(RMApp app) { + public void applicationFinished(RMApp app, RMAppState finalState) { } @Override @@ -377,7 +374,8 @@ public void applicationAttemptStarted(RMAppAttempt appAttempt) { } @Override - public void applicationAttemptFinished(RMAppAttempt appAttempt) { + public void applicationAttemptFinished( + RMAppAttempt appAttempt, RMAppAttemptState finalState) { } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 58482ee38b..509bceb451 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -71,6 +70,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.ArgumentCaptor; @RunWith(value = Parameterized.class) @@ -308,16 +308,6 @@ private void assertKilled(RMApp application) { "Application killed by user.", diag.toString()); } - private void assertAppAndAttemptKilled(RMApp application) - throws InterruptedException { - sendAttemptUpdateSavedEvent(application); - sendAppUpdateSavedEvent(application); - assertKilled(application); - Assert.assertEquals(RMAppAttemptState.KILLED, application - .getCurrentAppAttempt().getAppAttemptState()); - assertAppFinalStateSaved(application); - } - private void assertFailed(RMApp application, String regex) { assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); @@ -511,7 +501,7 @@ public void testAppNewKill() throws IOException { sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateNotSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.KILLED); } @Test @@ -528,7 +518,7 @@ public void testAppNewReject() throws IOException { sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); assertAppFinalStateNotSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.FAILED); } @Test (timeout = 30000) @@ -543,7 +533,7 @@ public void testAppNewSavingKill() throws IOException { rmDispatcher.await(); sendAppUpdateSavedEvent(application); assertKilled(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.KILLED); } @Test (timeout = 30000) @@ -560,7 +550,7 @@ public void testAppNewSavingReject() throws IOException { sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); assertAppFinalStateSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.FAILED); } @Test (timeout = 30000) @@ -577,7 +567,7 @@ public void testAppSubmittedRejected() throws IOException { sendAppUpdateSavedEvent(application); assertFailed(application, rejectedText); assertAppFinalStateSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.FAILED); } @Test @@ -592,7 +582,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException { sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.KILLED); } @Test @@ -627,7 +617,7 @@ public void testAppAcceptedFailed() throws IOException { sendAppUpdateSavedEvent(application); assertFailed(application, ".*" + message + ".*Failing the application.*"); assertAppFinalStateSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.FAILED); } @Test @@ -649,7 +639,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.KILLED); } @Test @@ -672,7 +662,7 @@ public void testAppRunningKill() throws IOException { sendAttemptUpdateSavedEvent(application); sendAppUpdateSavedEvent(application); assertKilled(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.KILLED); } @Test @@ -727,7 +717,7 @@ public void testAppRunningFailed() throws IOException { rmDispatcher.await(); assertFailed(application, ".*Failing the application.*"); assertAppFinalStateSaved(application); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.FAILED); } @Test @@ -785,7 +775,7 @@ public void testAppFinishedFinished() throws IOException { StringBuilder diag = application.getDiagnostics(); Assert.assertEquals("application diagnostics is not correct", "", diag.toString()); - verify(writer).applicationFinished(any(RMApp.class)); + verifyApplicationFinished(RMAppState.FINISHED); } @Test (timeout = 30000) @@ -810,10 +800,10 @@ public void testAppFailedFailed() throws IOException { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); + verifyApplicationFinished(RMAppState.FAILED); assertTimesAtFinish(application); assertAppState(RMAppState.FAILED, application); - verify(writer).applicationFinished(any(RMApp.class)); } @Test (timeout = 30000) @@ -856,10 +846,10 @@ public void testAppKilledKilled() throws IOException { rmDispatcher.await(); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); + verifyApplicationFinished(RMAppState.KILLED); assertTimesAtFinish(application); assertAppState(RMAppState.KILLED, application); - verify(writer).applicationFinished(any(RMApp.class)); } @Test @@ -871,4 +861,11 @@ public void testGetAppReport() { report = app.createAndGetApplicationReport("clientuser", true); Assert.assertNotNull(report.getApplicationResourceUsageReport()); } + + private void verifyApplicationFinished(RMAppState state) { + ArgumentCaptor finalState = + ArgumentCaptor.forClass(RMAppState.class); + verify(writer).applicationFinished(any(RMApp.class), finalState.capture()); + Assert.assertEquals(state, finalState.getValue()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index b7fe8f79d0..a34a42b5ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -103,6 +103,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.ArgumentCaptor; @RunWith(value = Parameterized.class) public class TestRMAppAttemptTransitions { @@ -367,6 +368,7 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) { // verify(application).handle(anyObject()); verify(application).handle(any(RMAppRejectedEvent.class)); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } /** @@ -384,9 +386,9 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0, applicationAttempt.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); - verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); verifyAttemptFinalStateSaved(); assertFalse(transferStateFromPreviousAttempt); + verifyApplicationAttemptFinished(RMAppAttemptState.KILLED); } /** @@ -460,8 +462,8 @@ private void testAppAttemptFailedState(Container container, // Check events verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); - verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); verifyAttemptFinalStateSaved(); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } /** @@ -496,7 +498,6 @@ private void testAppAttemptRunningState(Container container, assertEquals(getProxyUrl(applicationAttempt), applicationAttempt.getTrackingUrl()); } - verify(writer).applicationAttemptStarted(any(RMAppAttempt.class)); // TODO - need to add more checks relevant to this state } @@ -544,6 +545,7 @@ private void testAppAttemptFinishedState(Container container, assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); assertFalse(transferStateFromPreviousAttempt); + verifyApplicationAttemptFinished(RMAppAttemptState.FINISHED); } @@ -806,7 +808,7 @@ public void testAMCrashAtAllocated() { assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); - verify(writer).applicationAttemptFinished(any(RMAppAttempt.class)); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } @Test @@ -846,6 +848,7 @@ public void testRunningToFailed() { assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); verifyAMHostAndPortInvalidated(); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } @Test @@ -883,6 +886,7 @@ public void testRunningToKilled() { assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAMHostAndPortInvalidated(); + verifyApplicationAttemptFinished(RMAppAttemptState.KILLED); } @Test(timeout=10000) @@ -903,6 +907,7 @@ public void testLaunchedExpire() { assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } @Test(timeout=20000) @@ -925,6 +930,7 @@ public void testRunningExpire() { assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAMHostAndPortInvalidated(); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } @Test @@ -1177,6 +1183,7 @@ public void testFailedToFailed() { applicationAttempt.getAppAttemptState()); // should not kill containers when attempt fails. assertTrue(transferStateFromPreviousAttempt); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); // failed attempt captured the container finished event. assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); @@ -1216,6 +1223,7 @@ scheduler, masterService, submissionContext, new Configuration(), assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); assertFalse(transferStateFromPreviousAttempt); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { @@ -1245,4 +1253,13 @@ private void verifyAMHostAndPortInvalidated() { assertEquals("N/A", applicationAttempt.getHost()); assertEquals(-1, applicationAttempt.getRpcPort()); } + + private void verifyApplicationAttemptFinished(RMAppAttemptState state) { + ArgumentCaptor finalState = + ArgumentCaptor.forClass(RMAppAttemptState.class); + verify(writer).applicationAttemptFinished( + any(RMAppAttempt.class), finalState.capture()); + Assert.assertEquals(state, finalState.getValue()); + } + }