YARN-8776. Implement Container Exec feature in LinuxContainerExecutor. Contributed by Eric Yang

This commit is contained in:
Billie Rinaldi 2018-11-12 10:41:45 -08:00
parent 18fe65d756
commit 1f9c4f32e8
17 changed files with 275 additions and 43 deletions

View File

@ -38,7 +38,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,6 +58,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;

View File

@ -62,15 +62,10 @@
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -801,20 +796,22 @@ public boolean reapContainer(ContainerReapContext ctx) throws IOException {
/**
 * Open an interactive exec session into a running container by delegating
 * to the configured Linux container runtime (e.g. docker exec).
 *
 * @param ctx container exec context identifying the target container
 * @return stdin and stdout streams of the interactive shell
 * @throws ContainerExecutionException if the runtime fails to start the
 *         interactive shell
 */
@Override
public IOStreamPair execContainer(ContainerExecContext ctx)
    throws ContainerExecutionException {
IOStreamPair res;
try {
res = linuxContainerRuntime.execContainer(ctx);
} catch (ContainerExecutionException e) {
int retCode = e.getExitCode();
if (retCode != 0) {
// Non-zero exit from the runtime launch: surface empty streams to
// the caller rather than propagating the exception.
return new IOStreamPair(null, null);
}
LOG.warn("Error in executing container interactive shell "
+ ctx + " exit = " + retCode, e);
logOutput(e.getOutput());
throw new ContainerExecutionException(
"Error in executing container interactive shell " + ctx.getContainer()
.getContainerId().toString() + " exit = " + retCode);
}
return res;
}
@Override

View File

@ -44,6 +44,7 @@ public enum OperationType {
INITIALIZE_CONTAINER(""), //no CLI switch supported yet
LAUNCH_CONTAINER(""), //no CLI switch supported yet
SIGNAL_CONTAINER(""), //no CLI switch supported yet
EXEC_CONTAINER("--run-docker"), //no CLI switch supported yet
DELETE_AS_USER(""), //no CLI switch supported yet
LAUNCH_DOCKER_CONTAINER(""), //no CLI switch supported yet
TC_MODIFY_STATE("--tc-modify-state"),

View File

@ -20,8 +20,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -34,6 +34,8 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -207,6 +209,59 @@ public String executePrivilegedOperation(PrivilegedOperation operation,
false);
}
/**
 * Execute a privileged operation whose process must stay attached so the
 * caller can interact with it (e.g. a container shell).
 *
 * @param prefixCommands commands, if any, to prepend to the
 *        container-executor invocation; may be null or empty
 * @param operation the privileged operation to execute
 * @return stdin and stdout of container exec
 * @throws PrivilegedOperationException if the process cannot be launched
 * @throws InterruptedException if the launch is interrupted
 */
public IOStreamPair executePrivilegedInteractiveOperation(
List<String> prefixCommands, PrivilegedOperation operation)
throws PrivilegedOperationException, InterruptedException {
String[] fullCommandArray = getPrivilegedOperationExecutionCommand(
prefixCommands, operation);
// Log the command before launching so it is visible even when start fails.
if (LOG.isDebugEnabled()) {
LOG.debug("command array:");
LOG.debug(Arrays.toString(fullCommandArray));
}
ProcessBuilder pb = new ProcessBuilder(fullCommandArray);
OutputStream stdin;
InputStream stdout;
try {
// Merge stderr into stdout so the caller reads a single stream.
pb.redirectErrorStream(true);
Process p = pb.start();
// From the caller's perspective: write into the process (its stdin),
// read from the process (its stdout).
stdin = p.getOutputStream();
stdout = p.getInputStream();
} catch (ExitCodeException e) {
if (operation.isFailureLoggingEnabled()) {
StringBuilder logBuilder = new StringBuilder(
"Interactive Shell execution returned exit code: ")
.append(e.getExitCode())
.append(". Privileged Interactive Operation Stderr: ")
.append(System.lineSeparator())
.append(e.getMessage())
.append(System.lineSeparator());
logBuilder.append("Full command array for failed execution: ")
.append(System.lineSeparator());
logBuilder.append(Arrays.toString(fullCommandArray));
LOG.warn(logBuilder.toString());
}
//stderr from shell executor seems to be stuffed into the exception
//'message' - so, we have to extract it and set it as the error out
throw new PrivilegedOperationException(e, e.getExitCode(),
pb.redirectError().toString(), e.getMessage());
} catch (IOException e) {
LOG.warn("IOException executing command: ", e);
throw new PrivilegedOperationException(e);
}
return new IOStreamPair(stdout, stdin);
}
//Utility functions for squashing together operations in supported ways
//At some point, we need to create a generalized mechanism that uses a set
//of squashing 'rules' to squash an set of PrivilegedOperations of varying

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@ -36,6 +37,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -194,4 +196,10 @@ public void reapContainer(ContainerRuntimeContext ctx)
public String[] getIpAndHost(Container container) {
return ContainerExecutor.getLocalIpAndHost(container);
}
/**
 * Interactive exec is not supported by this runtime; always fails.
 *
 * @param containerExecContext the exec context (unused)
 * @return never returns normally
 * @throws ContainerExecutionException always, to signal the operation
 *         is unsupported
 */
@Override
public IOStreamPair execContainer(ContainerExecContext containerExecContext)
throws ContainerExecutionException {
throw new ContainerExecutionException("Unsupported operation.");
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -32,6 +33,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -229,4 +231,12 @@ boolean isRuntimeAllowed(String runtimeType) {
return runtimeType != null && allowedRuntimes.contains(
runtimeType.toUpperCase());
}
/**
 * Dispatch an interactive exec request to the runtime responsible for the
 * given container.
 *
 * @param ctx container exec context
 * @return stdin and stdout of the interactive session
 * @throws ContainerExecutionException if the selected runtime fails
 */
@Override
public IOStreamPair execContainer(ContainerExecContext ctx)
throws ContainerExecutionException {
// Select the runtime that owns this container, then delegate.
return pickContainerRuntime(ctx.getContainer()).execContainer(ctx);
}
}

View File

@ -21,12 +21,14 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerExecCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerKillCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRmCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerStartCommand;
@ -62,6 +64,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntime;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import java.io.File;
import java.io.IOException;
@ -1144,6 +1147,48 @@ public void reapContainer(ContainerRuntimeContext ctx)
}
}
/**
 * Perform docker exec command into running container.
 *
 * @param ctx container exec context
 * @return IOStreams of docker exec
 * @throws ContainerExecutionException if the privileged exec operation
 *         fails or the launch is interrupted
 */
