YARN-8763. Added node manager websocket API for accessing containers.
Contributed by Zian Chen
This commit is contained in:
parent
745f64012a
commit
347ea38581
@ -666,6 +666,10 @@
|
||||
<exclude>junit:junit</exclude>
|
||||
<exclude>com.google.code.findbugs:jsr305</exclude>
|
||||
<exclude>log4j:log4j</exclude>
|
||||
<exclude>org.eclipse.jetty.websocket:*</exclude>
|
||||
<exclude>javax.websocket:javax.websocket-api</exclude>
|
||||
<exclude>javax.annotation:javax.annotation-api</exclude>
|
||||
<exclude>org.eclipse.jetty:jetty-jndi</exclude>
|
||||
<!-- We need a filter that matches just those things that are included in the above artiacts -->
|
||||
</excludes>
|
||||
</artifactSet>
|
||||
@ -777,6 +781,12 @@
|
||||
<exclude>ehcache-core.xsd</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
<filter>
|
||||
<artifact>org.eclipse.jetty.websocket:javax-websocket-server-impl</artifact>
|
||||
<excludes>
|
||||
<exclude>*</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
|
||||
<!-- relocate classes from mssql-jdbc -->
|
||||
|
@ -158,6 +158,10 @@
|
||||
<!-- the jdk ships part of the javax.annotation namespace, so if we want to relocate this we'll have to care it out by class :( -->
|
||||
<exclude>com.google.code.findbugs:jsr305</exclude>
|
||||
<exclude>io.dropwizard.metrics:metrics-core</exclude>
|
||||
<exclude>org.eclipse.jetty.websocket:*</exclude>
|
||||
<exclude>org.eclipse.jetty:jetty-servlet</exclude>
|
||||
<exclude>org.eclipse.jetty:jetty-security</exclude>
|
||||
<exclude>org.ow2.asm:*</exclude>
|
||||
</excludes>
|
||||
</artifactSet>
|
||||
<filters>
|
||||
|
@ -803,6 +803,21 @@
|
||||
<artifactId>jetty-util-ajax</artifactId>
|
||||
<version>${jetty.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.websocket</groupId>
|
||||
<artifactId>javax-websocket-server-impl</artifactId>
|
||||
<version>${jetty.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.ow2.asm</groupId>
|
||||
<artifactId>asm</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-webapp</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet.jsp</groupId>
|
||||
<artifactId>jsp-api</artifactId>
|
||||
|
@ -83,6 +83,10 @@
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-util</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty.websocket</groupId>
|
||||
<artifactId>javax-websocket-server-impl</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
|
@ -37,6 +37,8 @@
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -216,6 +218,16 @@ public abstract boolean signalContainer(ContainerSignalContext ctx)
|
||||
public abstract boolean reapContainer(ContainerReapContext ctx)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Perform interactive docker command into running container.
|
||||
*
|
||||
* @param ctx Encapsulates information necessary for exec containers.
|
||||
* @return return input/output stream if the operation succeeded.
|
||||
* @throws ContainerExecutionException if container exec fails.
|
||||
*/
|
||||
public abstract IOStreamPair execContainer(ContainerExecContext ctx)
|
||||
throws ContainerExecutionException;
|
||||
|
||||
/**
|
||||
* Delete specified directories as a given user.
|
||||
*
|
||||
|
@ -20,6 +20,10 @@
|
||||
|
||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -996,6 +1000,18 @@ public void clearLogDirPermissions() {
|
||||
this.logDirPermissions = null;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param ctx Encapsulates information necessary for exec containers.
|
||||
* @return the input/output stream of interactive docker shell.
|
||||
* @throws ContainerExecutionException
|
||||
*/
|
||||
@Override
|
||||
public IOStreamPair execContainer(ContainerExecContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the list of paths of given local directories.
|
||||
*
|
||||
|
@ -20,6 +20,8 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Optional;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -60,9 +62,14 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
@ -779,6 +786,32 @@ public boolean reapContainer(ContainerReapContext ctx) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs container exec.
|
||||
*
|
||||
* @param ctx Encapsulates information necessary for exec container.
|
||||
* @return stdin and stdout of container exec.
|
||||
* @throws ContainerExecutionException if container exec fails.
|
||||
*/
|
||||
@Override
|
||||
public IOStreamPair execContainer(ContainerExecContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
// TODO: calls PrivilegedOperationExecutor and return IOStream pairs
|
||||
InputStream in = null;
|
||||
OutputStream out = null;
|
||||
int byteSize = 4000;
|
||||
try {
|
||||
in = new ByteArrayInputStream(
|
||||
"This is input command".getBytes(Charset.forName("UTF-8")));
|
||||
out = new ByteArrayOutputStream(byteSize);
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Failed to execute command to container runtime", e);
|
||||
}
|
||||
IOStreamPair pair = new IOStreamPair(in, out);
|
||||
System.out.println(pair);
|
||||
return new IOStreamPair(in, out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAsUser(DeletionAsUserContext ctx) {
|
||||
String user = ctx.getUser();
|
||||
|
@ -0,0 +1,85 @@
|
||||
/*
|
||||
* *
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* /
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.executor;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Encapsulates information required for starting/launching containers.
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public final class ContainerExecContext {
|
||||
private final String user;
|
||||
private final String appId;
|
||||
private final String container;
|
||||
|
||||
/**
|
||||
* Builder for ContainerExecContext.
|
||||
*/
|
||||
public static final class Builder {
|
||||
private String user;
|
||||
private String appId;
|
||||
private String container;
|
||||
|
||||
public Builder() {
|
||||
}
|
||||
|
||||
public Builder setContainer(String container) {
|
||||
this.container = container;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setUser(String user) {
|
||||
this.user = user;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setAppId(String appId) {
|
||||
this.appId = appId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ContainerExecContext build() {
|
||||
return new ContainerExecContext(this);
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerExecContext(Builder builder) {
|
||||
this.container = builder.container;
|
||||
this.user = builder.user;
|
||||
this.appId = builder.appId;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return this.user;
|
||||
}
|
||||
|
||||
public String getAppId() {
|
||||
return this.appId;
|
||||
}
|
||||
|
||||
public String getContainerId() {
|
||||
return this.container;
|
||||
}
|
||||
}
|
@ -0,0 +1,106 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
|
||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Web socket for establishing interactive command shell connection through
|
||||
* Node Manage to container executor.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce", "YARN" })
|
||||
@InterfaceStability.Unstable
|
||||
|
||||
@WebSocket
|
||||
public class ContainerShellWebSocket {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ContainerShellWebSocket.class);
|
||||
|
||||
private final ContainerExecutor exec = new LinuxContainerExecutor();
|
||||
|
||||
private IOStreamPair pair;
|
||||
|
||||
@OnWebSocketMessage
|
||||
public void onText(Session session, String message) throws IOException {
|
||||
LOG.info("Message received: " + message);
|
||||
|
||||
try {
|
||||
byte[] buffer = new byte[4000];
|
||||
if (session.isOpen()) {
|
||||
int ni = message.length();
|
||||
if (ni > 0) {
|
||||
pair.out.write(message.getBytes(Charset.forName("UTF-8")));
|
||||
pair.out.flush();
|
||||
}
|
||||
int no = pair.in.available();
|
||||
pair.in.read(buffer, 0, Math.min(no, buffer.length));
|
||||
String formatted = new String(buffer, Charset.forName("UTF-8"))
|
||||
.replaceAll("\n", "\r\n");
|
||||
session.getRemote().sendString(formatted);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to parse WebSocket message from Client", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@OnWebSocketConnect
|
||||
public void onConnect(Session session) {
|
||||
LOG.info(session.getRemoteAddress().getHostString() + " connected!");
|
||||
|
||||
try {
|
||||
URI containerURI = session.getUpgradeRequest().getRequestURI();
|
||||
String[] containerPath = containerURI.getPath().split("/");
|
||||
String cId = containerPath[2];
|
||||
LOG.info(
|
||||
"Making interactive connection to running docker container with ID: "
|
||||
+ cId);
|
||||
ContainerExecContext execContext = new ContainerExecContext
|
||||
.Builder()
|
||||
.setContainer(cId)
|
||||
.build();
|
||||
pair = exec.execContainer(execContext);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to establish WebSocket connection with Client", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@OnWebSocketClose
|
||||
public void onClose(Session session, int status, String reason) {
|
||||
LOG.info(session.getRemoteAddress().getHostString() + " closed!");
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||
|
||||
import javax.servlet.annotation.WebServlet;
|
||||
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
|
||||
/**
|
||||
* Container shell web socket interface.
|
||||
*/
|
||||
@WebServlet(urlPatterns="/container/container/*")
|
||||
public class ContainerShellWebSocketServlet extends WebSocketServlet{
|
||||
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory) {
|
||||
factory.register(ContainerShellWebSocket.class);
|
||||
}
|
||||
}
|
@ -41,7 +41,9 @@
|
||||
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class WebServer extends AbstractService {
|
||||
|
||||
@ -64,6 +66,7 @@ public WebServer(Context nmContext, ResourceView resourceView,
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
Configuration conf = getConfig();
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
|
||||
YarnConfiguration.NM_BIND_HOST,
|
||||
WebAppUtils.getNMWebAppURLWithoutScheme(conf));
|
||||
@ -102,6 +105,8 @@ protected void serviceStart() throws Exception {
|
||||
WebApps
|
||||
.$for("node", Context.class, this.nmContext, "ws")
|
||||
.at(bindAddress)
|
||||
.withServlet("ContainerShellWebSocket", "/container/*",
|
||||
ContainerShellWebSocketServlet.class, params, false)
|
||||
.with(conf)
|
||||
.withHttpSpnegoPrincipalKey(
|
||||
YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY)
|
||||
|
@ -34,6 +34,8 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
|
||||
import org.junit.Assert;
|
||||
@ -177,6 +179,19 @@ public void testReapContainer() throws Exception {
|
||||
assertTrue(containerExecutor.reapContainer(builder.build()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecContainer() throws Exception {
|
||||
try {
|
||||
ContainerExecContext.Builder builder = new ContainerExecContext.Builder();
|
||||
builder.setUser("foo").setAppId("app1").setContainer("container1");
|
||||
ContainerExecContext ctx = builder.build();
|
||||
containerExecutor.execContainer(ctx);
|
||||
} catch (Exception e) {
|
||||
// socket exception should be thrown immediately, without RPC retries.
|
||||
Assert.assertTrue(e instanceof ContainerExecutionException);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanupBeforeLaunch() throws Exception {
|
||||
Container container = mock(Container.class);
|
||||
|
@ -33,6 +33,7 @@
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -694,6 +695,17 @@ public void testRelaunchContainer() throws Exception {
|
||||
verify(lce, times(1)).relaunchContainer(ctx);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecContainer() throws Exception {
|
||||
LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
|
||||
ContainerExecContext.Builder builder =
|
||||
new ContainerExecContext.Builder();
|
||||
builder.setUser("foo").setAppId("app1").setContainer("container1");
|
||||
ContainerExecContext ctx = builder.build();
|
||||
lce.execContainer(ctx);
|
||||
verify(lce, times(1)).execContainer(ctx);
|
||||
}
|
||||
|
||||
private static class TestResourceHandler implements LCEResourcesHandler {
|
||||
static Set<ContainerId> postExecContainers = new HashSet<ContainerId>();
|
||||
|
||||
|
@ -25,6 +25,7 @@
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
@ -41,6 +42,8 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||
@ -101,6 +104,11 @@ public boolean reapContainer(ContainerReapContext ctx)
|
||||
return true;
|
||||
}
|
||||
@Override
|
||||
public IOStreamPair execContainer(ContainerExecContext ctx)
|
||||
throws ContainerExecutionException {
|
||||
return new IOStreamPair(null, null);
|
||||
}
|
||||
@Override
|
||||
public void deleteAsUser(DeletionAsUserContext ctx)
|
||||
throws IOException, InterruptedException {
|
||||
}
|
||||
|
@ -0,0 +1,80 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
/**
|
||||
* Container shell client socket interface.
|
||||
*/
|
||||
@WebSocket
|
||||
public class ContainerShellClientSocketTest extends WebSocketAdapter {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ContainerShellClientSocketTest.class);
|
||||
private Session session;
|
||||
private CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String message) {
|
||||
LOG.info("Message received from server:" + message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
LOG.info("Connected to server");
|
||||
this.session = session;
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int statusCode, String reason) {
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable cause) {
|
||||
super.onWebSocketError(cause);
|
||||
cause.printStackTrace(System.err);
|
||||
}
|
||||
|
||||
public void sendMessage(String str) {
|
||||
try {
|
||||
session.getRemote().sendString(str);
|
||||
} catch (IOException e) {
|
||||
// TODO Auto-generated catch block
|
||||
LOG.error("Failed to sent message to server", e);
|
||||
}
|
||||
}
|
||||
|
||||
public CountDownLatch getLatch() {
|
||||
return latch;
|
||||
}
|
||||
|
||||
public void setLatch(CountDownLatch latch) {
|
||||
this.latch = latch;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,149 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.client.WebSocketClient;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* Test class for Node Manager Container Web Socket.
|
||||
*/
|
||||
public class TestNMContainerWebSocket {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestNMContainerWebSocket.class);
|
||||
|
||||
private static final File TESTROOTDIR = new File("target",
|
||||
TestNMWebServer.class.getSimpleName());
|
||||
private static File testLogDir = new File("target",
|
||||
TestNMWebServer.class.getSimpleName() + "LogDir");
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
TESTROOTDIR.mkdirs();
|
||||
testLogDir.mkdir();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
FileUtil.fullyDelete(TESTROOTDIR);
|
||||
FileUtil.fullyDelete(testLogDir);
|
||||
}
|
||||
|
||||
private int startNMWebAppServer(String webAddr) {
|
||||
Configuration conf = new Configuration();
|
||||
Context nmContext = new NodeManager.NMContext(null, null, null, null, null,
|
||||
false, conf);
|
||||
ResourceView resourceView = new ResourceView() {
|
||||
@Override
|
||||
public long getVmemAllocatedForContainers() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPmemAllocatedForContainers() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getVCoresAllocatedForContainers() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isVmemCheckEnabled() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPmemCheckEnabled() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, TESTROOTDIR.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(
|
||||
conf);
|
||||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr);
|
||||
WebServer server = new WebServer(nmContext, resourceView,
|
||||
new ApplicationACLsManager(conf), dirsHandler);
|
||||
try {
|
||||
server.init(conf);
|
||||
server.start();
|
||||
return server.getPort();
|
||||
} finally {
|
||||
}
|
||||
}
|
||||
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService(
|
||||
Configuration conf) {
|
||||
NodeHealthScriptRunner scriptRunner = NodeManager.getNodeHealthScriptRunner(
|
||||
conf);
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
return new NodeHealthCheckerService(scriptRunner, dirsHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWebServerWithServlet() {
|
||||
int port = startNMWebAppServer("0.0.0.0");
|
||||
LOG.info("bind to port: " + port);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("ws://localhost:").append(port).append("/container/abc/");
|
||||
String dest = sb.toString();
|
||||
WebSocketClient client = new WebSocketClient();
|
||||
try {
|
||||
ContainerShellClientSocketTest socket = new ContainerShellClientSocketTest();
|
||||
client.start();
|
||||
URI echoUri = new URI(dest);
|
||||
Future<Session> future = client.connect(socket, echoUri);
|
||||
Session session = future.get();
|
||||
session.getRemote().sendString("hello world");
|
||||
session.close();
|
||||
client.stop();
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Failed to connect WebSocket and send message to server", t);
|
||||
} finally {
|
||||
try {
|
||||
client.stop();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to close client", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user