MAPREDUCE-3162. Separated application-init and container-init event types in NodeManager's Application state machine. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1185988 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
13e4562924
commit
e3bb120e9f
@ -1680,6 +1680,9 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-3199. Fixed pom files to include correct log4j configuration for
|
||||
tests. (vinodkv)
|
||||
|
||||
MAPREDUCE-3162. Separated application-init and container-init event types
|
||||
in NodeManager's Application state machine. (Todd Lipcon via vinodkv)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -69,6 +69,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
@ -225,6 +226,7 @@ public void stop() {
|
||||
/**
|
||||
* Start a container on this NodeManager.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
throws YarnRemoteException {
|
||||
@ -274,10 +276,13 @@ public StartContainerResponse startContainer(StartContainerRequest request)
|
||||
context.getApplications().putIfAbsent(applicationID, application)) {
|
||||
LOG.info("Creating a new application reference for app "
|
||||
+ applicationID);
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationInitEvent(applicationID));
|
||||
}
|
||||
|
||||
// TODO: Validate the request
|
||||
dispatcher.getEventHandler().handle(new ApplicationInitEvent(container));
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationContainerInitEvent(container));
|
||||
|
||||
NMAuditLogger.logSuccess(launchContext.getUser(),
|
||||
AuditConstants.START_CONTAINER, "ContainerManageImpl",
|
||||
|
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
|
||||
|
||||
/**
|
||||
* Event sent from {@link ContainerManagerImpl} to {@link ApplicationImpl} to
|
||||
* request the initialization of a container. This is funneled through
|
||||
* the Application so that the application life-cycle can be checked, and container
|
||||
* launches can be delayed until the application is fully initialized.
|
||||
*
|
||||
* Once the application is initialized,
|
||||
* {@link ApplicationImpl.InitContainerTransition} simply passes this event on as a
|
||||
* {@link ContainerInitEvent}.
|
||||
*
|
||||
*/
|
||||
public class ApplicationContainerInitEvent extends ApplicationEvent {
|
||||
final Container container;
|
||||
|
||||
public ApplicationContainerInitEvent(Container container) {
|
||||
super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
|
||||
ApplicationEventType.INIT_CONTAINER);
|
||||
this.container = container;
|
||||
}
|
||||
|
||||
Container getContainer() {
|
||||
return container;
|
||||
}
|
||||
}
|
@ -22,6 +22,7 @@ public enum ApplicationEventType {
|
||||
|
||||
// Source: ContainerManager
|
||||
INIT_APPLICATION,
|
||||
INIT_CONTAINER,
|
||||
FINISH_APPLICATION,
|
||||
|
||||
// Source: ResourceLocalizationService
|
||||
|
@ -104,11 +104,14 @@ public Map<ContainerId, Container> getContainers() {
|
||||
// Transitions from NEW state
|
||||
.addTransition(ApplicationState.NEW, ApplicationState.INITING,
|
||||
ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
|
||||
.addTransition(ApplicationState.NEW, ApplicationState.NEW,
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
new InitContainerTransition())
|
||||
|
||||
// Transitions from INITING state
|
||||
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
|
||||
ApplicationEventType.INIT_APPLICATION,
|
||||
new AppIsInitingTransition())
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
new InitContainerTransition())
|
||||
.addTransition(ApplicationState.INITING,
|
||||
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
|
||||
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
|
||||
@ -121,8 +124,8 @@ ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
|
||||
// Transitions from RUNNING state
|
||||
.addTransition(ApplicationState.RUNNING,
|
||||
ApplicationState.RUNNING,
|
||||
ApplicationEventType.INIT_APPLICATION,
|
||||
new DuplicateAppInitTransition())
|
||||
ApplicationEventType.INIT_CONTAINER,
|
||||
new InitContainerTransition())
|
||||
.addTransition(ApplicationState.RUNNING,
|
||||
ApplicationState.RUNNING,
|
||||
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
|
||||
@ -167,9 +170,6 @@ static class AppInitTransition implements
|
||||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
@Override
|
||||
public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
|
||||
Container container = initEvent.getContainer();
|
||||
app.containers.put(container.getContainerID(), container);
|
||||
app.dispatcher.getEventHandler().handle(
|
||||
new ApplicationLocalizationEvent(
|
||||
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
|
||||
@ -177,17 +177,36 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Absorb initialization events while the application initializes.
|
||||
* Handles INIT_CONTAINER events which request that we launch a new
|
||||
* container. When we're still in the INITTING state, we simply
|
||||
* queue these up. When we're in the RUNNING state, we pass along
|
||||
* an ContainerInitEvent to the appropriate ContainerImpl.
|
||||
*/
|
||||
static class AppIsInitingTransition implements
|
||||
@SuppressWarnings("unchecked")
|
||||
static class InitContainerTransition implements
|
||||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
@Override
|
||||
public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
|
||||
ApplicationContainerInitEvent initEvent =
|
||||
(ApplicationContainerInitEvent) event;
|
||||
Container container = initEvent.getContainer();
|
||||
app.containers.put(container.getContainerID(), container);
|
||||
LOG.info("Adding " + container.getContainerID()
|
||||
+ " to application " + app.toString());
|
||||
|
||||
switch (app.getApplicationState()) {
|
||||
case RUNNING:
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
container.getContainerID()));
|
||||
break;
|
||||
case INITING:
|
||||
case NEW:
|
||||
// these get queued up and sent out in AppInitDoneTransition
|
||||
break;
|
||||
default:
|
||||
assert false : "Invalid state for InitContainerTransition: " +
|
||||
app.getApplicationState();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -211,20 +230,6 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static class DuplicateAppInitTransition implements
|
||||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
@Override
|
||||
public void transition(ApplicationImpl app, ApplicationEvent event) {
|
||||
ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
|
||||
Container container = initEvent.getContainer();
|
||||
app.containers.put(container.getContainerID(), container);
|
||||
LOG.info("Adding " + container.getContainerID()
|
||||
+ " to application " + app.toString());
|
||||
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
|
||||
container.getContainerID()));
|
||||
}
|
||||
}
|
||||
|
||||
static final class ContainerDoneTransition implements
|
||||
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
|
||||
|
@ -18,20 +18,11 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
|
||||
public class ApplicationInitEvent extends ApplicationEvent {
|
||||
|
||||
private final Container container;
|
||||
|
||||
public ApplicationInitEvent(Container container) {
|
||||
super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
|
||||
ApplicationEventType.INIT_APPLICATION);
|
||||
this.container = container;
|
||||
public ApplicationInitEvent(ApplicationId appId) {
|
||||
super(appId, ApplicationEventType.INIT_APPLICATION);
|
||||
}
|
||||
|
||||
public Container getContainer() {
|
||||
return this.container;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -41,11 +41,12 @@ public void testApplicationInit1() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
|
||||
wa.initApplication(1);
|
||||
wa.initApplication();
|
||||
wa.initContainer(1);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
assertEquals(1, wa.app.getContainers().size());
|
||||
wa.initApplication(0);
|
||||
wa.initApplication(2);
|
||||
wa.initContainer(0);
|
||||
wa.initContainer(2);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
assertEquals(3, wa.app.getContainers().size());
|
||||
wa.applicationInited();
|
||||
@ -70,7 +71,8 @@ public void testApplicationInit2() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(2, 314159265358979L, "yak", 3);
|
||||
wa.initApplication(0);
|
||||
wa.initApplication();
|
||||
wa.initContainer(0);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
assertEquals(1, wa.app.getContainers().size());
|
||||
|
||||
@ -80,8 +82,8 @@ public void testApplicationInit2() {
|
||||
argThat(new ContainerInitMatcher(wa.containers.get(0)
|
||||
.getContainerID())));
|
||||
|
||||
wa.initApplication(1);
|
||||
wa.initApplication(2);
|
||||
wa.initContainer(1);
|
||||
wa.initContainer(2);
|
||||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
assertEquals(3, wa.app.getContainers().size());
|
||||
|
||||
@ -105,7 +107,8 @@ public void testAppRunningAfterContainersComplete() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(3, 314159265358979L, "yak", 3);
|
||||
wa.initApplication(-1);
|
||||
wa.initApplication();
|
||||
wa.initContainer(-1);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
wa.applicationInited();
|
||||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
@ -130,7 +133,8 @@ public void testAppFinishedOnRunningContainers() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(4, 314159265358979L, "yak", 3);
|
||||
wa.initApplication(-1);
|
||||
wa.initApplication();
|
||||
wa.initContainer(-1);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
wa.applicationInited();
|
||||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
@ -185,7 +189,8 @@ public void testAppFinishedOnCompletedContainers() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
|
||||
wa.initApplication(-1);
|
||||
wa.initApplication();
|
||||
wa.initContainer(-1);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
wa.applicationInited();
|
||||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
@ -220,7 +225,8 @@ public void testStartContainerAfterAppFinished() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
|
||||
wa.initApplication(-1);
|
||||
wa.initApplication();
|
||||
wa.initContainer(-1);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
wa.applicationInited();
|
||||
assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
|
||||
@ -256,7 +262,8 @@ public void testAppFinishedOnIniting() {
|
||||
WrappedApplication wa = null;
|
||||
try {
|
||||
wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
|
||||
wa.initApplication(0);
|
||||
wa.initApplication();
|
||||
wa.initContainer(0);
|
||||
assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
|
||||
assertEquals(1, wa.app.getContainers().size());
|
||||
|
||||
@ -276,7 +283,7 @@ public void testAppFinishedOnIniting() {
|
||||
refEq(new ApplicationLocalizationEvent(
|
||||
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
|
||||
|
||||
wa.initApplication(1);
|
||||
wa.initContainer(1);
|
||||
assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
|
||||
wa.app.getApplicationState());
|
||||
assertEquals(0, wa.app.getContainers().size());
|
||||
@ -376,13 +383,18 @@ public void finished() {
|
||||
dispatcher.stop();
|
||||
}
|
||||
|
||||
public void initApplication(int containerNum) {
|
||||
public void initApplication() {
|
||||
app.handle(new ApplicationInitEvent(appId));
|
||||
}
|
||||
|
||||
|
||||
public void initContainer(int containerNum) {
|
||||
if (containerNum == -1) {
|
||||
for (int i = 0; i < containers.size(); i++) {
|
||||
app.handle(new ApplicationInitEvent(containers.get(i)));
|
||||
app.handle(new ApplicationContainerInitEvent(containers.get(i)));
|
||||
}
|
||||
} else {
|
||||
app.handle(new ApplicationInitEvent(containers.get(containerNum)));
|
||||
app.handle(new ApplicationContainerInitEvent(containers.get(containerNum)));
|
||||
}
|
||||
drainDispatcherEvents();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user