YARN-212. NM state machine ignores an APPLICATION_CONTAINER_FINISHED event when it shouldn't. Contributed by Nathan Roberts
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1408812 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4741064250
commit
6db6e00649
@ -216,6 +216,9 @@ Release 0.23.5 - UNRELEASED
|
|||||||
YARN-206. TestApplicationCleanup.testContainerCleanup occasionally fails.
|
YARN-206. TestApplicationCleanup.testContainerCleanup occasionally fails.
|
||||||
(jlowe via jeagles)
|
(jlowe via jeagles)
|
||||||
|
|
||||||
|
YARN-212. NM state machine ignores an APPLICATION_CONTAINER_FINISHED event
|
||||||
|
when it shouldn't (Nathan Roberts via jlowe)
|
||||||
|
|
||||||
Release 0.23.4 - UNRELEASED
|
Release 0.23.4 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -143,6 +143,9 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
|
|||||||
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
|
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
|
||||||
ApplicationEventType.FINISH_APPLICATION,
|
ApplicationEventType.FINISH_APPLICATION,
|
||||||
new AppFinishTriggeredTransition())
|
new AppFinishTriggeredTransition())
|
||||||
|
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
|
||||||
|
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
|
||||||
|
CONTAINER_DONE_TRANSITION)
|
||||||
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
|
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
|
||||||
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
|
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
|
||||||
new AppLogInitDoneTransition())
|
new AppLogInitDoneTransition())
|
||||||
|
@ -277,6 +277,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
|
|||||||
// From DONE
|
// From DONE
|
||||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
ContainerEventType.KILL_CONTAINER)
|
ContainerEventType.KILL_CONTAINER)
|
||||||
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
|
ContainerEventType.INIT_CONTAINER)
|
||||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||||
|
@ -155,6 +155,60 @@ public void testAppRunningAfterContainersComplete() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that a finished container is tracked correctly when the application's only container finishes while the application is still in the INITING state.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testContainersCompleteDuringAppInit1() {
|
||||||
|
WrappedApplication wa = null;
|
||||||
|
try {
|
||||||
|
wa = new WrappedApplication(3, 314159265358979L, "yak", 1);
|
||||||
|
wa.initApplication();
|
||||||
|
wa.initContainer(-1);
|
||||||
|
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||||
|
|
||||||
|
wa.containerFinished(0);
|
||||||
|
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||||
|
|
||||||
|
wa.applicationInited();
|
||||||
|
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||||
|
assertEquals(0, wa.app.getContainers().size());
|
||||||
|
} finally {
|
||||||
|
if (wa != null)
|
||||||
|
wa.finished();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that finished containers are tracked correctly when one of several containers finishes while the application is still in the INITING state.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testContainersCompleteDuringAppInit2() {
|
||||||
|
WrappedApplication wa = null;
|
||||||
|
try {
|
||||||
|
wa = new WrappedApplication(3, 314159265358979L, "yak", 3);
|
||||||
|
wa.initApplication();
|
||||||
|
wa.initContainer(-1);
|
||||||
|
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||||
|
|
||||||
|
wa.containerFinished(0);
|
||||||
|
|
||||||
|
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||||
|
|
||||||
|
wa.applicationInited();
|
||||||
|
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||||
|
assertEquals(2, wa.app.getContainers().size());
|
||||||
|
|
||||||
|
wa.containerFinished(1);
|
||||||
|
wa.containerFinished(2);
|
||||||
|
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||||
|
assertEquals(0, wa.app.getContainers().size());
|
||||||
|
} finally {
|
||||||
|
if (wa != null)
|
||||||
|
wa.finished();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testAppFinishedOnRunningContainers() {
|
public void testAppFinishedOnRunningContainers() {
|
||||||
|
@ -56,6 +56,8 @@
|
|||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
||||||
@ -65,6 +67,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
@ -209,6 +213,32 @@ public void testCleanupOnSuccess() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked") // mocked generic
|
||||||
|
public void testInitWhileDone() throws Exception {
|
||||||
|
WrappedContainer wc = null;
|
||||||
|
try {
|
||||||
|
wc = new WrappedContainer(6, 314159265358979L, 4344, "yak");
|
||||||
|
wc.initContainer();
|
||||||
|
wc.localizeResources();
|
||||||
|
wc.launchContainer();
|
||||||
|
reset(wc.localizerBus);
|
||||||
|
wc.containerSuccessful();
|
||||||
|
wc.containerResourcesCleanup();
|
||||||
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
|
// Now in DONE, issue INIT
|
||||||
|
wc.initContainer();
|
||||||
|
// Verify still in DONE
|
||||||
|
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||||
|
verifyCleanupCall(wc);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
if (wc != null) {
|
||||||
|
wc.finished();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked") // mocked generic
|
@SuppressWarnings("unchecked") // mocked generic
|
||||||
public void testCleanupOnKillRequest() throws Exception {
|
public void testCleanupOnKillRequest() throws Exception {
|
||||||
@ -506,6 +536,8 @@ private class WrappedContainer {
|
|||||||
final EventHandler<ContainersLauncherEvent> launcherBus;
|
final EventHandler<ContainersLauncherEvent> launcherBus;
|
||||||
final EventHandler<ContainersMonitorEvent> monitorBus;
|
final EventHandler<ContainersMonitorEvent> monitorBus;
|
||||||
final EventHandler<AuxServicesEvent> auxBus;
|
final EventHandler<AuxServicesEvent> auxBus;
|
||||||
|
final EventHandler<ApplicationEvent> appBus;
|
||||||
|
final EventHandler<LogHandlerEvent> LogBus;
|
||||||
|
|
||||||
final ContainerLaunchContext ctxt;
|
final ContainerLaunchContext ctxt;
|
||||||
final ContainerId cId;
|
final ContainerId cId;
|
||||||
@ -527,10 +559,14 @@ private class WrappedContainer {
|
|||||||
launcherBus = mock(EventHandler.class);
|
launcherBus = mock(EventHandler.class);
|
||||||
monitorBus = mock(EventHandler.class);
|
monitorBus = mock(EventHandler.class);
|
||||||
auxBus = mock(EventHandler.class);
|
auxBus = mock(EventHandler.class);
|
||||||
|
appBus = mock(EventHandler.class);
|
||||||
|
LogBus = mock(EventHandler.class);
|
||||||
dispatcher.register(LocalizationEventType.class, localizerBus);
|
dispatcher.register(LocalizationEventType.class, localizerBus);
|
||||||
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
|
dispatcher.register(ContainersLauncherEventType.class, launcherBus);
|
||||||
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
|
dispatcher.register(ContainersMonitorEventType.class, monitorBus);
|
||||||
dispatcher.register(AuxServicesEventType.class, auxBus);
|
dispatcher.register(AuxServicesEventType.class, auxBus);
|
||||||
|
dispatcher.register(ApplicationEventType.class, appBus);
|
||||||
|
dispatcher.register(LogHandlerEventType.class, LogBus);
|
||||||
this.user = user;
|
this.user = user;
|
||||||
|
|
||||||
ctxt = mock(ContainerLaunchContext.class);
|
ctxt = mock(ContainerLaunchContext.class);
|
||||||
@ -654,6 +690,11 @@ public void containerSuccessful() {
|
|||||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
|
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
|
||||||
drainDispatcherEvents();
|
drainDispatcherEvents();
|
||||||
}
|
}
|
||||||
|
public void containerResourcesCleanup() {
|
||||||
|
c.handle(new ContainerEvent(cId,
|
||||||
|
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
|
||||||
|
drainDispatcherEvents();
|
||||||
|
}
|
||||||
|
|
||||||
public void containerFailed(int exitCode) {
|
public void containerFailed(int exitCode) {
|
||||||
c.handle(new ContainerExitEvent(cId,
|
c.handle(new ContainerExitEvent(cId,
|
||||||
|
@ -319,6 +319,7 @@ public void testMultipleAppsLogAggregation() throws Exception {
|
|||||||
this.user, null,
|
this.user, null,
|
||||||
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
|
ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
|
||||||
new ApplicationEvent(
|
new ApplicationEvent(
|
||||||
application1,
|
application1,
|
||||||
|
Loading…
Reference in New Issue
Block a user