@Override
public IOStreamPair execContainer(ContainerExecContext ctx)
throws ContainerExecutionException {
String containerId = ctx.getContainer().getContainerId().toString();
// Build the equivalent of "docker exec -it <container> bash" as a
// command file consumed by the container-executor binary.
DockerExecCommand dockerExecCommand = new DockerExecCommand(containerId);
dockerExecCommand.setInteractive();
dockerExecCommand.setTTY();
List<String> command = new ArrayList<>();
command.add("bash");
dockerExecCommand.setOverrideCommandWithArgs(command);
String commandFile = dockerClient.writeCommandToTempFile(dockerExecCommand,
ContainerId.fromString(containerId), nmContext);
PrivilegedOperation privOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.EXEC_CONTAINER);
privOp.appendArgs(commandFile);
// Failure is reported through ContainerExecutionException below;
// suppress duplicate logging inside the executor.
privOp.disableFailureLogging();
IOStreamPair output;
try {
output =
privilegedOperationExecutor.executePrivilegedInteractiveOperation(
null, privOp);
LOG.info("ContainerId=" + containerId + ", docker exec output for "
+ dockerExecCommand + ": " + output);
} catch (PrivilegedOperationException e) {
throw new ContainerExecutionException(
"Execute container interactive shell failed", e.getExitCode(),
e.getOutput(), e.getErrorOutput());
} catch (InterruptedException ie) {
// Restore the interrupt status so callers up the stack can observe it.
Thread.currentThread().interrupt();
LOG.warn("InterruptedException executing command: ", ie);
throw new ContainerExecutionException(ie.getMessage());
}
return output;
}
// ipAndHost[0] contains comma separated list of IPs
// ipAndHost[1] contains the hostname.

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
 * Encapsulates the docker exec command and its command
 * line arguments.
 */
