YARN-8331. Race condition in NM container launched after done. Contributed by Pradeep Ambati

This commit is contained in:
Jason Lowe 2018-08-09 10:17:34 -05:00
parent 778369ea02
commit cd04e954d2
4 changed files with 71 additions and 14 deletions

View File

@ -384,7 +384,7 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition())
new KillTransition())
.addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
ContainerEventType.UPDATE_CONTAINER_TOKEN,
new NotifyContainerSchedulerOfUpdateTransition())
@ -618,6 +618,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
// From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@ -635,6 +638,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
.addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
// From KILLING State.
.addTransition(ContainerState.KILLING,
@ -694,6 +700,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
@ -714,6 +723,8 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// No transition - assuming container is on its way to completion
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.UPDATE_CONTAINER_TOKEN)
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST)
// create the topology tables
.installTopology();

View File

@ -556,14 +556,10 @@ protected void handleContainerExitCode(int exitCode, Path containerLogDir) {
|| exitCode == ExitCode.TERMINATED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method.
// If Container was killed before starting... NO need to do this.
if (!killedBeforeStart) {
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
diagnosticInfo.toString()));
}
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
diagnosticInfo.toString()));
} else if (exitCode != 0) {
handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
diagnosticInfo);

View File

@ -23,6 +23,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -151,7 +156,14 @@ public void handle(ContainersLauncherEvent event) {
case CLEANUP_CONTAINER_FOR_REINIT:
ContainerLaunch launcher = running.remove(containerId);
if (launcher == null) {
// Container not launched. So nothing needs to be done.
// Container not launched.
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
Shell.WINDOWS ? ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
"Container terminated before launch."));
return;
}

View File

@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@ -664,6 +665,17 @@ public void testKillOnLocalizedWhenContainerNotLaunchedContainerKilled()
ContainerLaunch launcher = wc.launcher.running.get(wc.c.getContainerId());
wc.killContainer();
assertEquals(ContainerState.KILLING, wc.c.getContainerState());
// check that container cleanup hasn't started at this point.
LocalizationCleanupMatcher cleanupResources =
new LocalizationCleanupMatcher(wc.c);
verify(wc.localizerBus, times(0)).handle(argThat(cleanupResources));
// check if containerlauncher cleans up the container launch.
verify(wc.launcherBus)
.handle(refEq(new ContainersLauncherEvent(wc.c,
ContainersLauncherEventType.CLEANUP_CONTAINER), "timestamp"));
launcher.call();
wc.drainDispatcherEvents();
assertEquals(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@ -676,6 +688,7 @@ public void testKillOnLocalizedWhenContainerNotLaunchedContainerKilled()
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(killed + 1, metrics.getKilledContainers());
assertEquals(0, metrics.getRunningContainers());
assertEquals(0, wc.launcher.running.size());
} finally {
if (wc != null) {
wc.finished();
@ -1145,7 +1158,7 @@ private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
LocalResourceVisibility.APPLICATION));
LocalResourceVisibility.APPLICATION), wc.c);
verify(wc.localizerBus, atLeastOnce()).handle(argThat(matchesReq));
}
@ -1161,13 +1174,35 @@ private void verifyDockerContainerCleanupCall(WrappedContainer wc)
wc.c.getContainerId().toString())));
}
private static class ResourcesReleasedMatcher extends
// Argument matcher for matching container localization cleanup event.
private static class LocalizationCleanupMatcher extends
ArgumentMatcher<LocalizationEvent> {
Container c;
LocalizationCleanupMatcher(Container c){
this.c = c;
}
@Override
public boolean matches(Object o) {
if (!(o instanceof ContainerLocalizationCleanupEvent)) {
return false;
}
ContainerLocalizationCleanupEvent evt =
(ContainerLocalizationCleanupEvent) o;
return (evt.getContainer() == c);
}
}
private static class ResourcesReleasedMatcher extends
LocalizationCleanupMatcher {
final HashSet<LocalResourceRequest> resources =
new HashSet<LocalResourceRequest>();
ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
EnumSet<LocalResourceVisibility> vis, Container c) throws URISyntaxException {
super(c);
for (Entry<String, LocalResource> e : allResources.entrySet()) {
if (vis.contains(e.getValue().getVisibility())) {
resources.add(new LocalResourceRequest(e.getValue()));
@ -1177,9 +1212,12 @@ private static class ResourcesReleasedMatcher extends
@Override
public boolean matches(Object o) {
if (!(o instanceof ContainerLocalizationCleanupEvent)) {
// match event type and container.
if(!super.matches(o)){
return false;
}
// match resources.
ContainerLocalizationCleanupEvent evt =
(ContainerLocalizationCleanupEvent) o;
final HashSet<LocalResourceRequest> expected =