YARN-8206. Sending a kill does not immediately kill docker containers. Contributed by Eric Badger
This commit is contained in:
parent
57c2feb0d3
commit
5f11288e41
@ -617,19 +617,8 @@ protected void addCGroupParentIfRequired(String resourcesOptions,
|
|||||||
*/
|
*/
|
||||||
private boolean allowPrivilegedContainerExecution(Container container)
|
private boolean allowPrivilegedContainerExecution(Container container)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
Map<String, String> environment = container.getLaunchContext()
|
|
||||||
.getEnvironment();
|
|
||||||
String runPrivilegedContainerEnvVar = environment
|
|
||||||
.get(ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER);
|
|
||||||
|
|
||||||
if (runPrivilegedContainerEnvVar == null) {
|
if(!isContainerRequestedAsPrivileged(container)) {
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!runPrivilegedContainerEnvVar.equalsIgnoreCase("true")) {
|
|
||||||
LOG.warn("NOT running a privileged container. Value of " +
|
|
||||||
ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER
|
|
||||||
+ "is invalid: " + runPrivilegedContainerEnvVar);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -669,6 +658,20 @@ private boolean allowPrivilegedContainerExecution(Container container)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function only returns whether a privileged container was requested,
|
||||||
|
* not whether the container was or will be launched as privileged.
|
||||||
|
* @param container
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private boolean isContainerRequestedAsPrivileged(
|
||||||
|
Container container) {
|
||||||
|
String runPrivilegedContainerEnvVar = container.getLaunchContext()
|
||||||
|
.getEnvironment().get(ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER);
|
||||||
|
return Boolean.parseBoolean(runPrivilegedContainerEnvVar);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
private String mountReadOnlyPath(String mount,
|
private String mountReadOnlyPath(String mount,
|
||||||
Map<Path, List<String>> localizedResources)
|
Map<Path, List<String>> localizedResources)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
@ -963,19 +966,16 @@ public void relaunchContainer(ContainerRuntimeContext ctx)
|
|||||||
public void signalContainer(ContainerRuntimeContext ctx)
|
public void signalContainer(ContainerRuntimeContext ctx)
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
|
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
|
||||||
String containerId = ctx.getContainer().getContainerId().toString();
|
|
||||||
Map<String, String> env =
|
Map<String, String> env =
|
||||||
ctx.getContainer().getLaunchContext().getEnvironment();
|
ctx.getContainer().getLaunchContext().getEnvironment();
|
||||||
try {
|
try {
|
||||||
if (ContainerExecutor.Signal.NULL.equals(signal)) {
|
if (ContainerExecutor.Signal.NULL.equals(signal)) {
|
||||||
executeLivelinessCheck(ctx);
|
executeLivelinessCheck(ctx);
|
||||||
} else {
|
} else if (ContainerExecutor.Signal.TERM.equals(signal)) {
|
||||||
if (ContainerExecutor.Signal.KILL.equals(signal)
|
String containerId = ctx.getContainer().getContainerId().toString();
|
||||||
|| ContainerExecutor.Signal.TERM.equals(signal)) {
|
|
||||||
handleContainerStop(containerId, env);
|
handleContainerStop(containerId, env);
|
||||||
} else {
|
} else {
|
||||||
handleContainerKill(containerId, env, signal);
|
handleContainerKill(ctx, env, signal);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (ContainerExecutionException e) {
|
} catch (ContainerExecutionException e) {
|
||||||
LOG.warn("Signal docker container failed. Exception: ", e);
|
LOG.warn("Signal docker container failed. Exception: ", e);
|
||||||
@ -1184,21 +1184,50 @@ private void handleContainerStop(String containerId, Map<String, String> env)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleContainerKill(String containerId, Map<String, String> env,
|
private void handleContainerKill(ContainerRuntimeContext ctx,
|
||||||
|
Map<String, String> env,
|
||||||
ContainerExecutor.Signal signal) throws ContainerExecutionException {
|
ContainerExecutor.Signal signal) throws ContainerExecutionException {
|
||||||
|
Container container = ctx.getContainer();
|
||||||
|
|
||||||
|
// Only need to check whether the container was asked to be privileged.
|
||||||
|
// If the container had failed the permissions checks upon launch, it
|
||||||
|
// would have never been launched and thus we wouldn't be here
|
||||||
|
// attempting to signal it.
|
||||||
|
if (isContainerRequestedAsPrivileged(container)) {
|
||||||
|
String containerId = container.getContainerId().toString();
|
||||||
DockerCommandExecutor.DockerContainerStatus containerStatus =
|
DockerCommandExecutor.DockerContainerStatus containerStatus =
|
||||||
DockerCommandExecutor.getContainerStatus(containerId, conf,
|
DockerCommandExecutor.getContainerStatus(containerId, conf,
|
||||||
privilegedOperationExecutor, nmContext);
|
privilegedOperationExecutor, nmContext);
|
||||||
if (DockerCommandExecutor.isKillable(containerStatus)) {
|
if (DockerCommandExecutor.isKillable(containerStatus)) {
|
||||||
DockerKillCommand dockerKillCommand =
|
DockerKillCommand dockerKillCommand =
|
||||||
new DockerKillCommand(containerId).setSignal(signal.name());
|
new DockerKillCommand(containerId).setSignal(signal.name());
|
||||||
DockerCommandExecutor.executeDockerCommand(dockerKillCommand, containerId,
|
DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
|
||||||
env, conf, privilegedOperationExecutor, false, nmContext);
|
containerId, env, conf, privilegedOperationExecutor, false,
|
||||||
|
nmContext);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"Container status is " + containerStatus.getName()
|
"Container status is {}, skipping kill - {}",
|
||||||
+ ", skipping kill - " + containerId);
|
containerStatus.getName(), containerId);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
PrivilegedOperation privOp = new PrivilegedOperation(
|
||||||
|
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
|
||||||
|
privOp.appendArgs(ctx.getExecutionAttribute(RUN_AS_USER),
|
||||||
|
ctx.getExecutionAttribute(USER),
|
||||||
|
Integer.toString(PrivilegedOperation.RunAsUserCommand
|
||||||
|
.SIGNAL_CONTAINER.getValue()),
|
||||||
|
ctx.getExecutionAttribute(PID),
|
||||||
|
Integer.toString(ctx.getExecutionAttribute(SIGNAL).getValue()));
|
||||||
|
privOp.disableFailureLogging();
|
||||||
|
try {
|
||||||
|
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||||
|
privOp, null, null, false, false);
|
||||||
|
} catch (PrivilegedOperationException e) {
|
||||||
|
//Don't log the failure here. Some kinds of signaling failures are
|
||||||
|
// acceptable. Let the calling executor decide what to do.
|
||||||
|
throw new ContainerExecutionException("Signal container failed using "
|
||||||
|
+ "signal: " + signal.name(), e
|
||||||
|
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -43,13 +43,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStartCommand;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
|
||||||
@ -87,6 +82,7 @@
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPID;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPID;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPLICATION_LOCAL_DIRS;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.APPLICATION_LOCAL_DIRS;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_ID_STR;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.CONTAINER_ID_STR;
|
||||||
@ -103,7 +99,6 @@
|
|||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RESOURCES_OPTIONS;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RESOURCES_OPTIONS;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RUN_AS_USER;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RUN_AS_USER;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.TC_COMMAND_FILE;
|
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER;
|
||||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_FILECACHE_DIRS;
|
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_FILECACHE_DIRS;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
@ -317,20 +312,24 @@ public void testSelectDockerContainerType() {
|
|||||||
.isDockerContainerRequested(envOtherType));
|
.isDockerContainerRequested(envOtherType));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private PrivilegedOperation capturePrivilegedOperation()
|
private PrivilegedOperation capturePrivilegedOperation()
|
||||||
throws PrivilegedOperationException {
|
throws PrivilegedOperationException {
|
||||||
|
return capturePrivilegedOperation(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private PrivilegedOperation capturePrivilegedOperation(int invocations)
|
||||||
|
throws PrivilegedOperationException {
|
||||||
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
|
ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass(
|
||||||
PrivilegedOperation.class);
|
PrivilegedOperation.class);
|
||||||
|
|
||||||
//single invocation expected
|
|
||||||
//due to type erasure + mocking, this verification requires a suppress
|
//due to type erasure + mocking, this verification requires a suppress
|
||||||
// warning annotation on the entire method
|
// warning annotation on the entire method
|
||||||
verify(mockExecutor, times(1))
|
verify(mockExecutor, times(invocations))
|
||||||
.executePrivilegedOperation(anyList(), opCaptor.capture(), any(
|
.executePrivilegedOperation(anyList(), opCaptor.capture(), any(
|
||||||
File.class), anyMap(), anyBoolean(), anyBoolean());
|
File.class), anyMap(), anyBoolean(), anyBoolean());
|
||||||
|
|
||||||
//verification completed. we need to isolate specific invications.
|
//verification completed. we need to isolate specific invocations.
|
||||||
// hence, reset mock here
|
// hence, reset mock here
|
||||||
Mockito.reset(mockExecutor);
|
Mockito.reset(mockExecutor);
|
||||||
|
|
||||||
@ -918,6 +917,8 @@ public void testLaunchPrivilegedContainersInvalidEnvVar()
|
|||||||
@Test
|
@Test
|
||||||
public void testLaunchPrivilegedContainersWithDisabledSetting()
|
public void testLaunchPrivilegedContainersWithDisabledSetting()
|
||||||
throws ContainerExecutionException {
|
throws ContainerExecutionException {
|
||||||
|
conf.setBoolean(YarnConfiguration.NM_DOCKER_ALLOW_PRIVILEGED_CONTAINERS,
|
||||||
|
false);
|
||||||
DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
|
DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
|
||||||
mockExecutor, mockCGroupsHandler);
|
mockExecutor, mockCGroupsHandler);
|
||||||
runtime.initialize(conf, nmContext);
|
runtime.initialize(conf, nmContext);
|
||||||
@ -939,6 +940,7 @@ public void testLaunchPrivilegedContainersWithEnabledSettingAndDefaultACL()
|
|||||||
//Enable privileged containers.
|
//Enable privileged containers.
|
||||||
conf.setBoolean(YarnConfiguration.NM_DOCKER_ALLOW_PRIVILEGED_CONTAINERS,
|
conf.setBoolean(YarnConfiguration.NM_DOCKER_ALLOW_PRIVILEGED_CONTAINERS,
|
||||||
true);
|
true);
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL, "");
|
||||||
|
|
||||||
DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
|
DockerLinuxContainerRuntime runtime = new DockerLinuxContainerRuntime(
|
||||||
mockExecutor, mockCGroupsHandler);
|
mockExecutor, mockCGroupsHandler);
|
||||||
@ -1356,9 +1358,12 @@ public void testContainerLivelinessCheck()
|
|||||||
public void testDockerStopOnTermSignalWhenRunning()
|
public void testDockerStopOnTermSignalWhenRunning()
|
||||||
throws ContainerExecutionException, PrivilegedOperationException,
|
throws ContainerExecutionException, PrivilegedOperationException,
|
||||||
IOException {
|
IOException {
|
||||||
List<String> dockerCommands = getDockerCommandsForSignal(
|
when(mockExecutor
|
||||||
ContainerExecutor.Signal.TERM,
|
.executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
|
||||||
DockerCommandExecutor.DockerContainerStatus.RUNNING);
|
any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
|
||||||
|
DockerCommandExecutor.DockerContainerStatus.RUNNING.getName());
|
||||||
|
List<String> dockerCommands = getDockerCommandsForDockerStop(
|
||||||
|
ContainerExecutor.Signal.TERM);
|
||||||
Assert.assertEquals(4, dockerCommands.size());
|
Assert.assertEquals(4, dockerCommands.size());
|
||||||
Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
|
Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
|
||||||
Assert.assertEquals(" docker-command=stop", dockerCommands.get(1));
|
Assert.assertEquals(" docker-command=stop", dockerCommands.get(1));
|
||||||
@ -1370,11 +1375,54 @@ public void testDockerStopOnTermSignalWhenRunning()
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDockerStopOnKillSignalWhenRunning()
|
public void testDockerStopOnKillSignalWhenRunning()
|
||||||
|
throws ContainerExecutionException, PrivilegedOperationException {
|
||||||
|
List<String> dockerCommands = getDockerCommandsForSignal(
|
||||||
|
ContainerExecutor.Signal.KILL);
|
||||||
|
Assert.assertEquals(5, dockerCommands.size());
|
||||||
|
Assert.assertEquals(runAsUser, dockerCommands.get(0));
|
||||||
|
Assert.assertEquals(user, dockerCommands.get(1));
|
||||||
|
Assert.assertEquals(
|
||||||
|
Integer.toString(PrivilegedOperation.RunAsUserCommand
|
||||||
|
.SIGNAL_CONTAINER.getValue()),
|
||||||
|
dockerCommands.get(2));
|
||||||
|
Assert.assertEquals(signalPid, dockerCommands.get(3));
|
||||||
|
Assert.assertEquals(
|
||||||
|
Integer.toString(ContainerExecutor.Signal.KILL.getValue()),
|
||||||
|
dockerCommands.get(4));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDockerKillOnQuitSignalWhenRunning() throws Exception {
|
||||||
|
List<String> dockerCommands = getDockerCommandsForSignal(
|
||||||
|
ContainerExecutor.Signal.QUIT);
|
||||||
|
|
||||||
|
Assert.assertEquals(5, dockerCommands.size());
|
||||||
|
Assert.assertEquals(runAsUser, dockerCommands.get(0));
|
||||||
|
Assert.assertEquals(user, dockerCommands.get(1));
|
||||||
|
Assert.assertEquals(
|
||||||
|
Integer.toString(PrivilegedOperation.RunAsUserCommand
|
||||||
|
.SIGNAL_CONTAINER.getValue()),
|
||||||
|
dockerCommands.get(2));
|
||||||
|
Assert.assertEquals(signalPid, dockerCommands.get(3));
|
||||||
|
Assert.assertEquals(
|
||||||
|
Integer.toString(ContainerExecutor.Signal.QUIT.getValue()),
|
||||||
|
dockerCommands.get(4));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDockerStopOnTermSignalWhenRunningPrivileged()
|
||||||
throws ContainerExecutionException, PrivilegedOperationException,
|
throws ContainerExecutionException, PrivilegedOperationException,
|
||||||
IOException {
|
IOException {
|
||||||
List<String> dockerCommands = getDockerCommandsForSignal(
|
conf.set(YarnConfiguration.NM_DOCKER_ALLOW_PRIVILEGED_CONTAINERS, "true");
|
||||||
ContainerExecutor.Signal.KILL,
|
conf.set(YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL,
|
||||||
DockerCommandExecutor.DockerContainerStatus.RUNNING);
|
submittingUser);
|
||||||
|
env.put(ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER, "true");
|
||||||
|
when(mockExecutor
|
||||||
|
.executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
|
||||||
|
any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
|
||||||
|
DockerCommandExecutor.DockerContainerStatus.RUNNING.getName());
|
||||||
|
List<String> dockerCommands = getDockerCommandsForDockerStop(
|
||||||
|
ContainerExecutor.Signal.TERM);
|
||||||
Assert.assertEquals(4, dockerCommands.size());
|
Assert.assertEquals(4, dockerCommands.size());
|
||||||
Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
|
Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
|
||||||
Assert.assertEquals(" docker-command=stop", dockerCommands.get(1));
|
Assert.assertEquals(" docker-command=stop", dockerCommands.get(1));
|
||||||
@ -1385,10 +1433,42 @@ public void testDockerStopOnKillSignalWhenRunning()
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDockerKillOnQuitSignalWhenRunning() throws Exception {
|
public void testDockerStopOnKillSignalWhenRunningPrivileged()
|
||||||
List<String> dockerCommands = getDockerCommandsForSignal(
|
throws ContainerExecutionException, PrivilegedOperationException,
|
||||||
ContainerExecutor.Signal.QUIT,
|
IOException {
|
||||||
DockerCommandExecutor.DockerContainerStatus.RUNNING);
|
conf.set(YarnConfiguration.NM_DOCKER_ALLOW_PRIVILEGED_CONTAINERS, "true");
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL,
|
||||||
|
submittingUser);
|
||||||
|
env.put(ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER, "true");
|
||||||
|
when(mockExecutor
|
||||||
|
.executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
|
||||||
|
any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
|
||||||
|
DockerCommandExecutor.DockerContainerStatus.RUNNING.getName());
|
||||||
|
List<String> dockerCommands = getDockerCommandsForDockerStop(
|
||||||
|
ContainerExecutor.Signal.KILL);
|
||||||
|
Assert.assertEquals(4, dockerCommands.size());
|
||||||
|
Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
|
||||||
|
Assert.assertEquals(" docker-command=kill", dockerCommands.get(1));
|
||||||
|
Assert.assertEquals(
|
||||||
|
" name=container_e11_1518975676334_14532816_01_000001",
|
||||||
|
dockerCommands.get(2));
|
||||||
|
Assert.assertEquals(" signal=KILL", dockerCommands.get(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDockerKillOnQuitSignalWhenRunningPrivileged()
|
||||||
|
throws Exception {
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_ALLOW_PRIVILEGED_CONTAINERS, "true");
|
||||||
|
conf.set(YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL,
|
||||||
|
submittingUser);
|
||||||
|
env.put(ENV_DOCKER_CONTAINER_RUN_PRIVILEGED_CONTAINER, "true");
|
||||||
|
when(mockExecutor
|
||||||
|
.executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
|
||||||
|
any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
|
||||||
|
DockerCommandExecutor.DockerContainerStatus.RUNNING.getName());
|
||||||
|
List<String> dockerCommands = getDockerCommandsForDockerStop(
|
||||||
|
ContainerExecutor.Signal.QUIT);
|
||||||
|
|
||||||
Assert.assertEquals(4, dockerCommands.size());
|
Assert.assertEquals(4, dockerCommands.size());
|
||||||
Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
|
Assert.assertEquals("[docker-command-execution]", dockerCommands.get(0));
|
||||||
Assert.assertEquals(" docker-command=kill", dockerCommands.get(1));
|
Assert.assertEquals(" docker-command=kill", dockerCommands.get(1));
|
||||||
@ -1403,8 +1483,8 @@ public void testDockerRmOnWhenExited() throws Exception {
|
|||||||
env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_DELAYED_REMOVAL,
|
env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_DELAYED_REMOVAL,
|
||||||
"false");
|
"false");
|
||||||
conf.set(YarnConfiguration.NM_DOCKER_ALLOW_DELAYED_REMOVAL, "true");
|
conf.set(YarnConfiguration.NM_DOCKER_ALLOW_DELAYED_REMOVAL, "true");
|
||||||
MockRuntime runtime = new MockRuntime(mockExecutor,
|
DockerLinuxContainerRuntime runtime =
|
||||||
DockerCommandExecutor.DockerContainerStatus.EXITED, true);
|
new DockerLinuxContainerRuntime(mockExecutor, mockCGroupsHandler);
|
||||||
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
||||||
.setExecutionAttribute(USER, user);
|
.setExecutionAttribute(USER, user);
|
||||||
runtime.initialize(enableMockContainerExecutor(conf), null);
|
runtime.initialize(enableMockContainerExecutor(conf), null);
|
||||||
@ -1420,8 +1500,8 @@ public void testNoDockerRmWhenDelayedDeletionEnabled()
|
|||||||
env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_DELAYED_REMOVAL,
|
env.put(DockerLinuxContainerRuntime.ENV_DOCKER_CONTAINER_DELAYED_REMOVAL,
|
||||||
"true");
|
"true");
|
||||||
conf.set(YarnConfiguration.NM_DOCKER_ALLOW_DELAYED_REMOVAL, "true");
|
conf.set(YarnConfiguration.NM_DOCKER_ALLOW_DELAYED_REMOVAL, "true");
|
||||||
MockRuntime runtime = new MockRuntime(mockExecutor,
|
DockerLinuxContainerRuntime runtime =
|
||||||
DockerCommandExecutor.DockerContainerStatus.EXITED, true);
|
new DockerLinuxContainerRuntime(mockExecutor, mockCGroupsHandler);
|
||||||
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
||||||
.setExecutionAttribute(USER, user);
|
.setExecutionAttribute(USER, user);
|
||||||
runtime.initialize(enableMockContainerExecutor(conf), null);
|
runtime.initialize(enableMockContainerExecutor(conf), null);
|
||||||
@ -1431,13 +1511,34 @@ public void testNoDockerRmWhenDelayedDeletionEnabled()
|
|||||||
File.class), anyMap(), anyBoolean(), anyBoolean());
|
File.class), anyMap(), anyBoolean(), anyBoolean());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> getDockerCommandsForSignal(
|
private List<String> getDockerCommandsForDockerStop(
|
||||||
ContainerExecutor.Signal signal,
|
ContainerExecutor.Signal signal)
|
||||||
DockerCommandExecutor.DockerContainerStatus status)
|
|
||||||
throws ContainerExecutionException, PrivilegedOperationException,
|
throws ContainerExecutionException, PrivilegedOperationException,
|
||||||
IOException {
|
IOException {
|
||||||
|
|
||||||
MockRuntime runtime = new MockRuntime(mockExecutor, status, false);
|
DockerLinuxContainerRuntime runtime =
|
||||||
|
new DockerLinuxContainerRuntime(mockExecutor, mockCGroupsHandler);
|
||||||
|
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
||||||
|
.setExecutionAttribute(USER, user)
|
||||||
|
.setExecutionAttribute(PID, signalPid)
|
||||||
|
.setExecutionAttribute(SIGNAL, signal);
|
||||||
|
runtime.initialize(enableMockContainerExecutor(conf), nmContext);
|
||||||
|
runtime.signalContainer(builder.build());
|
||||||
|
|
||||||
|
PrivilegedOperation op = capturePrivilegedOperation(2);
|
||||||
|
Assert.assertEquals(op.getOperationType(),
|
||||||
|
PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
|
||||||
|
String dockerCommandFile = op.getArguments().get(0);
|
||||||
|
return Files.readAllLines(Paths.get(dockerCommandFile),
|
||||||
|
Charset.forName("UTF-8"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> getDockerCommandsForSignal(
|
||||||
|
ContainerExecutor.Signal signal)
|
||||||
|
throws ContainerExecutionException, PrivilegedOperationException {
|
||||||
|
|
||||||
|
DockerLinuxContainerRuntime runtime =
|
||||||
|
new DockerLinuxContainerRuntime(mockExecutor, mockCGroupsHandler);
|
||||||
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
builder.setExecutionAttribute(RUN_AS_USER, runAsUser)
|
||||||
.setExecutionAttribute(USER, user)
|
.setExecutionAttribute(USER, user)
|
||||||
.setExecutionAttribute(PID, signalPid)
|
.setExecutionAttribute(PID, signalPid)
|
||||||
@ -1447,10 +1548,8 @@ private List<String> getDockerCommandsForSignal(
|
|||||||
|
|
||||||
PrivilegedOperation op = capturePrivilegedOperation();
|
PrivilegedOperation op = capturePrivilegedOperation();
|
||||||
Assert.assertEquals(op.getOperationType(),
|
Assert.assertEquals(op.getOperationType(),
|
||||||
PrivilegedOperation.OperationType.RUN_DOCKER_CMD);
|
PrivilegedOperation.OperationType.SIGNAL_CONTAINER);
|
||||||
String dockerCommandFile = op.getArguments().get(0);
|
return op.getArguments();
|
||||||
return Files.readAllLines(Paths.get(dockerCommandFile),
|
|
||||||
Charset.forName("UTF-8"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1937,12 +2036,16 @@ public void testLaunchContainerWithDockerTokens()
|
|||||||
public void testDockerContainerRelaunch()
|
public void testDockerContainerRelaunch()
|
||||||
throws ContainerExecutionException, PrivilegedOperationException,
|
throws ContainerExecutionException, PrivilegedOperationException,
|
||||||
IOException {
|
IOException {
|
||||||
DockerLinuxContainerRuntime runtime = new MockRuntime(mockExecutor,
|
DockerLinuxContainerRuntime runtime =
|
||||||
DockerCommandExecutor.DockerContainerStatus.EXITED, false);
|
new DockerLinuxContainerRuntime(mockExecutor, mockCGroupsHandler);
|
||||||
runtime.initialize(conf, null);
|
when(mockExecutor
|
||||||
|
.executePrivilegedOperation(anyList(), any(PrivilegedOperation.class),
|
||||||
|
any(File.class), anyMap(), anyBoolean(), anyBoolean())).thenReturn(
|
||||||
|
DockerCommandExecutor.DockerContainerStatus.STOPPED.getName());
|
||||||
|
runtime.initialize(conf, nmContext);
|
||||||
runtime.relaunchContainer(builder.build());
|
runtime.relaunchContainer(builder.build());
|
||||||
|
|
||||||
PrivilegedOperation op = capturePrivilegedOperation();
|
PrivilegedOperation op = capturePrivilegedOperation(2);
|
||||||
List<String> args = op.getArguments();
|
List<String> args = op.getArguments();
|
||||||
String dockerCommandFile = args.get(11);
|
String dockerCommandFile = args.get(11);
|
||||||
|
|
||||||
@ -1960,134 +2063,4 @@ public void testDockerContainerRelaunch()
|
|||||||
" name=container_e11_1518975676334_14532816_01_000001",
|
" name=container_e11_1518975676334_14532816_01_000001",
|
||||||
dockerCommands.get(counter));
|
dockerCommands.get(counter));
|
||||||
}
|
}
|
||||||
|
|
||||||
class MockRuntime extends DockerLinuxContainerRuntime {
|
|
||||||
|
|
||||||
private PrivilegedOperationExecutor privilegedOperationExecutor;
|
|
||||||
private DockerCommandExecutor.DockerContainerStatus containerStatus;
|
|
||||||
private boolean delayedRemovalAllowed;
|
|
||||||
|
|
||||||
MockRuntime(PrivilegedOperationExecutor privilegedOperationExecutor,
|
|
||||||
DockerCommandExecutor.DockerContainerStatus containerStatus,
|
|
||||||
boolean delayedRemovalAllowed) {
|
|
||||||
super(privilegedOperationExecutor);
|
|
||||||
this.privilegedOperationExecutor = privilegedOperationExecutor;
|
|
||||||
this.containerStatus = containerStatus;
|
|
||||||
this.delayedRemovalAllowed = delayedRemovalAllowed;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void signalContainer(ContainerRuntimeContext ctx)
|
|
||||||
throws ContainerExecutionException {
|
|
||||||
ContainerExecutor.Signal signal = ctx.getExecutionAttribute(SIGNAL);
|
|
||||||
String containerName = ctx.getContainer().getContainerId().toString();
|
|
||||||
Map<String, String> environment =
|
|
||||||
ctx.getContainer().getLaunchContext().getEnvironment();
|
|
||||||
try {
|
|
||||||
if (ContainerExecutor.Signal.KILL.equals(signal)
|
|
||||||
|| ContainerExecutor.Signal.TERM.equals(signal)) {
|
|
||||||
if (DockerCommandExecutor.isStoppable(containerStatus)) {
|
|
||||||
DockerStopCommand dockerStopCommand =
|
|
||||||
new DockerStopCommand(containerName)
|
|
||||||
.setGracePeriod(dockerStopGracePeriod);
|
|
||||||
DockerCommandExecutor.executeDockerCommand(dockerStopCommand,
|
|
||||||
containerName, environment, conf, mockExecutor, false,
|
|
||||||
nmContext);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (DockerCommandExecutor.isKillable(containerStatus)) {
|
|
||||||
DockerKillCommand dockerKillCommand =
|
|
||||||
new DockerKillCommand(containerName);
|
|
||||||
dockerKillCommand.setSignal(signal.name());
|
|
||||||
DockerCommandExecutor.executeDockerCommand(dockerKillCommand,
|
|
||||||
containerName, environment, conf, mockExecutor, false,
|
|
||||||
nmContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (ContainerExecutionException e) {
|
|
||||||
LOG.warn("Signal docker container failed. Exception: ", e);
|
|
||||||
throw new ContainerExecutionException("Signal docker container failed",
|
|
||||||
e.getExitCode(), e.getOutput(), e.getErrorOutput());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void reapContainer(ContainerRuntimeContext ctx)
|
|
||||||
throws ContainerExecutionException {
|
|
||||||
String delayedRemoval = env.get(ENV_DOCKER_CONTAINER_DELAYED_REMOVAL);
|
|
||||||
if (delayedRemovalAllowed && delayedRemoval != null
|
|
||||||
&& delayedRemoval.equalsIgnoreCase("true")) {
|
|
||||||
LOG.info("Delayed removal requested and allowed, skipping removal - "
|
|
||||||
+ containerId);
|
|
||||||
} else {
|
|
||||||
if (DockerCommandExecutor.isRemovable(containerStatus)) {
|
|
||||||
DockerRmCommand dockerRmCommand = new DockerRmCommand(containerId);
|
|
||||||
DockerCommandExecutor
|
|
||||||
.executeDockerCommand(dockerRmCommand, containerId, env, conf,
|
|
||||||
privilegedOperationExecutor, false, nmContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void relaunchContainer(ContainerRuntimeContext ctx)
|
|
||||||
throws ContainerExecutionException {
|
|
||||||
if (DockerCommandExecutor.isRemovable(containerStatus)) {
|
|
||||||
String relaunchContainerIdStr =
|
|
||||||
ctx.getContainer().getContainerId().toString();
|
|
||||||
DockerStartCommand startCommand =
|
|
||||||
new DockerStartCommand(containerIdStr);
|
|
||||||
DockerClient dockerClient = new DockerClient(conf);
|
|
||||||
String commandFile = dockerClient.writeCommandToTempFile(startCommand,
|
|
||||||
relaunchContainerIdStr);
|
|
||||||
String relaunchRunAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
|
|
||||||
Path relaunchNmPrivateContainerScriptPath = ctx.getExecutionAttribute(
|
|
||||||
NM_PRIVATE_CONTAINER_SCRIPT_PATH);
|
|
||||||
Path relaunchContainerWorkDir =
|
|
||||||
ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
|
|
||||||
//we can't do better here thanks to type-erasure
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
List<String> relaunchLocalDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
List<String> relaunchLogDirs = ctx.getExecutionAttribute(LOG_DIRS);
|
|
||||||
String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);
|
|
||||||
|
|
||||||
PrivilegedOperation launchOp = new PrivilegedOperation(
|
|
||||||
PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
|
|
||||||
|
|
||||||
launchOp.appendArgs(relaunchRunAsUser, ctx.getExecutionAttribute(USER),
|
|
||||||
Integer.toString(PrivilegedOperation
|
|
||||||
.RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
|
|
||||||
ctx.getExecutionAttribute(APPID),
|
|
||||||
relaunchContainerIdStr,
|
|
||||||
relaunchContainerWorkDir.toString(),
|
|
||||||
relaunchNmPrivateContainerScriptPath.toUri().getPath(),
|
|
||||||
ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
|
|
||||||
ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
|
|
||||||
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
|
||||||
relaunchLocalDirs),
|
|
||||||
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
|
||||||
relaunchLogDirs),
|
|
||||||
commandFile,
|
|
||||||
resourcesOpts);
|
|
||||||
|
|
||||||
String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
|
|
||||||
|
|
||||||
if (tcCommandFile != null) {
|
|
||||||
launchOp.appendArgs(tcCommandFile);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
privilegedOperationExecutor.executePrivilegedOperation(null,
|
|
||||||
launchOp, null, null, false, false);
|
|
||||||
} catch (PrivilegedOperationException e) {
|
|
||||||
LOG.warn("Relaunch container failed. Exception: ", e);
|
|
||||||
LOG.info("Docker command used: " + startCommand);
|
|
||||||
|
|
||||||
throw new ContainerExecutionException("Launch container failed", e
|
|
||||||
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user