YARN-1070. Fixed race conditions in NodeManager during container-kill. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1527827 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-10-01 00:18:09 +00:00
parent 84e4e4b143
commit 74d20250ff
5 changed files with 67 additions and 50 deletions

View File

@ -101,7 +101,11 @@ Release 2.1.2 - UNRELEASED
YARN-1221. With Fair Scheduler, reserved MB reported in RM web UI increases YARN-1221. With Fair Scheduler, reserved MB reported in RM web UI increases
indefinitely (Siqi Li via Sandy Ryza) indefinitely (Siqi Li via Sandy Ryza)
YARN-1247. test-container-executor has gotten out of sync with the changes to container-executor. (rvs via tucu) YARN-1247. test-container-executor has gotten out of sync with the changes to
container-executor. (rvs via tucu)
YARN-1070. Fixed race conditions in NodeManager during container-kill.
(Zhijie Shen via vinodkv)
Release 2.1.1-beta - 2013-09-23 Release 2.1.1-beta - 2013-09-23

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
@ -133,10 +134,22 @@ public Integer call() {
final List<String> command = launchContext.getCommands(); final List<String> command = launchContext.getCommands();
int ret = -1; int ret = -1;
// If the container was already moved to KILLING before this launch ran,
// dispatch CONTAINER_KILLED_ON_REQUEST here and return without launching.
if (container.getContainerState() == ContainerState.KILLING) {
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerID,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode(),
"Container terminated before launch."));
return 0;
}
try { try {
localResources = container.getLocalizedResources(); localResources = container.getLocalizedResources();
if (localResources == null) { if (localResources == null) {
RPCUtil.getRemoteException( throw RPCUtil.getRemoteException(
"Unable to get local resources when Container " + containerID + "Unable to get local resources when Container " + containerID +
" is at " + container.getContainerState()); " is at " + container.getContainerState());
} }

View File

@ -75,20 +75,9 @@ public class ContainersLauncher extends AbstractService
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("ContainersLauncher #%d") .setNameFormat("ContainersLauncher #%d")
.build()); .build());
private final Map<ContainerId,RunningContainer> running = @VisibleForTesting
Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>()); public final Map<ContainerId, ContainerLaunch> running =
Collections.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
private static final class RunningContainer {
public RunningContainer(Future<Integer> submit,
ContainerLaunch launcher) {
this.runningcontainer = submit;
this.launcher = launcher;
}
Future<Integer> runningcontainer;
ContainerLaunch launcher;
}
public ContainersLauncher(Context context, Dispatcher dispatcher, public ContainersLauncher(Context context, Dispatcher dispatcher,
ContainerExecutor exec, LocalDirsHandlerService dirsHandler, ContainerExecutor exec, LocalDirsHandlerService dirsHandler,
@ -133,38 +122,20 @@ public void handle(ContainersLauncherEvent event) {
ContainerLaunch launch = ContainerLaunch launch =
new ContainerLaunch(context, getConfig(), dispatcher, exec, app, new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
event.getContainer(), dirsHandler, containerManager); event.getContainer(), dirsHandler, containerManager);
running.put(containerId, containerLauncher.submit(launch);
new RunningContainer(containerLauncher.submit(launch), running.put(containerId, launch);
launch));
break; break;
case CLEANUP_CONTAINER: case CLEANUP_CONTAINER:
RunningContainer rContainerDatum = running.remove(containerId); ContainerLaunch launcher = running.remove(containerId);
if (rContainerDatum == null) { if (launcher == null) {
// Container not launched. So nothing needs to be done. // Container not launched. So nothing needs to be done.
return; return;
} }
Future<Integer> rContainer = rContainerDatum.runningcontainer;
if (rContainer != null
&& !rContainer.isDone()) {
// Cancel the future so that it won't be launched if it isn't already.
// If it is going to be canceled, make sure CONTAINER_KILLED_ON_REQUEST
// will not be missed if the container is already at KILLING
if (rContainer.cancel(false)) {
if (container.getContainerState() == ContainerState.KILLING) {
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode(),
"Container terminated before launch."));
}
}
}
// Cleanup a container whether it is running/killed/completed, so that // Cleanup a container whether it is running/killed/completed, so that
// no sub-processes are alive. // no sub-processes are alive.
try { try {
rContainerDatum.launcher.cleanupContainer(); launcher.cleanupContainer();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Got exception while cleaning container " + containerId LOG.warn("Got exception while cleaning container " + containerId
+ ". Ignoring."); + ". Ignoring.");

