YARN-7240. Add more states and transitions to stabilize the NM Container state machine. (Kartheek Muthyala via asuresh)

This commit is contained in:
Arun Suresh 2017-09-25 14:11:55 -07:00
parent 47011d7dd3
commit df800f6cf3
10 changed files with 660 additions and 142 deletions

View File

@ -20,6 +20,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -144,7 +145,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@ -1251,29 +1251,6 @@ private void updateContainerInternal(ContainerId containerId,
+ " [" + containerTokenIdentifier.getVersion() + "]");
}
// Check container state
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState currentState =
container.getContainerState();
EnumSet<org.apache.hadoop.yarn.server.nodemanager.containermanager
.container.ContainerState> allowedStates = EnumSet.of(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.LOCALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.REINITIALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.ContainerState.RELAUNCHING);
if (!allowedStates.contains(currentState)) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is in " + currentState.name() + " state."
+ " Resource can only be changed when a container is in"
+ " RUNNING or SCHEDULED state");
}
// Check validity of the target resource.
Resource currentResource = container.getResource();
ExecutionType currentExecType =
@ -1313,11 +1290,11 @@ private void updateContainerInternal(ContainerId containerId,
this.readLock.lock();
try {
if (!serviceStopped) {
// Dispatch message to ContainerScheduler to actually
// Dispatch message to Container to actually
// make the change.
dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent(
container, containerTokenIdentifier, isResourceChange,
isExecTypeUpdate, isIncrease));
dispatcher.getEventHandler().handle(new UpdateContainerTokenEvent(
container.getContainerId(), containerTokenIdentifier,
isResourceChange, isExecTypeUpdate, isIncrease));
} else {
throw new YarnException(
"Unable to change container resource as the NodeManager is "
@ -1816,10 +1793,14 @@ private Container preReInitializeOrLocalizeCheck(ContainerId containerId,
if (container == null) {
throw new YarnException("Specified " + containerId + " does not exist!");
}
if (!container.isRunning() || container.isReInitializing()) {
if (!container.isRunning() || container.isReInitializing()
|| container.getContainerTokenIdentifier().getExecutionType()
== ExecutionType.OPPORTUNISTIC) {
throw new YarnException("Cannot perform " + op + " on [" + containerId
+ "]. Current state is [" + container.getContainerState() + ", " +
"isReInitializing=" + container.isReInitializing() + "].");
"isReInitializing=" + container.isReInitializing() + "]. Container"
+ " Execution Type is [" + container.getContainerTokenIdentifier()
.getExecutionType() + "].");
}
return container;
}

View File

@ -29,6 +29,7 @@ public enum ContainerEventType {
ROLLBACK_REINIT,
PAUSE_CONTAINER,
RESUME_CONTAINER,
UPDATE_CONTAINER_TOKEN,
// DownloadManager
CONTAINER_INITED,
@ -42,5 +43,8 @@ public enum ContainerEventType {
CONTAINER_EXITED_WITH_FAILURE,
CONTAINER_KILLED_ON_REQUEST,
CONTAINER_PAUSED,
CONTAINER_RESUMED
CONTAINER_RESUMED,
// Producer: ContainerScheduler
CONTAINER_TOKEN_UPDATED
}

View File

@ -33,6 +33,8 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -308,8 +310,8 @@ ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
@ -325,8 +327,9 @@ ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
// From LOCALIZATION_FAILED State
.addTransition(ContainerState.LOCALIZATION_FAILED,
@ -351,6 +354,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_FAILED)
.addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
// From SCHEDULED State
.addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
@ -364,6 +370,9 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
.addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// From RUNNING State
.addTransition(ContainerState.RUNNING,
@ -376,10 +385,16 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING,
ContainerState.REINITIALIZING,
ContainerState.REINITIALIZING_AWAITING_KILL),
ContainerEventType.REINITIALIZE_CONTAINER,
new ReInitializeContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
.addTransition(ContainerState.RUNNING,
EnumSet.of(ContainerState.RUNNING,
ContainerState.REINITIALIZING,
ContainerState.REINITIALIZING_AWAITING_KILL),
ContainerEventType.ROLLBACK_REINIT,
new RollbackContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
@ -398,9 +413,16 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
.addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// From PAUSING State
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
.addTransition(ContainerState.PAUSING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
@ -420,6 +442,12 @@ ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
.addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// From PAUSED State
.addTransition(ContainerState.PAUSED, ContainerState.KILLING,
@ -429,6 +457,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
ContainerEventType.PAUSE_CONTAINER)
// This can happen during re-initialization.
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
.addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
// In case something goes wrong then container will exit from the
@ -444,6 +476,9 @@ ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// From RESUMING State
.addTransition(ContainerState.RESUMING, ContainerState.KILLING,
@ -453,6 +488,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
// This can happen during re-initialization
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileRunningTransition())
// In case something goes wrong then container will exit from the
// RESUMING state
.addTransition(ContainerState.RESUMING,
@ -467,6 +506,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// NOTE - We cannot get a PAUSE_CONTAINER while in RESUMING state.
// From REINITIALIZING State
.addTransition(ContainerState.REINITIALIZING,
@ -478,7 +521,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.REINITIALIZING,
ContainerState.REINITIALIZING,
EnumSet.of(ContainerState.REINITIALIZING,
ContainerState.REINITIALIZING_AWAITING_KILL),
ContainerEventType.RESOURCE_LOCALIZED,
new ResourceLocalizedWhileReInitTransition())
.addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
@ -490,12 +534,39 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.REINITIALIZING, ContainerState.PAUSING,
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
.addTransition(ContainerState.REINITIALIZING,
ContainerState.REINITIALIZING,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// from REINITIALIZING_AWAITING_KILL
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.SCHEDULED, ContainerEventType.PAUSE_CONTAINER)
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.SCHEDULED,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledForReInitializationTransition())
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerState.REINITIALIZING_AWAITING_KILL,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// From RELAUNCHING State
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
@ -511,6 +582,10 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@ -524,6 +599,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
ContainerState.EXITED_WITH_SUCCESS,
EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
// No transition - assuming container is on its way to completion
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@ -537,6 +616,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
ContainerState.EXITED_WITH_FAILURE,
EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
// No transition - assuming container is on its way to completion
.addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From KILLING State.
.addTransition(ContainerState.KILLING,
@ -572,6 +655,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
ContainerState.KILLING,
EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.PAUSE_CONTAINER))
// No transition - assuming container is on its way to completion
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@ -589,6 +675,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
ContainerEventType.PAUSE_CONTAINER))
// No transition - assuming container is on its way to completion
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
@ -606,6 +696,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
EnumSet.of(ContainerEventType.RESOURCE_FAILED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
// No transition - assuming container is on its way to completion
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
// create the topology tables
.installTopology();
@ -626,6 +719,7 @@ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
case RUNNING:
case RELAUNCHING:
case REINITIALIZING:
case REINITIALIZING_AWAITING_KILL:
case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE:
case KILLING:
@ -929,6 +1023,45 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
 * Transition run when a container receives an
 * {@link UpdateContainerTokenEvent}: the updated token is installed on the
 * container and, if the update is a resource change, the change is written
 * to the NM state store.
 */
static class UpdateTransition extends ContainerTransition {
  @Override
  public void transition(
      ContainerImpl container, ContainerEvent event) {
    UpdateContainerTokenEvent tokenUpdate = (UpdateContainerTokenEvent) event;

    // Install the new token first; the version and resource persisted below
    // are read back from the container after this call.
    container.setContainerTokenIdentifier(tokenUpdate.getUpdatedToken());

    if (!tokenUpdate.isResourceChange()) {
      // Only resource changes are recorded in the state store.
      return;
    }

    try {
      container.context.getNMStateStore().storeContainerResourceChanged(
          container.containerId,
          container.getContainerTokenIdentifier().getVersion(),
          container.getResource());
    } catch (IOException e) {
      // A failed write is logged and otherwise ignored; the token installed
      // above stays in place.
      String failure = "Could not store container [" + container.containerId
          + "] resource change..";
      LOG.warn(failure, e);
    }
  }
}
/**
 * Same as {@link UpdateTransition}, but additionally hands an
 * {@link UpdateContainerSchedulerEvent} to the container's dispatcher once
 * the update has been applied. The event carries the token the container
 * held before the update along with the original update event.
 */
static class NotifyContainerSchedulerOfUpdateTransition extends
    UpdateTransition {
  @Override
  public void transition(
      ContainerImpl container, ContainerEvent event) {
    UpdateContainerTokenEvent tokenUpdate = (UpdateContainerTokenEvent) event;

    // Remember the current token before the superclass applies the update.
    ContainerTokenIdentifier previousToken =
        container.containerTokenIdentifier;

    super.transition(container, tokenUpdate);

    UpdateContainerSchedulerEvent schedulerUpdate =
        new UpdateContainerSchedulerEvent(
            container, previousToken, tokenUpdate);
    container.dispatcher.getEventHandler().handle(schedulerUpdate);
  }
}
/**
* State transition when a NEW container receives the INIT_CONTAINER
* message.
@ -1074,12 +1207,15 @@ public ContainerState transition(ContainerImpl container,
/**
* Transition to start the Re-Initialization process.
*/
static class ReInitializeContainerTransition extends ContainerTransition {
static class ReInitializeContainerTransition implements
MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
public ContainerState transition(
ContainerImpl container, ContainerEvent event) {
container.reInitContext = createReInitContext(container, event);
boolean resourcesPresent = false;
try {
// 'reInitContext.newResourceSet' can be
// a) current container resourceSet (In case of Restart)
@ -1101,6 +1237,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
resourcesPresent = true;
}
container.metrics.reInitingContainer();
NMAuditLogger.logSuccess(container.user,
@ -1112,7 +1249,11 @@ public void transition(ContainerImpl container, ContainerEvent event) {
" re-initialization failure..", e);
container.addDiagnostics("Error re-initializing due to" +
"[" + e.getMessage() + "]");
return ContainerState.RUNNING;
}
return resourcesPresent ?
ContainerState.REINITIALIZING_AWAITING_KILL :
ContainerState.REINITIALIZING;
}
protected ReInitializationContext createReInitContext(
@ -1164,11 +1305,14 @@ protected ReInitializationContext createReInitContext(ContainerImpl
* If all dependencies are met, then restart Container with new bits.
*/
static class ResourceLocalizedWhileReInitTransition
extends ContainerTransition {
implements MultipleArcTransition
<ContainerImpl, ContainerEvent, ContainerState> {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
public ContainerState transition(
ContainerImpl container, ContainerEvent event) {
ContainerResourceLocalizedEvent rsrcEvent =
(ContainerResourceLocalizedEvent) event;
container.reInitContext.newResourceSet.resourceLocalized(
@ -1180,7 +1324,9 @@ public void transition(ContainerImpl container, ContainerEvent event) {
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
return ContainerState.REINITIALIZING_AWAITING_KILL;
}
return ContainerState.REINITIALIZING;
}
}

View File

@ -20,7 +20,8 @@
public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
REINITIALIZING, REINITIALIZING_AWAITING_KILL,
EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
PAUSING, PAUSED, RESUMING
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
/**
 * Container event of type
 * {@link ContainerEventType#UPDATE_CONTAINER_TOKEN}. It carries the updated
 * container token together with three flags describing the update.
 */
public class UpdateContainerTokenEvent extends ContainerEvent {

  /** Token that should replace the container's current one. */
  private final ContainerTokenIdentifier token;
  /** Whether the update is a resource change. */
  private final boolean resourceChange;
  /** Whether the update is an execution type update. */
  private final boolean execTypeUpdate;
  /** Whether the update is an increase. */
  private final boolean increase;

  /**
   * Builds an {@code UPDATE_CONTAINER_TOKEN} event for the given container.
   *
   * @param cID id of the container the event is addressed to.
   * @param updatedToken the updated container token.
   * @param isResourceChange true if this update is a resource change.
   * @param isExecTypeUpdate true if this update is an ExecutionType update.
   * @param isIncrease true if this update is a container increase.
   */
  public UpdateContainerTokenEvent(ContainerId cID,
      ContainerTokenIdentifier updatedToken, boolean isResourceChange,
      boolean isExecTypeUpdate, boolean isIncrease) {
    super(cID, ContainerEventType.UPDATE_CONTAINER_TOKEN);
    token = updatedToken;
    resourceChange = isResourceChange;
    execTypeUpdate = isExecTypeUpdate;
    increase = isIncrease;
  }

  /**
   * Returns the updated container token carried by this event.
   *
   * @return the updated container token.
   */
  public ContainerTokenIdentifier getUpdatedToken() {
    return token;
  }

  /**
   * Tells whether this update is a resource change.
   *
   * @return the resource-change flag given at construction.
   */
  public boolean isResourceChange() {
    return resourceChange;
  }

  /**
   * Tells whether this update is an ExecutionType update.
   *
   * @return the exec-type-update flag given at construction.
   */
  public boolean isExecTypeUpdate() {
    return execTypeUpdate;
  }

  /**
   * Tells whether this update is a container increase.
   *
   * @return the increase flag given at construction.
   */
  public boolean isIncrease() {
    return increase;
  }
}

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@ -151,7 +152,9 @@ public void handle(ContainerSchedulerEvent event) {
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
// NOTE: Is sent only after container state has changed to PAUSED...
case CONTAINER_PAUSED:
// NOTE: Is sent only after container state has changed to DONE...
case CONTAINER_COMPLETED:
onResourcesReclaimed(event.getContainer());
break;
@ -180,58 +183,38 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
if (updateEvent.isResourceChange()) {
if (runningContainers.containsKey(containerId)) {
this.utilizationTracker.subtractContainerResource(
updateEvent.getContainer());
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
new ContainerImpl(getConfig(), null, null, null, null,
updateEvent.getOriginalToken(), context));
this.utilizationTracker.addContainerResources(
updateEvent.getContainer());
getContainersMonitor().handle(
new ChangeMonitoringContainerResourceEvent(containerId,
updateEvent.getUpdatedToken().getResource()));
} else {
// Is Queued or localizing..
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
}
try {
// Persist change in the state store.
this.context.getNMStateStore().storeContainerResourceChanged(
containerId,
updateEvent.getUpdatedToken().getVersion(),
updateEvent.getUpdatedToken().getResource());
} catch (IOException e) {
LOG.warn("Could not store container [" + containerId + "] resource " +
"change..", e);
}
}
if (updateEvent.isExecTypeUpdate()) {
updateEvent.getContainer().setContainerTokenIdentifier(
updateEvent.getUpdatedToken());
// If this is a running container.. just change the execution type
// and be done with it.
if (!runningContainers.containsKey(containerId)) {
// Promotion or not (Increase signifies either a promotion
// or container size increase)
if (updateEvent.isIncrease()) {
// Promotion of queued container..
if (queuedOpportunisticContainers.remove(containerId) != null) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
}
// Promotion or not (Increase signifies either a promotion
// or container size increase)
if (updateEvent.isIncrease()) {
// Promotion of queued container..
if (queuedOpportunisticContainers.remove(containerId) != null) {
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
//Kill/pause opportunistic containers if any to make room for
// promotion request
reclaimOpportunisticContainerResources(updateEvent.getContainer());
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
// containers
if (queuedGuaranteedContainers.remove(containerId) != null) {
queuedOpportunisticContainers.put(containerId,
updateEvent.getContainer());
}
}
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
// containers
if (queuedGuaranteedContainers.remove(containerId) != null) {
queuedOpportunisticContainers.put(containerId,
updateEvent.getContainer());
}
}
startPendingContainers(maxOppQueueLength <= 0);
}
}
@ -290,6 +273,16 @@ private void onResourcesReclaimed(Container container) {
queuedGuaranteedContainers.remove(container.getContainerId());
}
// Requeue PAUSED containers
if (container.getContainerState() == ContainerState.PAUSED) {
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.put(container.getContainerId(), container);
} else {
queuedOpportunisticContainers.put(
container.getContainerId(), container);
}
}
// decrement only if it was a running container
Container completedContainer = runningContainers.remove(container
.getContainerId());
@ -301,7 +294,8 @@ private void onResourcesReclaimed(Container container) {
ExecutionType.OPPORTUNISTIC) {
this.metrics.completeOpportunisticContainer(container.getResource());
}
startPendingContainers(false);
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
startPendingContainers(forceStartGuaranteedContainers);
}
}
@ -311,26 +305,9 @@ private void onResourcesReclaimed(Container container) {
* container without looking at available resource
*/
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
// Start pending guaranteed containers, if resources available.
// Start guaranteed containers that are paused, if resources available.
boolean resourcesAvailable = startContainers(
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
// Resume opportunistic containers, if resource available.
if (resourcesAvailable) {
List<Container> pausedContainers = new ArrayList<Container>();
Map<ContainerId, Container> containers =
context.getContainers();
for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
ContainerId contId = entry.getKey();
// Find containers that were not already started and are in paused state
if(false == runningContainers.containsKey(contId)) {
if(containers.get(contId).getContainerState()
== ContainerState.PAUSED) {
pausedContainers.add(containers.get(contId));
}
}
}
resourcesAvailable = startContainers(pausedContainers, false);
}
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
@ -590,16 +567,19 @@ private void shedQueuedOpportunisticContainers() {
queuedOpportunisticContainers.values().iterator();
while (containerIter.hasNext()) {
Container container = containerIter.next();
if (numAllowed <= 0) {
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Container De-queued to meet NM queuing limits.");
containerIter.remove();
LOG.info(
"Opportunistic container {} will be killed to meet NM queuing" +
" limits.", container.getContainerId());
// Do not shed PAUSED containers
if (container.getContainerState() != ContainerState.PAUSED) {
if (numAllowed <= 0) {
container.sendKillEvent(
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
"Container De-queued to meet NM queuing limits.");
containerIter.remove();
LOG.info(
"Opportunistic container {} will be killed to meet NM queuing" +
" limits.", container.getContainerId());
}
numAllowed--;
}
numAllowed--;
}
}

View File

@ -21,33 +21,37 @@
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
/**
* Update Event consumed by the {@link ContainerScheduler}.
*/
public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
private ContainerTokenIdentifier updatedToken;
private boolean isResourceChange;
private boolean isExecTypeUpdate;
private boolean isIncrease;
private final UpdateContainerTokenEvent containerEvent;
private final ContainerTokenIdentifier originalToken;
/**
* Create instance of Event.
*
* @param originalContainer Original Container.
* @param updatedToken Updated Container Token.
* @param isResourceChange is this a Resource Change.
* @param isExecTypeUpdate is this an ExecTypeUpdate.
* @param isIncrease is this a Container Increase.
* @param container Container.
* @param origToken The Original Container Token.
* @param event The Container Event.
*/
public UpdateContainerSchedulerEvent(Container originalContainer,
ContainerTokenIdentifier updatedToken, boolean isResourceChange,
boolean isExecTypeUpdate, boolean isIncrease) {
super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER);
this.updatedToken = updatedToken;
this.isResourceChange = isResourceChange;
this.isExecTypeUpdate = isExecTypeUpdate;
this.isIncrease = isIncrease;
public UpdateContainerSchedulerEvent(Container container,
ContainerTokenIdentifier origToken, UpdateContainerTokenEvent event) {
super(container, ContainerSchedulerEventType.UPDATE_CONTAINER);
this.containerEvent = event;
this.originalToken = origToken;
}
/**
* Original Token before update.
*
* @return Container Token.
*/
public ContainerTokenIdentifier getOriginalToken() {
return this.originalToken;
}
/**
@ -56,7 +60,7 @@ public UpdateContainerSchedulerEvent(Container originalContainer,
* @return Container Token.
*/
public ContainerTokenIdentifier getUpdatedToken() {
return updatedToken;
return containerEvent.getUpdatedToken();
}
/**
@ -64,7 +68,7 @@ public ContainerTokenIdentifier getUpdatedToken() {
* @return isResourceChange.
*/
public boolean isResourceChange() {
return isResourceChange;
return containerEvent.isResourceChange();
}
/**
@ -72,7 +76,7 @@ public boolean isResourceChange() {
* @return isExecTypeUpdate.
*/
public boolean isExecTypeUpdate() {
return isExecTypeUpdate;
return containerEvent.isExecTypeUpdate();
}
/**
@ -80,6 +84,6 @@ public boolean isExecTypeUpdate() {
* @return isIncrease.
*/
public boolean isIncrease() {
return isIncrease;
return containerEvent.isIncrease();
}
}

View File

@ -208,6 +208,8 @@ public void setup() throws IOException {
containerManager.init(conf);
nodeStatusUpdater.start();
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
((NMContext)context).setContainerStateTransitionListener(
new NodeManager.DefaultContainerStateListener());
}
protected ContainerManagerImpl

View File

@ -90,12 +90,16 @@
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@ -119,6 +123,41 @@ public TestContainerManager() throws UnsupportedFileSystemException {
LOG = LoggerFactory.getLogger(TestContainerManager.class);
}
/**
 * Test-only {@link ContainerStateTransitionListener} that records, for each
 * container, the sequence of states it was observed in and the types of the
 * events that were processed.
 */
private static class Listener implements ContainerStateTransitionListener {
  // Per container: the state seen before the first observed transition,
  // followed by the state reported after every transition.
  private final Map<ContainerId,
      List<org.apache.hadoop.yarn.server.nodemanager.containermanager.
          container.ContainerState>> states = new HashMap<>();
  // Per container: the type of each processed event, in arrival order.
  private final Map<ContainerId, List<ContainerEventType>> events =
      new HashMap<>();

  @Override
  public void init(Context context) {
    // Nothing to initialize.
  }

  /**
   * Starts tracking a container the first time it is seen, seeding its
   * state history with the state it is in before the transition.
   */
  @Override
  public void preTransition(ContainerImpl op,
      org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
          ContainerState beforeState,
      ContainerEvent eventToBeProcessed) {
    ContainerId id = op.getContainerId();
    if (states.containsKey(id)) {
      // Already tracked; the initial state was recorded earlier.
      return;
    }
    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.
        container.ContainerState> history = new ArrayList<>();
    history.add(beforeState);
    states.put(id, history);
    events.put(id, new ArrayList<>());
  }

  /**
   * Appends the resulting state and the processed event type to the
   * container's recorded history.
   */
  @Override
  public void postTransition(ContainerImpl op,
      org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
          ContainerState beforeState,
      org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
          ContainerState afterState,
      ContainerEvent processedEvent) {
    ContainerId id = op.getContainerId();
    states.get(id).add(afterState);
    events.get(id).add(processedEvent.getType());
  }
}
private boolean delayContainers = false;
@Override
@ -144,7 +183,7 @@ public int launchContainer(ContainerStartContext ctx)
@Override
protected ContainerManagerImpl
createContainerManager(DeletionService delSrvc) {
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
metrics, dirsHandler) {
@Override
@ -496,6 +535,9 @@ private String[] testContainerReInitSuccess(boolean autoCommit)
@Test
public void testContainerUpgradeSuccessAutoCommit() throws IOException,
InterruptedException, YarnException {
Listener listener = new Listener();
((NodeManager.DefaultContainerStateListener)containerManager.context.
getContainerStateTransitionListener()).addListener(listener);
testContainerReInitSuccess(true);
// Should not be able to Commit (since already auto committed)
try {
@ -504,6 +546,41 @@ public void testContainerUpgradeSuccessAutoCommit() throws IOException,
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
}
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.LOCALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
ContainerEventType.INIT_CONTAINER,
ContainerEventType.RESOURCE_LOCALIZED,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.REINITIALIZE_CONTAINER,
ContainerEventType.RESOURCE_LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
@Test
@ -524,6 +601,9 @@ public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
@Test
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
InterruptedException, YarnException {
Listener listener = new Listener();
((NodeManager.DefaultContainerStateListener)containerManager.context.
getContainerStateTransitionListener()).addListener(listener);
String[] pids = testContainerReInitSuccess(false);
// Test that the container can be Restarted after the successful upgrade.
@ -575,6 +655,67 @@ public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pids[0], rolledBackPid);
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.LOCALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
// This is the successful restart
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
// This is the rollback
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
ContainerEventType.INIT_CONTAINER,
ContainerEventType.RESOURCE_LOCALIZED,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.REINITIALIZE_CONTAINER,
ContainerEventType.RESOURCE_LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.REINITIALIZE_CONTAINER,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.ROLLBACK_REINIT,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
@Test
@ -584,6 +725,9 @@ public void testContainerUpgradeLocalizationFailure() throws IOException,
return;
}
containerManager.start();
Listener listener = new Listener();
((NodeManager.DefaultContainerStateListener)containerManager.context.
getContainerStateTransitionListener()).addListener(listener);
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
@ -598,6 +742,32 @@ public void testContainerUpgradeLocalizationFailure() throws IOException,
// since upgrade was terminated..
Assert.assertTrue("Process is NOT alive!",
DefaultContainerExecutor.containerIsAlive(pid));
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.LOCALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
ContainerEventType.INIT_CONTAINER,
ContainerEventType.RESOURCE_LOCALIZED,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.REINITIALIZE_CONTAINER,
ContainerEventType.RESOURCE_FAILED), containerEventTypes);
}
@Test
@ -632,6 +802,9 @@ public void testContainerUpgradeRollbackDueToFailure() throws IOException,
return;
}
containerManager.start();
Listener listener = new Listener();
((NodeManager.DefaultContainerStateListener)containerManager.context.
getContainerStateTransitionListener()).addListener(listener);
// ////// Construct the Container-id
ContainerId cId = createContainerId(0);
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
@ -666,6 +839,50 @@ public void testContainerUpgradeRollbackDueToFailure() throws IOException,
Assert.assertNotEquals("The Rolled-back process should be a different pid",
pid, rolledBackPid);
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.LOCALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.REINITIALIZING_AWAITING_KILL,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
ContainerEventType.INIT_CONTAINER,
ContainerEventType.RESOURCE_LOCALIZED,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.REINITIALIZE_CONTAINER,
ContainerEventType.RESOURCE_LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
/**
@ -1582,16 +1799,12 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
containerManager.updateContainer(updateRequest);
// Check response
Assert.assertEquals(
0, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertEquals(2, updateResponse.getFailedRequests().size());
1, updateResponse.getSuccessfullyUpdatedContainers().size());
Assert.assertEquals(1, updateResponse.getFailedRequests().size());
for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
.getFailedRequests().entrySet()) {
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
if (cId0.equals(entry.getKey())) {
Assert.assertTrue(entry.getValue().getMessage()
.contains("Resource can only be changed when a "
+ "container is in RUNNING or SCHEDULED state"));
} else if (cId7.equals(entry.getKey())) {
if (cId7.equals(entry.getKey())) {
Assert.assertTrue(entry.getValue().getMessage()
.contains("Container " + cId7.toString()
+ " is not handled by this NodeManager"));

View File

@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -47,11 +48,17 @@
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@ -76,6 +83,40 @@ public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
}
/**
 * Test-only {@link ContainerStateTransitionListener} that records, for each
 * container, the sequence of states it was observed in and the types of the
 * events that were processed.
 */
private static class Listener implements ContainerStateTransitionListener {
  // Per container: the state seen before the first observed transition,
  // followed by the state reported after every transition.
  private final Map<ContainerId, List<ContainerState>> states =
      new HashMap<>();
  // Per container: the type of each processed event, in arrival order.
  private final Map<ContainerId, List<ContainerEventType>> events =
      new HashMap<>();

  @Override
  public void init(Context context) {
    // Nothing to initialize.
  }

  /**
   * Starts tracking a container the first time it is seen, seeding its
   * state history with the state it is in before the transition.
   */
  @Override
  public void preTransition(ContainerImpl op, ContainerState beforeState,
      ContainerEvent eventToBeProcessed) {
    ContainerId id = op.getContainerId();
    if (states.containsKey(id)) {
      // Already tracked; the initial state was recorded earlier.
      return;
    }
    List<ContainerState> history = new ArrayList<>();
    history.add(beforeState);
    states.put(id, history);
    events.put(id, new ArrayList<>());
  }

  /**
   * Appends the resulting state and the processed event type to the
   * container's recorded history.
   */
  @Override
  public void postTransition(ContainerImpl op, ContainerState beforeState,
      ContainerState afterState, ContainerEvent processedEvent) {
    ContainerId id = op.getContainerId();
    states.get(id).add(afterState);
    events.get(id).add(processedEvent.getType());
  }
}
private boolean delayContainers = true;
@Override
@ -542,6 +583,10 @@ public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
containerManager.start();
containerManager.getContainerScheduler().
setUsePauseEventForPreemption(true);
Listener listener = new Listener();
((NodeManager.DefaultContainerStateListener)containerManager.getContext().
getContainerStateTransitionListener()).addListener(listener);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
@ -606,6 +651,39 @@ public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
// starts running
BaseContainerManagerTest.waitForNMContainerState(containerManager,
createContainerId(0), ContainerState.DONE, 40);
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.PAUSING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.PAUSED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RESUMING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.EXITED_WITH_SUCCESS,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.DONE), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(0));
Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER,
ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.PAUSE_CONTAINER,
ContainerEventType.CONTAINER_PAUSED,
ContainerEventType.RESUME_CONTAINER,
ContainerEventType.CONTAINER_RESUMED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP), containerEventTypes);
}
/**
@ -1068,6 +1146,9 @@ public void testStopQueuedContainer() throws Exception {
@Test
public void testPromotionOfOpportunisticContainers() throws Exception {
containerManager.start();
Listener listener = new Listener();
((NodeManager.DefaultContainerStateListener)containerManager.getContext().
getContainerStateTransitionListener()).addListener(listener);
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
@ -1150,6 +1231,7 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
Assert.assertEquals(1, containerStatuses.size());
for (ContainerStatus status : containerStatuses) {
if (org.apache.hadoop.yarn.api.records.ContainerState.RUNNING ==
status.getState()) {
@ -1160,6 +1242,25 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
// Ensure no containers are queued.
Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState> containerStates =
listener.states.get(createContainerId(1));
Assert.assertEquals(Arrays.asList(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.NEW,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.SCHEDULED,
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
ContainerState.RUNNING), containerStates);
List<ContainerEventType> containerEventTypes =
listener.events.get(createContainerId(1));
Assert.assertEquals(Arrays.asList(
ContainerEventType.INIT_CONTAINER,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
}
@Test