YARN-7973. Added ContainerRelaunch feature for Docker containers.
Contributed by Shane Kumpf
This commit is contained in:
parent
583fa6ed48
commit
c467f311d0
@ -98,6 +98,16 @@ public interface ApplicationConstants {
|
||||
|
||||
public static final String STDOUT = "stdout";
|
||||
|
||||
/**
|
||||
* The type of launch for the container.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
enum ContainerLaunchType {
|
||||
LAUNCH,
|
||||
RELAUNCH
|
||||
}
|
||||
|
||||
/**
|
||||
* Environment for Applications.
|
||||
*
|
||||
|
@ -181,6 +181,17 @@ public void prepareContainer(ContainerPrepareContext ctx) throws
|
||||
public abstract int launchContainer(ContainerStartContext ctx) throws
|
||||
IOException, ConfigurationException;
|
||||
|
||||
/**
|
||||
* Relaunch the container on the node. This is a blocking call and returns
|
||||
* only when the container exits.
|
||||
* @param ctx Encapsulates information necessary for relaunching containers.
|
||||
* @return the return status of the relaunch
|
||||
* @throws IOException if the container relaunch fails
|
||||
* @throws ConfigurationException if config error was found
|
||||
*/
|
||||
public abstract int relaunchContainer(ContainerStartContext ctx) throws
|
||||
IOException, ConfigurationException;
|
||||
|
||||
/**
|
||||
* Signal container with the specified signal.
|
||||
*
|
||||
|
@ -339,6 +339,12 @@ public int launchContainer(ContainerStartContext ctx)
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int relaunchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
return launchContainer(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link ShellCommandExecutor} using the parameters.
|
||||
*
|
||||
|
@ -474,6 +474,20 @@ public void prepareContainer(ContainerPrepareContext ctx) throws IOException {
|
||||
@Override
|
||||
public int launchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
return handleLaunchForLaunchType(ctx,
|
||||
ApplicationConstants.ContainerLaunchType.LAUNCH);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int relaunchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
return handleLaunchForLaunchType(ctx,
|
||||
ApplicationConstants.ContainerLaunchType.RELAUNCH);
|
||||
}
|
||||
|
||||
private int handleLaunchForLaunchType(ContainerStartContext ctx,
|
||||
ApplicationConstants.ContainerLaunchType type) throws IOException,
|
||||
ConfigurationException {
|
||||
Container container = ctx.getContainer();
|
||||
String user = ctx.getUser();
|
||||
|
||||
@ -542,13 +556,37 @@ public int launchContainer(ContainerStartContext ctx)
|
||||
ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext(
|
||||
ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs);
|
||||
|
||||
if (type.equals(ApplicationConstants.ContainerLaunchType.RELAUNCH)) {
|
||||
linuxContainerRuntime.relaunchContainer(runtimeContext);
|
||||
} else {
|
||||
linuxContainerRuntime.launchContainer(runtimeContext);
|
||||
}
|
||||
|
||||
} else {
|
||||
LOG.info(
|
||||
"Container was marked as inactive. Returning terminated error");
|
||||
return ContainerExecutor.ExitCode.TERMINATED.getExitCode();
|
||||
}
|
||||
} catch (ContainerExecutionException e) {
|
||||
return handleExitCode(e, container, containerId);
|
||||
} finally {
|
||||
resourcesHandler.postExecute(containerId);
|
||||
|
||||
try {
|
||||
if (resourceHandlerChain != null) {
|
||||
resourceHandlerChain.postComplete(containerId);
|
||||
}
|
||||
} catch (ResourceHandlerException e) {
|
||||
LOG.warn("ResourceHandlerChain.postComplete failed for " +
|
||||
"containerId: " + containerId + ". Exception: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int handleExitCode(ContainerExecutionException e, Container container,
|
||||
ContainerId containerId) throws ConfigurationException {
|
||||
int exitCode = e.getExitCode();
|
||||
LOG.warn("Exit code from container " + containerId + " is : " + exitCode);
|
||||
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
|
||||
@ -598,20 +636,6 @@ public int launchContainer(ContainerStartContext ctx)
|
||||
"Container killed on request. Exit code is " + exitCode));
|
||||
}
|
||||
return exitCode;
|
||||
} finally {
|
||||
resourcesHandler.postExecute(containerId);
|
||||
|
||||
try {
|
||||
if (resourceHandlerChain != null) {
|
||||
resourceHandlerChain.postComplete(containerId);
|
||||
}
|
||||
} catch (ResourceHandlerException e) {
|
||||
LOG.warn("ResourceHandlerChain.postComplete failed for " +
|
||||
"containerId: " + containerId + ". Exception: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private ContainerRuntimeContext buildContainerRuntimeContext(
|
||||
|
@ -487,6 +487,24 @@ protected Map<Path, List<String>> getLocalizedResources()
|
||||
@SuppressWarnings("unchecked")
|
||||
protected int launchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
int launchPrep = prepareForLaunch(ctx);
|
||||
if (launchPrep == 0) {
|
||||
return exec.launchContainer(ctx);
|
||||
}
|
||||
return launchPrep;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected int relaunchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
int launchPrep = prepareForLaunch(ctx);
|
||||
if (launchPrep == 0) {
|
||||
return exec.relaunchContainer(ctx);
|
||||
}
|
||||
return launchPrep;
|
||||
}
|
||||
|
||||
protected int prepareForLaunch(ContainerStartContext ctx) throws IOException {
|
||||
ContainerId containerId = container.getContainerId();
|
||||
if (container.isMarkedForKilling()) {
|
||||
LOG.info("Container " + containerId + " not launched as it has already "
|
||||
@ -508,8 +526,8 @@ protected int launchContainer(ContainerStartContext ctx)
|
||||
return ExitCode.TERMINATED.getExitCode();
|
||||
} else {
|
||||
exec.activateContainer(containerId, pidFilePath);
|
||||
return exec.launchContainer(ctx);
|
||||
}
|
||||
return ExitCode.SUCCESS.getExitCode();
|
||||
}
|
||||
|
||||
protected void setContainerCompletedStatus(int exitCode) {
|
||||
|
@ -108,7 +108,7 @@ public Integer call() {
|
||||
+ dirsHandler.getDisksHealthReport(false));
|
||||
}
|
||||
|
||||
ret = launchContainer(new ContainerStartContext.Builder()
|
||||
ret = relaunchContainer(new ContainerStartContext.Builder()
|
||||
.setContainer(container)
|
||||
.setLocalizedResources(localResources)
|
||||
.setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
|
||||
|
@ -125,6 +125,12 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
launchContainer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void signalContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
|
@ -141,6 +141,15 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
||||
runtime.launchContainer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
Container container = ctx.getContainer();
|
||||
LinuxContainerRuntime runtime = pickContainerRuntime(container);
|
||||
|
||||
runtime.relaunchContainer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void signalContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
|
@ -23,9 +23,11 @@
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStartCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
|
||||
@ -910,6 +912,40 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
Container container = ctx.getContainer();
|
||||
String containerIdStr = container.getContainerId().toString();
|
||||
// Check to see if the container already exists for relaunch
|
||||
DockerCommandExecutor.DockerContainerStatus containerStatus =
|
||||
DockerCommandExecutor.getContainerStatus(containerIdStr, conf,
|
||||
privilegedOperationExecutor);
|
||||
if (containerStatus != null &&
|
||||
DockerCommandExecutor.isStartable(containerStatus)) {
|
||||
DockerStartCommand startCommand = new DockerStartCommand(containerIdStr);
|
||||
String commandFile = dockerClient.writeCommandToTempFile(startCommand,
|
||||
containerIdStr);
|
||||
PrivilegedOperation launchOp = buildLaunchOp(ctx, commandFile,
|
||||
startCommand);
|
||||
|
||||
try {
|
||||
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||
launchOp, null, null, false, false);
|
||||
} catch (PrivilegedOperationException e) {
|
||||
LOG.warn("Relaunch container failed. Exception: ", e);
|
||||
LOG.info("Docker command used: " + startCommand);
|
||||
|
||||
throw new ContainerExecutionException("Launch container failed", e
|
||||
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
||||
}
|
||||
} else {
|
||||
throw new ContainerExecutionException("Container is not in a startable "
|
||||
+ "state, unable to relaunch: " + containerIdStr);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal the docker container.
|
||||
*
|
||||
@ -1057,7 +1093,7 @@ public String[] getIpAndHost(Container container) {
|
||||
|
||||
|
||||
private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
|
||||
String commandFile, DockerRunCommand runCommand) {
|
||||
String commandFile, DockerCommand command) {
|
||||
|
||||
String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
|
||||
String containerIdStr = ctx.getContainer().getContainerId().toString();
|
||||
@ -1096,7 +1132,7 @@ private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx,
|
||||
launchOp.appendArgs(tcCommandFile);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Launching container with cmd: " + runCommand);
|
||||
LOG.debug("Launching container with cmd: " + command);
|
||||
}
|
||||
|
||||
return launchOp;
|
||||
|
@ -268,6 +268,16 @@ public void launchContainer(ContainerRuntimeContext ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
try {
|
||||
super.relaunchContainer(ctx);
|
||||
} finally {
|
||||
deletePolicyFiles(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if JVMSandboxLinuxContainerRuntime should be used. This is
|
||||
* decided based on the value of
|
||||
|
@ -225,4 +225,18 @@ public static boolean isRemovable(DockerContainerStatus containerStatus) {
|
||||
&& !containerStatus.equals(DockerContainerStatus.REMOVING)
|
||||
&& !containerStatus.equals(DockerContainerStatus.RUNNING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is the container in a startable state?
|
||||
*
|
||||
* @param containerStatus the container's {@link DockerContainerStatus}.
|
||||
* @return is the container in a startable state.
|
||||
*/
|
||||
public static boolean isStartable(DockerContainerStatus containerStatus) {
|
||||
if (containerStatus.equals(DockerContainerStatus.EXITED)
|
||||
|| containerStatus.equals(DockerContainerStatus.STOPPED)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
|
||||
|
||||
/**
|
||||
* Encapsulates the docker start command and its command line arguments.
|
||||
*/
|
||||
public class DockerStartCommand extends DockerCommand {
|
||||
private static final String START_COMMAND = "start";
|
||||
|
||||
public DockerStartCommand(String containerName) {
|
||||
super(START_COMMAND);
|
||||
super.addCommandArguments("name", containerName);
|
||||
}
|
||||
}
|
@ -54,6 +54,16 @@ void prepareContainer(ContainerRuntimeContext ctx)
|
||||
void launchContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException;
|
||||
|
||||
/**
|
||||
* Relaunch a container.
|
||||
*
|
||||
* @param ctx the {@link ContainerRuntimeContext}
|
||||
* @throws ContainerExecutionException if an error occurs while relaunching
|
||||
* the container
|
||||
*/
|
||||
void relaunchContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException;
|
||||
|
||||
/**
|
||||
* Signal a container. Signals may be a request to terminate, a status check,
|
||||
* etc.
|
||||
|
@ -1542,7 +1542,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id,
|
||||
fprintf(LOGFILE, "Launching docker container...\n");
|
||||
fprintf(LOGFILE, "Docker run command: %s\n", docker_command_with_binary);
|
||||
FILE* start_docker = popen(docker_command_with_binary, "r");
|
||||
if (pclose (start_docker) != 0)
|
||||
if (WEXITSTATUS(pclose (start_docker)) != 0)
|
||||
{
|
||||
fprintf (ERRORFILE,
|
||||
"Could not invoke docker %s.\n", docker_command_with_binary);
|
||||
|
@ -357,6 +357,8 @@ int get_docker_command(const char *command_file, const struct configuration *con
|
||||
return get_docker_stop_command(command_file, conf, out, outlen);
|
||||
} else if (strcmp(DOCKER_VOLUME_COMMAND, command) == 0) {
|
||||
return get_docker_volume_command(command_file, conf, out, outlen);
|
||||
} else if (strcmp(DOCKER_START_COMMAND, command) == 0) {
|
||||
return get_docker_start_command(command_file, conf, out, outlen);
|
||||
} else {
|
||||
return UNKNOWN_DOCKER_COMMAND;
|
||||
}
|
||||
@ -813,6 +815,44 @@ int get_docker_kill_command(const char *command_file, const struct configuration
|
||||
return BUFFER_TOO_SMALL;
|
||||
}
|
||||
|
||||
int get_docker_start_command(const char *command_file, const struct configuration *conf, char *out, const size_t outlen) {
|
||||
int ret = 0;
|
||||
char *container_name = NULL;
|
||||
struct configuration command_config = {0, NULL};
|
||||
ret = read_and_verify_command_file(command_file, DOCKER_START_COMMAND, &command_config);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
container_name = get_configuration_value("name", DOCKER_COMMAND_FILE_SECTION, &command_config);
|
||||
if (container_name == NULL || validate_container_name(container_name) != 0) {
|
||||
return INVALID_DOCKER_CONTAINER_NAME;
|
||||
}
|
||||
|
||||
memset(out, 0, outlen);
|
||||
|
||||
ret = add_docker_config_param(&command_config, out, outlen);
|
||||
if (ret != 0) {
|
||||
return BUFFER_TOO_SMALL;
|
||||
}
|
||||
|
||||
ret = add_to_buffer(out, outlen, DOCKER_START_COMMAND);
|
||||
if (ret != 0) {
|
||||
goto free_and_exit;
|
||||
}
|
||||
ret = add_to_buffer(out, outlen, " ");
|
||||
if (ret != 0) {
|
||||
goto free_and_exit;
|
||||
}
|
||||
ret = add_to_buffer(out, outlen, container_name);
|
||||
if (ret != 0) {
|
||||
goto free_and_exit;
|
||||
}
|
||||
free_and_exit:
|
||||
free(container_name);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int detach_container(const struct configuration *command_config, char *out, const size_t outlen) {
|
||||
return add_param_to_command(command_config, "detach", "-d ", 0, out, outlen);
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
#define DOCKER_STOP_COMMAND "stop"
|
||||
#define DOCKER_KILL_COMMAND "kill"
|
||||
#define DOCKER_VOLUME_COMMAND "volume"
|
||||
#define DOCKER_START_COMMAND "start"
|
||||
|
||||
|
||||
enum docker_error_codes {
|
||||
@ -161,6 +162,16 @@ int get_docker_kill_command(const char* command_file, const struct configuration
|
||||
int get_docker_volume_command(const char *command_file, const struct configuration *conf, char *out,
|
||||
const size_t outlen);
|
||||
|
||||
/**
|
||||
* Get the Docker start command line string. The function will verify that the params file is meant for the start command.
|
||||
* @param command_file File containing the params for the Docker start command
|
||||
* @param conf Configuration struct containing the container-executor.cfg details
|
||||
* @param out Buffer to fill with the start command
|
||||
* @param outlen Size of the output buffer
|
||||
* @return Return code with 0 indicating success and non-zero codes indicating error
|
||||
*/
|
||||
int get_docker_start_command(const char* command_file, const struct configuration* conf, char *out, const size_t outlen);
|
||||
|
||||
/**
|
||||
* Give an error message for the supplied error code
|
||||
* @param error_code the error code
|
||||
|
@ -338,6 +338,26 @@ namespace ContainerExecutor {
|
||||
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_kill_command);
|
||||
}
|
||||
|
||||
TEST_F(TestDockerUtil, test_docker_start) {
|
||||
std::vector<std::pair<std::string, std::string> > file_cmd_vec;
|
||||
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
|
||||
"[docker-command-execution]\n docker-command=start\n name=container_e1_12312_11111_02_000001",
|
||||
"start container_e1_12312_11111_02_000001"));
|
||||
|
||||
std::vector<std::pair<std::string, int> > bad_file_cmd_vec;
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=run\n name=container_e1_12312_11111_02_000001",
|
||||
static_cast<int>(INCORRECT_COMMAND)));
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"docker-command=start\n name=ctr-id", static_cast<int>(INCORRECT_COMMAND)));
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=start\n name=", static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
|
||||
bad_file_cmd_vec.push_back(std::make_pair<std::string, int>(
|
||||
"[docker-command-execution]\n docker-command=start", static_cast<int>(INVALID_DOCKER_CONTAINER_NAME)));
|
||||
|
||||
run_docker_command_test(file_cmd_vec, bad_file_cmd_vec, get_docker_start_command);
|
||||
}
|
||||
|
||||
TEST_F(TestDockerUtil, test_detach_container) {
|
||||
std::vector<std::pair<std::string, std::string> > file_cmd_vec;
|
||||
file_cmd_vec.push_back(std::make_pair<std::string, std::string>(
|
||||
|
@ -675,6 +675,18 @@ public void testReapContainer() throws Exception {
|
||||
verify(lce, times(1)).reapContainer(ctx);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRelaunchContainer() throws Exception {
|
||||
Container container = mock(Container.class);
|
||||
LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
|
||||
ContainerStartContext.Builder builder =
|
||||
new ContainerStartContext.Builder();
|
||||
builder.setContainer(container).setUser("foo");
|
||||
ContainerStartContext ctx = builder.build();
|
||||
lce.relaunchContainer(ctx);
|
||||
verify(lce, times(1)).relaunchContainer(ctx);
|
||||
}
|
||||
|
||||
private static class TestResourceHandler implements LCEResourcesHandler {
|
||||
static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();
|
||||
|
||||
|
@ -78,7 +78,7 @@ public void testRelaunchContext() throws Exception {
|
||||
assertEquals("relaunch failed", 0, result);
|
||||
ArgumentCaptor<ContainerStartContext> captor =
|
||||
ArgumentCaptor.forClass(ContainerStartContext.class);
|
||||
verify(mockExecutor).launchContainer(captor.capture());
|
||||
verify(mockExecutor).relaunchContainer(captor.capture());
|
||||
ContainerStartContext csc = captor.getValue();
|
||||
assertNotNull("app ID null", csc.getAppId());
|
||||
assertNotNull("container null", csc.getContainer());
|
||||
|
@ -42,10 +42,12 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStartCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStopCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
|
||||
@ -99,6 +101,7 @@
|
||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RESOURCES_OPTIONS;
|
||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.RUN_AS_USER;
|
||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.SIGNAL;
|
||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.TC_COMMAND_FILE;
|
||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER;
|
||||
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.USER_FILECACHE_DIRS;
|
||||
import static org.mockito.Mockito.any;
|
||||
@ -1864,6 +1867,32 @@ public void testLaunchContainerWithDockerTokens()
|
||||
dockerCommands.get(counter++));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDockerContainerRelaunch()
|
||||
throws ContainerExecutionException, PrivilegedOperationException,
|
||||
IOException {
|
||||
DockerLinuxContainerRuntime runtime = new MockRuntime(mockExecutor,
|
||||
DockerCommandExecutor.DockerContainerStatus.EXITED, false);
|
||||
runtime.initialize(conf, null);
|
||||
runtime.relaunchContainer(builder.build());
|
||||
|
||||
PrivilegedOperation op = capturePrivilegedOperation();
|
||||
List<String> args = op.getArguments();
|
||||
String dockerCommandFile = args.get(11);
|
||||
|
||||
List<String> dockerCommands = Files.readAllLines(
|
||||
Paths.get(dockerCommandFile), Charset.forName("UTF-8"));
|
||||
|
||||
int expected = 3;
|
||||
int counter = 0;
|
||||
Assert.assertEquals(expected, dockerCommands.size());
|
||||
Assert.assertEquals("[docker-command-execution]",
|
||||
dockerCommands.get(counter++));
|
||||
Assert.assertEquals(" docker-command=start",
|
||||
dockerCommands.get(counter++));
|
||||
Assert.assertEquals(" name=container_id", dockerCommands.get(counter));
|
||||
}
|
||||
|
||||
class MockRuntime extends DockerLinuxContainerRuntime {
|
||||
|
||||
private PrivilegedOperationExecutor privilegedOperationExecutor;
|
||||
@ -1929,5 +1958,66 @@ public void reapContainer(ContainerRuntimeContext ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void relaunchContainer(ContainerRuntimeContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
if (DockerCommandExecutor.isRemovable(containerStatus)) {
|
||||
String relaunchContainerIdStr =
|
||||
ctx.getContainer().getContainerId().toString();
|
||||
DockerStartCommand startCommand =
|
||||
new DockerStartCommand(containerIdStr);
|
||||
DockerClient dockerClient = new DockerClient(conf);
|
||||
String commandFile = dockerClient.writeCommandToTempFile(startCommand,
|
||||
relaunchContainerIdStr);
|
||||
String relaunchRunAsUser = ctx.getExecutionAttribute(RUN_AS_USER);
|
||||
Path relaunchNmPrivateContainerScriptPath = ctx.getExecutionAttribute(
|
||||
NM_PRIVATE_CONTAINER_SCRIPT_PATH);
|
||||
Path relaunchContainerWorkDir =
|
||||
ctx.getExecutionAttribute(CONTAINER_WORK_DIR);
|
||||
//we can't do better here thanks to type-erasure
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> relaunchLocalDirs = ctx.getExecutionAttribute(LOCAL_DIRS);
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> relaunchLogDirs = ctx.getExecutionAttribute(LOG_DIRS);
|
||||
String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS);
|
||||
|
||||
PrivilegedOperation launchOp = new PrivilegedOperation(
|
||||
PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER);
|
||||
|
||||
launchOp.appendArgs(relaunchRunAsUser, ctx.getExecutionAttribute(USER),
|
||||
Integer.toString(PrivilegedOperation
|
||||
.RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()),
|
||||
ctx.getExecutionAttribute(APPID),
|
||||
relaunchContainerIdStr,
|
||||
relaunchContainerWorkDir.toString(),
|
||||
relaunchNmPrivateContainerScriptPath.toUri().getPath(),
|
||||
ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(),
|
||||
ctx.getExecutionAttribute(PID_FILE_PATH).toString(),
|
||||
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
||||
relaunchLocalDirs),
|
||||
StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR,
|
||||
relaunchLogDirs),
|
||||
commandFile,
|
||||
resourcesOpts);
|
||||
|
||||
String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE);
|
||||
|
||||
if (tcCommandFile != null) {
|
||||
launchOp.appendArgs(tcCommandFile);
|
||||
}
|
||||
|
||||
try {
|
||||
privilegedOperationExecutor.executePrivilegedOperation(null,
|
||||
launchOp, null, null, false, false);
|
||||
} catch (PrivilegedOperationException e) {
|
||||
LOG.warn("Relaunch container failed. Exception: ", e);
|
||||
LOG.info("Docker command used: " + startCommand);
|
||||
|
||||
throw new ContainerExecutionException("Launch container failed", e
|
||||
.getExitCode(), e.getOutput(), e.getErrorOutput());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* Tests the docker start command and any command line arguments.
|
||||
*/
|
||||
public class TestDockerStartCommand {
|
||||
|
||||
private DockerStartCommand dockerStartCommand;
|
||||
|
||||
private static final String CONTAINER_NAME = "foo";
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
dockerStartCommand = new DockerStartCommand(CONTAINER_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCommandOption() {
|
||||
assertEquals("start", dockerStartCommand.getCommandOption());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCommandWithArguments() {
|
||||
assertEquals("start", StringUtils.join(",",
|
||||
dockerStartCommand.getDockerCommandWithArguments()
|
||||
.get("docker-command")));
|
||||
assertEquals("foo", StringUtils.join(",",
|
||||
dockerStartCommand.getDockerCommandWithArguments().get("name")));
|
||||
assertEquals(2, dockerStartCommand.getDockerCommandWithArguments().size());
|
||||
}
|
||||
}
|
@ -86,6 +86,11 @@ public int launchContainer(ContainerStartContext ctx) throws
|
||||
return 0;
|
||||
}
|
||||
@Override
|
||||
public int relaunchContainer(ContainerStartContext ctx) throws
|
||||
IOException, ConfigurationException {
|
||||
return 0;
|
||||
}
|
||||
@Override
|
||||
public boolean signalContainer(ContainerSignalContext ctx)
|
||||
throws IOException {
|
||||
return true;
|
||||
|
Loading…
Reference in New Issue
Block a user