YARN-1428. Fixed RM to write the final state of RMApp/RMAppAttempt to the application history store in the transition to the final state. (Contributed by Zhijie Shen)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1569585 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e408966ea7
commit
3a7e7b3671
@ -299,6 +299,10 @@ Release 2.4.0 - UNRELEASED
|
|||||||
expand _HOST properly in their kerberos principles. (Mohammad Kamrul Islam
|
expand _HOST properly in their kerberos principles. (Mohammad Kamrul Islam
|
||||||
va vinodkv)
|
va vinodkv)
|
||||||
|
|
||||||
|
YARN-1428. Fixed RM to write the final state of RMApp/RMAppAttempt to the
|
||||||
|
application history store in the transition to the final state. (Contributed
|
||||||
|
by Zhijie Shen)
|
||||||
|
|
||||||
Release 2.3.1 - UNRELEASED
|
Release 2.3.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -43,9 +43,12 @@
|
|||||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData;
|
||||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData;
|
||||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -219,12 +222,13 @@ public void applicationStarted(RMApp app) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void applicationFinished(RMApp app) {
|
public void applicationFinished(RMApp app, RMAppState finalState) {
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new WritingApplicationFinishEvent(app.getApplicationId(),
|
new WritingApplicationFinishEvent(app.getApplicationId(),
|
||||||
ApplicationFinishData.newInstance(app.getApplicationId(),
|
ApplicationFinishData.newInstance(app.getApplicationId(),
|
||||||
app.getFinishTime(), app.getDiagnostics().toString(),
|
app.getFinishTime(), app.getDiagnostics().toString(),
|
||||||
app.getFinalApplicationStatus(), app.createApplicationState())));
|
app.getFinalApplicationStatus(),
|
||||||
|
RMServerUtils.createApplicationState(finalState))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -239,15 +243,16 @@ public void applicationAttemptStarted(RMAppAttempt appAttempt) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void applicationAttemptFinished(RMAppAttempt appAttempt) {
|
public void applicationAttemptFinished(RMAppAttempt appAttempt,
|
||||||
|
RMAppAttemptState finalState) {
|
||||||
if (historyServiceEnabled) {
|
if (historyServiceEnabled) {
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(),
|
new WritingApplicationAttemptFinishEvent(appAttempt.getAppAttemptId(),
|
||||||
ApplicationAttemptFinishData.newInstance(
|
ApplicationAttemptFinishData.newInstance(
|
||||||
appAttempt.getAppAttemptId(), appAttempt.getDiagnostics()
|
appAttempt.getAppAttemptId(), appAttempt.getDiagnostics()
|
||||||
.toString(), appAttempt.getTrackingUrl(), appAttempt
|
.toString(), appAttempt.getTrackingUrl(), appAttempt
|
||||||
.getFinalApplicationStatus(), appAttempt
|
.getFinalApplicationStatus(),
|
||||||
.createApplicationAttemptState())));
|
RMServerUtils.createApplicationAttemptState(finalState))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,9 +112,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
private long storedFinishTime = 0;
|
private long storedFinishTime = 0;
|
||||||
private RMAppAttempt currentAttempt;
|
private RMAppAttempt currentAttempt;
|
||||||
private String queue;
|
private String queue;
|
||||||
@SuppressWarnings("rawtypes")
|
|
||||||
private EventHandler handler;
|
private EventHandler handler;
|
||||||
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
|
|
||||||
private static final AppFinishedTransition FINISHED_TRANSITION =
|
private static final AppFinishedTransition FINISHED_TRANSITION =
|
||||||
new AppFinishedTransition();
|
new AppFinishedTransition();
|
||||||
|
|
||||||
@ -705,7 +703,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
* either as an exception for failure or null for success, or the client will
|
* either as an exception for failure or null for success, or the client will
|
||||||
* be left waiting forever.
|
* be left waiting forever.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static final class RMAppMoveTransition extends RMAppTransition {
|
private static final class RMAppMoveTransition extends RMAppTransition {
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
|
RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
|
||||||
@ -723,7 +720,6 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static final class RMAppRecoveredTransition implements
|
private static final class RMAppRecoveredTransition implements
|
||||||
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
||||||
|
|
||||||
@ -742,7 +738,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
|
|
||||||
// The app has completed.
|
// The app has completed.
|
||||||
if (app.recoveredFinalState != null) {
|
if (app.recoveredFinalState != null) {
|
||||||
FINAL_TRANSITION.transition(app, event);
|
new FinalTransition(app.recoveredFinalState).transition(app, event);
|
||||||
return app.recoveredFinalState;
|
return app.recoveredFinalState;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -824,7 +820,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
app.diagnostics.append(msg);
|
app.diagnostics.append(msg);
|
||||||
// Inform the node for app-finish
|
// Inform the node for app-finish
|
||||||
FINAL_TRANSITION.transition(app, event);
|
new FinalTransition(RMAppState.FAILED).transition(app, event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -937,6 +933,10 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static class AppFinishedTransition extends FinalTransition {
|
private static class AppFinishedTransition extends FinalTransition {
|
||||||
|
public AppFinishedTransition() {
|
||||||
|
super(RMAppState.FINISHED);
|
||||||
|
}
|
||||||
|
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
RMAppFinishedAttemptEvent finishedEvent =
|
RMAppFinishedAttemptEvent finishedEvent =
|
||||||
(RMAppFinishedAttemptEvent)event;
|
(RMAppFinishedAttemptEvent)event;
|
||||||
@ -980,6 +980,10 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
|
|
||||||
|
|
||||||
private static class AppKilledTransition extends FinalTransition {
|
private static class AppKilledTransition extends FinalTransition {
|
||||||
|
public AppKilledTransition() {
|
||||||
|
super(RMAppState.KILLED);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
app.diagnostics.append(getAppKilledDiagnostics());
|
app.diagnostics.append(getAppKilledDiagnostics());
|
||||||
@ -1002,6 +1006,10 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
|
|
||||||
private static final class AppRejectedTransition extends
|
private static final class AppRejectedTransition extends
|
||||||
FinalTransition{
|
FinalTransition{
|
||||||
|
public AppRejectedTransition() {
|
||||||
|
super(RMAppState.FAILED);
|
||||||
|
}
|
||||||
|
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event;
|
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event;
|
||||||
app.diagnostics.append(rejectedEvent.getMessage());
|
app.diagnostics.append(rejectedEvent.getMessage());
|
||||||
@ -1011,6 +1019,12 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
|
|
||||||
private static class FinalTransition extends RMAppTransition {
|
private static class FinalTransition extends RMAppTransition {
|
||||||
|
|
||||||
|
private final RMAppState finalState;
|
||||||
|
|
||||||
|
public FinalTransition(RMAppState finalState) {
|
||||||
|
this.finalState = finalState;
|
||||||
|
}
|
||||||
|
|
||||||
private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
|
private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
|
||||||
Set<NodeId> nodes = new HashSet<NodeId>();
|
Set<NodeId> nodes = new HashSet<NodeId>();
|
||||||
for (RMAppAttempt attempt : app.attempts.values()) {
|
for (RMAppAttempt attempt : app.attempts.values()) {
|
||||||
@ -1035,10 +1049,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
new RMAppManagerEvent(app.applicationId,
|
new RMAppManagerEvent(app.applicationId,
|
||||||
RMAppManagerEventType.APP_COMPLETED));
|
RMAppManagerEventType.APP_COMPLETED));
|
||||||
|
|
||||||
// TODO: We need to fix for the problem that RMApp enters the final state
|
|
||||||
// after RMAppAttempt in the killing case
|
|
||||||
app.rmContext.getRMApplicationHistoryWriter()
|
app.rmContext.getRMApplicationHistoryWriter()
|
||||||
.applicationFinished(app);
|
.applicationFinished(app, finalState);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1053,7 +1053,7 @@ public void transition(RMAppAttemptImpl appAttempt,
|
|||||||
appAttempt.removeCredentials(appAttempt);
|
appAttempt.removeCredentials(appAttempt);
|
||||||
|
|
||||||
appAttempt.rmContext.getRMApplicationHistoryWriter()
|
appAttempt.rmContext.getRMApplicationHistoryWriter()
|
||||||
.applicationAttemptFinished(appAttempt);
|
.applicationAttemptFinished(appAttempt, finalAttemptState);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,8 +137,6 @@ private static RMApp createRMApp(ApplicationId appId) {
|
|||||||
new StringBuilder("test diagnostics info"));
|
new StringBuilder("test diagnostics info"));
|
||||||
when(app.getFinalApplicationStatus()).thenReturn(
|
when(app.getFinalApplicationStatus()).thenReturn(
|
||||||
FinalApplicationStatus.UNDEFINED);
|
FinalApplicationStatus.UNDEFINED);
|
||||||
when(app.createApplicationState())
|
|
||||||
.thenReturn(YarnApplicationState.FINISHED);
|
|
||||||
return app;
|
return app;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,8 +154,6 @@ private static RMAppAttempt createRMAppAttempt(
|
|||||||
when(appAttempt.getTrackingUrl()).thenReturn("test url");
|
when(appAttempt.getTrackingUrl()).thenReturn("test url");
|
||||||
when(appAttempt.getFinalApplicationStatus()).thenReturn(
|
when(appAttempt.getFinalApplicationStatus()).thenReturn(
|
||||||
FinalApplicationStatus.UNDEFINED);
|
FinalApplicationStatus.UNDEFINED);
|
||||||
when(appAttempt.createApplicationAttemptState()).thenReturn(
|
|
||||||
YarnApplicationAttemptState.FINISHED);
|
|
||||||
return appAttempt;
|
return appAttempt;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,7 +196,7 @@ public void testWriteApplication() throws Exception {
|
|||||||
Assert.assertEquals(0L, appHD.getSubmitTime());
|
Assert.assertEquals(0L, appHD.getSubmitTime());
|
||||||
Assert.assertEquals(1L, appHD.getStartTime());
|
Assert.assertEquals(1L, appHD.getStartTime());
|
||||||
|
|
||||||
writer.applicationFinished(app);
|
writer.applicationFinished(app, RMAppState.FINISHED);
|
||||||
for (int i = 0; i < MAX_RETRIES; ++i) {
|
for (int i = 0; i < MAX_RETRIES; ++i) {
|
||||||
appHD = store.getApplication(ApplicationId.newInstance(0, 1));
|
appHD = store.getApplication(ApplicationId.newInstance(0, 1));
|
||||||
if (appHD.getYarnApplicationState() != null) {
|
if (appHD.getYarnApplicationState() != null) {
|
||||||
@ -241,7 +237,7 @@ public void testWriteApplicationAttempt() throws Exception {
|
|||||||
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1),
|
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1),
|
||||||
appAttemptHD.getMasterContainerId());
|
appAttemptHD.getMasterContainerId());
|
||||||
|
|
||||||
writer.applicationAttemptFinished(appAttempt);
|
writer.applicationAttemptFinished(appAttempt, RMAppAttemptState.FINISHED);
|
||||||
for (int i = 0; i < MAX_RETRIES; ++i) {
|
for (int i = 0; i < MAX_RETRIES; ++i) {
|
||||||
appAttemptHD =
|
appAttemptHD =
|
||||||
store.getApplicationAttempt(ApplicationAttemptId.newInstance(
|
store.getApplicationAttempt(ApplicationAttemptId.newInstance(
|
||||||
@ -326,9 +322,10 @@ public void testParallelWrite() throws Exception {
|
|||||||
writer.containerStarted(container);
|
writer.containerStarted(container);
|
||||||
writer.containerFinished(container);
|
writer.containerFinished(container);
|
||||||
}
|
}
|
||||||
writer.applicationAttemptFinished(appAttempt);
|
writer.applicationAttemptFinished(
|
||||||
|
appAttempt, RMAppAttemptState.FINISHED);
|
||||||
}
|
}
|
||||||
writer.applicationFinished(app);
|
writer.applicationFinished(app, RMAppState.FINISHED);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < MAX_RETRIES; ++i) {
|
for (int i = 0; i < MAX_RETRIES; ++i) {
|
||||||
if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) {
|
if (allEventsHandled(20 * 10 * 10 + 20 * 10 + 20)) {
|
||||||
@ -369,7 +366,7 @@ public void applicationStarted(RMApp app) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applicationFinished(RMApp app) {
|
public void applicationFinished(RMApp app, RMAppState finalState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -377,7 +374,8 @@ public void applicationAttemptStarted(RMAppAttempt appAttempt) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applicationAttemptFinished(RMAppAttempt appAttempt) {
|
public void applicationAttemptFinished(
|
||||||
|
RMAppAttempt appAttempt, RMAppAttemptState finalState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -57,7 +57,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
@ -71,6 +70,7 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
|
|
||||||
@RunWith(value = Parameterized.class)
|
@RunWith(value = Parameterized.class)
|
||||||
@ -308,16 +308,6 @@ private void assertKilled(RMApp application) {
|
|||||||
"Application killed by user.", diag.toString());
|
"Application killed by user.", diag.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertAppAndAttemptKilled(RMApp application)
|
|
||||||
throws InterruptedException {
|
|
||||||
sendAttemptUpdateSavedEvent(application);
|
|
||||||
sendAppUpdateSavedEvent(application);
|
|
||||||
assertKilled(application);
|
|
||||||
Assert.assertEquals(RMAppAttemptState.KILLED, application
|
|
||||||
.getCurrentAppAttempt().getAppAttemptState());
|
|
||||||
assertAppFinalStateSaved(application);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertFailed(RMApp application, String regex) {
|
private void assertFailed(RMApp application, String regex) {
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.FAILED, application);
|
assertAppState(RMAppState.FAILED, application);
|
||||||
@ -511,7 +501,7 @@ public void testAppNewKill() throws IOException {
|
|||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
assertAppFinalStateNotSaved(application);
|
assertAppFinalStateNotSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -528,7 +518,7 @@ public void testAppNewReject() throws IOException {
|
|||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateNotSaved(application);
|
assertAppFinalStateNotSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -543,7 +533,7 @@ public void testAppNewSavingKill() throws IOException {
|
|||||||
rmDispatcher.await();
|
rmDispatcher.await();
|
||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -560,7 +550,7 @@ public void testAppNewSavingReject() throws IOException {
|
|||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -577,7 +567,7 @@ public void testAppSubmittedRejected() throws IOException {
|
|||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -592,7 +582,7 @@ public void testAppSubmittedKill() throws IOException, InterruptedException {
|
|||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -627,7 +617,7 @@ public void testAppAcceptedFailed() throws IOException {
|
|||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertFailed(application, ".*" + message + ".*Failing the application.*");
|
assertFailed(application, ".*" + message + ".*Failing the application.*");
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -649,7 +639,7 @@ public void testAppAcceptedKill() throws IOException, InterruptedException {
|
|||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -672,7 +662,7 @@ public void testAppRunningKill() throws IOException {
|
|||||||
sendAttemptUpdateSavedEvent(application);
|
sendAttemptUpdateSavedEvent(application);
|
||||||
sendAppUpdateSavedEvent(application);
|
sendAppUpdateSavedEvent(application);
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -727,7 +717,7 @@ public void testAppRunningFailed() throws IOException {
|
|||||||
rmDispatcher.await();
|
rmDispatcher.await();
|
||||||
assertFailed(application, ".*Failing the application.*");
|
assertFailed(application, ".*Failing the application.*");
|
||||||
assertAppFinalStateSaved(application);
|
assertAppFinalStateSaved(application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -785,7 +775,7 @@ public void testAppFinishedFinished() throws IOException {
|
|||||||
StringBuilder diag = application.getDiagnostics();
|
StringBuilder diag = application.getDiagnostics();
|
||||||
Assert.assertEquals("application diagnostics is not correct",
|
Assert.assertEquals("application diagnostics is not correct",
|
||||||
"", diag.toString());
|
"", diag.toString());
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
verifyApplicationFinished(RMAppState.FINISHED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -810,10 +800,10 @@ public void testAppFailedFailed() throws IOException {
|
|||||||
rmDispatcher.await();
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.FAILED, application);
|
assertAppState(RMAppState.FAILED, application);
|
||||||
|
verifyApplicationFinished(RMAppState.FAILED);
|
||||||
|
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.FAILED, application);
|
assertAppState(RMAppState.FAILED, application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 30000)
|
||||||
@ -856,10 +846,10 @@ public void testAppKilledKilled() throws IOException {
|
|||||||
rmDispatcher.await();
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
verifyApplicationFinished(RMAppState.KILLED);
|
||||||
|
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
verify(writer).applicationFinished(any(RMApp.class));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -871,4 +861,11 @@ public void testGetAppReport() {
|
|||||||
report = app.createAndGetApplicationReport("clientuser", true);
|
report = app.createAndGetApplicationReport("clientuser", true);
|
||||||
Assert.assertNotNull(report.getApplicationResourceUsageReport());
|
Assert.assertNotNull(report.getApplicationResourceUsageReport());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyApplicationFinished(RMAppState state) {
|
||||||
|
ArgumentCaptor<RMAppState> finalState =
|
||||||
|
ArgumentCaptor.forClass(RMAppState.class);
|
||||||
|
verify(writer).applicationFinished(any(RMApp.class), finalState.capture());
|
||||||
|
Assert.assertEquals(state, finalState.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -103,6 +103,7 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
@RunWith(value = Parameterized.class)
|
@RunWith(value = Parameterized.class)
|
||||||
public class TestRMAppAttemptTransitions {
|
public class TestRMAppAttemptTransitions {
|
||||||
@ -367,6 +368,7 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) {
|
|||||||
// verify(application).handle(anyObject());
|
// verify(application).handle(anyObject());
|
||||||
verify(application).handle(any(RMAppRejectedEvent.class));
|
verify(application).handle(any(RMAppRejectedEvent.class));
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -384,9 +386,9 @@ private void testAppAttemptKilledState(Container amContainer,
|
|||||||
assertEquals(0, applicationAttempt.getRanNodes().size());
|
assertEquals(0, applicationAttempt.getRanNodes().size());
|
||||||
assertNull(applicationAttempt.getFinalApplicationStatus());
|
assertNull(applicationAttempt.getFinalApplicationStatus());
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
|
|
||||||
verifyAttemptFinalStateSaved();
|
verifyAttemptFinalStateSaved();
|
||||||
assertFalse(transferStateFromPreviousAttempt);
|
assertFalse(transferStateFromPreviousAttempt);
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.KILLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -460,8 +462,8 @@ private void testAppAttemptFailedState(Container container,
|
|||||||
// Check events
|
// Check events
|
||||||
verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
|
verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
|
|
||||||
verifyAttemptFinalStateSaved();
|
verifyAttemptFinalStateSaved();
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -496,7 +498,6 @@ private void testAppAttemptRunningState(Container container,
|
|||||||
assertEquals(getProxyUrl(applicationAttempt),
|
assertEquals(getProxyUrl(applicationAttempt),
|
||||||
applicationAttempt.getTrackingUrl());
|
applicationAttempt.getTrackingUrl());
|
||||||
}
|
}
|
||||||
verify(writer).applicationAttemptStarted(any(RMAppAttempt.class));
|
|
||||||
// TODO - need to add more checks relevant to this state
|
// TODO - need to add more checks relevant to this state
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -544,6 +545,7 @@ private void testAppAttemptFinishedState(Container container,
|
|||||||
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
|
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
assertFalse(transferStateFromPreviousAttempt);
|
assertFalse(transferStateFromPreviousAttempt);
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FINISHED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -806,7 +808,7 @@ public void testAMCrashAtAllocated() {
|
|||||||
assertEquals(RMAppAttemptState.FAILED,
|
assertEquals(RMAppAttemptState.FAILED,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
verify(writer).applicationAttemptFinished(any(RMAppAttempt.class));
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -846,6 +848,7 @@ public void testRunningToFailed() {
|
|||||||
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
||||||
verifyAMHostAndPortInvalidated();
|
verifyAMHostAndPortInvalidated();
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -883,6 +886,7 @@ public void testRunningToKilled() {
|
|||||||
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
verifyAMHostAndPortInvalidated();
|
verifyAMHostAndPortInvalidated();
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.KILLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
@ -903,6 +907,7 @@ public void testLaunchedExpire() {
|
|||||||
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=20000)
|
@Test(timeout=20000)
|
||||||
@ -925,6 +930,7 @@ public void testRunningExpire() {
|
|||||||
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
||||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||||
verifyAMHostAndPortInvalidated();
|
verifyAMHostAndPortInvalidated();
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -1177,6 +1183,7 @@ public void testFailedToFailed() {
|
|||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
// should not kill containers when attempt fails.
|
// should not kill containers when attempt fails.
|
||||||
assertTrue(transferStateFromPreviousAttempt);
|
assertTrue(transferStateFromPreviousAttempt);
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
|
|
||||||
// failed attempt captured the container finished event.
|
// failed attempt captured the container finished event.
|
||||||
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
|
||||||
@ -1216,6 +1223,7 @@ scheduler, masterService, submissionContext, new Configuration(),
|
|||||||
assertEquals(RMAppAttemptState.FAILED,
|
assertEquals(RMAppAttemptState.FAILED,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertFalse(transferStateFromPreviousAttempt);
|
assertFalse(transferStateFromPreviousAttempt);
|
||||||
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
|
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
|
||||||
@ -1245,4 +1253,13 @@ private void verifyAMHostAndPortInvalidated() {
|
|||||||
assertEquals("N/A", applicationAttempt.getHost());
|
assertEquals("N/A", applicationAttempt.getHost());
|
||||||
assertEquals(-1, applicationAttempt.getRpcPort());
|
assertEquals(-1, applicationAttempt.getRpcPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyApplicationAttemptFinished(RMAppAttemptState state) {
|
||||||
|
ArgumentCaptor<RMAppAttemptState> finalState =
|
||||||
|
ArgumentCaptor.forClass(RMAppAttemptState.class);
|
||||||
|
verify(writer).applicationAttemptFinished(
|
||||||
|
any(RMAppAttempt.class), finalState.capture());
|
||||||
|
Assert.assertEquals(state, finalState.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user