public class DockerExecCommand extends DockerCommand {

// docker sub-command issued by this class.
private static final String EXEC_COMMAND = "exec";
// Environment variables to pass to the exec'd process.
// NOTE(review): populated empty here and not read in this class — presumably
// consumed by a later change; confirm before removing.
private final Map<String, String> userEnv;

/**
 * Create an exec command targeting the given container.
 *
 * @param containerId id of the running container to exec into
 */
public DockerExecCommand(String containerId) {
super(EXEC_COMMAND);
super.addCommandArguments("name", containerId);
this.userEnv = new LinkedHashMap<>();
}

/** Keep stdin open (docker exec -i). */
public DockerExecCommand setInteractive() {
super.addCommandArguments("interactive", "true");
return this;
}

/** Allocate a pseudo-TTY (docker exec -t). */
public DockerExecCommand setTTY() {
super.addCommandArguments("tty", "true");
return this;
}

/**
 * Set the command (and its arguments) to run inside the container.
 *
 * @param overrideCommandWithArgs command followed by its arguments
 * @return this command, for chaining
 */
public DockerExecCommand setOverrideCommandWithArgs(
List<String> overrideCommandWithArgs) {
for (String arg : overrideCommandWithArgs) {
super.addCommandArguments("launch-command", arg);
}
return this;
}

@Override
public Map<String, List<String>> getDockerCommandWithArguments() {
return super.getDockerCommandWithArguments();
}
}

View File