View File

@ -65,6 +65,7 @@
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@ -72,6 +73,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
@ -296,8 +298,7 @@ public void testCleanupOnKillRequest() throws Exception {
wc.launchContainer(); wc.launchContainer();
reset(wc.localizerBus); reset(wc.localizerBus);
wc.killContainer(); wc.killContainer();
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
wc.containerKilledOnRequest(); wc.containerKilledOnRequest();
@ -330,14 +331,18 @@ public void testKillOnLocalizationFailed() throws Exception {
} }
@Test @Test
public void testKillOnLocalized() throws Exception { public void testKillOnLocalizedWhenContainerNotLaunched() throws Exception {
WrappedContainer wc = null; WrappedContainer wc = null;
try { try {
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak"); wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
wc.initContainer(); wc.initContainer();
wc.localizeResources(); wc.localizeResources();
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState()); assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
wc.killContainer(); wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
launcher.call();
wc.drainDispatcherEvents();
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
wc.c.getContainerState()); wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
@ -348,6 +353,31 @@ public void testKillOnLocalized() throws Exception {
} }
} }
} }
/**
 * Verifies that a kill request arriving after the container's launch has
 * already been attempted leaves the container state unchanged.
 *
 * <p>The container is initialized and localized, its {@code ContainerLaunch}
 * is fetched from the launcher's {@code running} map and invoked directly
 * from the test, and the resulting {@code EXITED_WITH_FAILURE} state is
 * expected to still hold after {@code killContainer()}.
 */
@Test
public void testKillOnLocalizedWhenContainerLaunched() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(17, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
// Look up the launch task stored under this container's ID and call it
// directly from the test rather than through an executor.
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
launcher.call();
wc.drainDispatcherEvents();
// NOTE(review): the launch presumably fails because the test wires the
// launcher with mocked/null collaborators — confirm against WrappedContainer.
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
// The kill comes after the launch attempt; the state must not change.
wc.killContainer();
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
} finally {
// Always release the wrapped container, even if an assertion failed.
if (wc != null) {
wc.finished();
}
}
}
@Test @Test
public void testResourceLocalizedOnLocalizationFailed() throws Exception { public void testResourceLocalizedOnLocalizationFailed() throws Exception {
@ -472,12 +502,10 @@ public void testLaunchAfterKillRequest() throws Exception {
wc.initContainer(); wc.initContainer();
wc.localizeResources(); wc.localizeResources();
wc.killContainer(); wc.killContainer();
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
wc.launchContainer(); wc.launchContainer();
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, assertEquals(ContainerState.KILLING, wc.c.getContainerState());
wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources()); assertNull(wc.c.getLocalizedResources());
wc.containerKilledOnRequest(); wc.containerKilledOnRequest();
verifyCleanupCall(wc); verifyCleanupCall(wc);
@ -650,7 +678,9 @@ private class WrappedContainer {
Context context = mock(Context.class); Context context = mock(Context.class);
when(context.getApplications()).thenReturn( when(context.getApplications()).thenReturn(
new ConcurrentHashMap<ApplicationId, Application>()); new ConcurrentHashMap<ApplicationId, Application>());
launcher = new ContainersLauncher(context, dispatcher, null, null, null); ContainerExecutor executor = mock(ContainerExecutor.class);
launcher =
new ContainersLauncher(context, dispatcher, executor, null, null);
// create a mock ExecutorService, which will not really launch // create a mock ExecutorService, which will not really launch
// ContainerLaunch at all. // ContainerLaunch at all.
launcher.containerLauncher = mock(ExecutorService.class); launcher.containerLauncher = mock(ExecutorService.class);

View File

@ -682,9 +682,8 @@ public void testDelayedKill() throws Exception {
ContainerStatus containerStatus = ContainerStatus containerStatus =
containerManager.getContainerStatuses(gcsRequest) containerManager.getContainerStatuses(gcsRequest)
.getContainerStatuses().get(0); .getContainerStatuses().get(0);
int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() : Assert.assertEquals(ExitCode.FORCE_KILLED.getExitCode(),
ExitCode.TERMINATED.getExitCode(); containerStatus.getExitStatus());
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
// Now verify the contents of the file. Script generates a message when it // Now verify the contents of the file. Script generates a message when it
// receives a sigterm so we look for that. We cannot perform this check on // receives a sigterm so we look for that. We cannot perform this check on