YARN-9126. Fix container clean up for reinitialization.
Contributed by Chandni Singh
This commit is contained in:
parent
cf571133b8
commit
e815fd9c49
@ -113,6 +113,7 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||
Shell.appendScriptExtension("launch_container");
|
||||
|
||||
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
|
||||
public static final String SYSFS_DIR = "sysfs";
|
||||
|
||||
public static final String KEYSTORE_FILE = "yarn_provided.keystore";
|
||||
public static final String TRUSTSTORE_FILE = "yarn_provided.truststore";
|
||||
@ -1772,6 +1773,8 @@ protected void cleanupContainerFiles(Path containerWorkDir) {
|
||||
deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT));
|
||||
// delete TokensPath
|
||||
deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE));
|
||||
// delete sysfs dir
|
||||
deleteAsUser(new Path(containerWorkDir, SYSFS_DIR));
|
||||
|
||||
// delete symlinks because launch script will create symlinks again
|
||||
try {
|
||||
|
@ -46,7 +46,6 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -152,25 +151,10 @@ public void handle(ContainersLauncherEvent event) {
|
||||
containerLauncher.submit(launch);
|
||||
break;
|
||||
case CLEANUP_CONTAINER:
|
||||
cleanup(event, containerId, true);
|
||||
break;
|
||||
case CLEANUP_CONTAINER_FOR_REINIT:
|
||||
ContainerLaunch existingLaunch = running.remove(containerId);
|
||||
if (existingLaunch == null) {
|
||||
// Container not launched.
|
||||
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerExitEvent(containerId,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
Shell.WINDOWS ? ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
|
||||
ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
|
||||
"Container terminated before launch."));
|
||||
return;
|
||||
}
|
||||
|
||||
// Cleanup a container whether it is running/killed/completed, so that
|
||||
// no sub-processes are alive.
|
||||
ContainerCleanup cleanup = new ContainerCleanup(context, getConfig(),
|
||||
dispatcher, exec, event.getContainer(), existingLaunch);
|
||||
containerLauncher.submit(cleanup);
|
||||
cleanup(event, containerId, false);
|
||||
break;
|
||||
case SIGNAL_CONTAINER:
|
||||
SignalContainersLauncherEvent signalEvent =
|
||||
@ -221,4 +205,32 @@ public void handle(ContainersLauncherEvent event) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void cleanup(ContainersLauncherEvent event, ContainerId containerId,
|
||||
boolean async) {
|
||||
ContainerLaunch existingLaunch = running.remove(containerId);
|
||||
if (existingLaunch == null) {
|
||||
// Container not launched.
|
||||
// triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerExitEvent(containerId,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
Shell.WINDOWS ?
|
||||
ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
|
||||
ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
|
||||
"Container terminated before launch."));
|
||||
return;
|
||||
}
|
||||
|
||||
// Cleanup a container whether it is running/killed/completed, so that
|
||||
// no sub-processes are alive.
|
||||
ContainerCleanup cleanup = new ContainerCleanup(context, getConfig(),
|
||||
dispatcher, exec, event.getContainer(), existingLaunch);
|
||||
if (async) {
|
||||
containerLauncher.submit(cleanup);
|
||||
} else {
|
||||
cleanup.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.InjectMocks;
|
||||
@ -39,8 +40,10 @@
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -212,11 +215,14 @@ public void testCleanupContainerForReINITEvent()
|
||||
|
||||
when(event.getType())
|
||||
.thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT);
|
||||
assertEquals(1, dummyMap.size());
|
||||
final List<ContainerId> cleanedContainers = new ArrayList<>();
|
||||
doAnswer(invocation -> {
|
||||
cleanedContainers.add((ContainerId)invocation.getArguments()[1]);
|
||||
return null;
|
||||
}).when(spy).cleanup(any(), any(), anyBoolean());
|
||||
spy.handle(event);
|
||||
assertEquals(0, dummyMap.size());
|
||||
Mockito.verify(containerLauncher, Mockito.times(1))
|
||||
.submit(Mockito.any(ContainerCleanup.class));
|
||||
Assert.assertEquals("container not cleaned", containerId,
|
||||
cleanedContainers.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user