@ -22,7 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
/**
* An abstraction for various container runtime implementations. Examples
@ -86,7 +88,17 @@ void reapContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException;
/**
* Return the host and ip of the container
* Run a program in container.
*
* @param ctx the {@link ContainerExecContext}
* @return stdin and stdout of container exec
* @throws ContainerExecutionException
*/
IOStreamPair execContainer(ContainerExecContext ctx)
throws ContainerExecutionException;
/**
* Return the host and ip of the container.
*
* @param container the {@link Container}
* @throws ContainerExecutionException if an error occurs while getting the ip

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
/**
* Encapsulates information required for starting/launching containers.
@ -32,7 +33,7 @@
public final class ContainerExecContext {
private final String user;
private final String appId;
private final String container;
private final Container container;
/**
* Builder for ContainerExecContext.
@ -40,13 +41,13 @@ public final class ContainerExecContext {
public static final class Builder {
private String user;
private String appId;
private String container;
private Container container;
public Builder() {
}
public Builder setContainer(String container) {
this.container = container;
public Builder setContainer(Container c) {
this.container = c;
return this;
}
@ -79,7 +80,7 @@ public String getAppId() {
return this.appId;
}
public String getContainerId() {
public Container getContainer() {
return this.container;
}
}

View File

@ -24,8 +24,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
@ -47,31 +49,43 @@
public class ContainerShellWebSocket {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerShellWebSocket.class);
private static Context nmContext;
private final ContainerExecutor exec = new LinuxContainerExecutor();
private final ContainerExecutor exec;
private IOStreamPair pair;
public ContainerShellWebSocket() {
exec = nmContext.getContainerExecutor();
}
public static void init(Context nm) {
ContainerShellWebSocket.nmContext = nm;
}
@OnWebSocketMessage
public void onText(Session session, String message) throws IOException {
LOG.info("Message received: " + message);
try {
byte[] buffer = new byte[4000];
if (session.isOpen()) {
int ni = message.length();
if (ni > 0) {
pair.out.write(message.getBytes(Charset.forName("UTF-8")));
pair.out.flush();
if (!message.equals("1{}")) {
// Send keystroke to process input
byte[] payload;
payload = message.getBytes(Charset.forName("UTF-8"));
if (payload != null) {
pair.out.write(payload);
pair.out.flush();
}
}
// Render process output
int no = pair.in.available();
pair.in.read(buffer, 0, Math.min(no, buffer.length));
String formatted = new String(buffer, Charset.forName("UTF-8"))
.replaceAll("\n", "\r\n");
session.getRemote().sendString(formatted);
}
} catch (Exception e) {
LOG.error("Failed to parse WebSocket message from Client", e);
} catch (IOException e) {
onClose(session, 1001, "Shutdown");
}
}
@ -84,12 +98,14 @@ public void onConnect(Session session) {
URI containerURI = session.getUpgradeRequest().getRequestURI();
String[] containerPath = containerURI.getPath().split("/");
String cId = containerPath[2];
Container container = nmContext.getContainers().get(ContainerId
.fromString(cId));
LOG.info(
"Making interactive connection to running docker container with ID: "
+ cId);
ContainerExecContext execContext = new ContainerExecContext
.Builder()
.setContainer(cId)
.setContainer(container)
.build();
pair = exec.execContainer(execContext);
} catch (Exception e) {
@ -100,7 +116,14 @@ public void onConnect(Session session) {
@OnWebSocketClose
public void onClose(Session session, int status, String reason) {
LOG.info(session.getRemoteAddress().getHostString() + " closed!");
try {
LOG.info(session.getRemoteAddress().getHostString() + " closed!");
pair.in.close();
pair.out.close();
} catch (IOException e) {
} finally {
session.close();
}
}
}
}

View File

@ -99,6 +99,7 @@ protected void serviceStart() throws Exception {
targets.add(AuthenticationFilterInitializer.class.getName());
conf.set(filterInitializerConfKey, StringUtils.join(",", targets));
}
ContainerShellWebSocket.init(nmContext);
LOG.info("Instantiating NMWebApp at " + bindAddress);
try {
this.webApp =

View File

@ -1485,8 +1485,13 @@ int run_docker_with_pty(const char *command_file) {
}
} else {
if (rc < 0) {
fprintf(stderr, "Error %d on read master PTY\n", errno);
exit(DOCKER_EXEC_FAILED);
if (errno==5) {
fprintf(stderr, "Remote Connection Closed.\n");
exit(0);
} else {
fprintf(stderr, "Error %d on read master PTY\n", errno);
exit(DOCKER_EXEC_FAILED);
}
}
}
}

View File

@ -181,9 +181,10 @@ public void testReapContainer() throws Exception {
@Test
public void testExecContainer() throws Exception {
Container container = mock(Container.class);
try {
ContainerExecContext.Builder builder = new ContainerExecContext.Builder();
builder.setUser("foo").setAppId("app1").setContainer("container1");
builder.setUser("foo").setAppId("app1").setContainer(container);
ContainerExecContext ctx = builder.build();
containerExecutor.execContainer(ctx);
} catch (Exception e) {

View File

@ -44,7 +44,6 @@
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -697,10 +696,11 @@ public void testRelaunchContainer() throws Exception {
@Test
public void testExecContainer() throws Exception {
Container container = mock(Container.class);
LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
ContainerExecContext.Builder builder =
new ContainerExecContext.Builder();
builder.setUser("foo").setAppId("app1").setContainer("container1");
builder.setUser("foo").setAppId("app1").setContainer(container);
ContainerExecContext ctx = builder.build();
lce.execContainer(ctx);
verify(lce, times(1)).execContainer(ctx);

View File

@ -17,10 +17,13 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import java.util.Map;
@ -58,4 +61,10 @@ public void reapContainer(ContainerRuntimeContext ctx) {}
public String[] getIpAndHost(Container container) {
return new String[0];
}
/**
 * Mock implementation: interactive exec is not exercised by this test
 * runtime, so no streams are produced.
 *
 * @param ctx the exec context (unused)
 * @return always null
 * @throws ContainerExecutionException never thrown by this stub
 */
@Override
public IOStreamPair execContainer(ContainerExecContext ctx)
throws ContainerExecutionException {
return null;
}
}

View File

@ -103,11 +103,13 @@ public boolean reapContainer(ContainerReapContext ctx)
throws IOException {
return true;
}
/**
 * Stub implementation: returns an empty stream pair so callers receive a
 * non-null result without any real process being launched.
 *
 * @param ctx the exec context (unused)
 * @return an IOStreamPair whose input and output streams are both null
 * @throws ContainerExecutionException never thrown by this stub
 */
@Override
public IOStreamPair execContainer(ContainerExecContext ctx)
throws ContainerExecutionException {
return new IOStreamPair(null, null);
}
@Override
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {