YARN-8778. Add command line interface to invoke interactive docker shell. Contributed by Eric Yang

This commit is contained in:
Billie Rinaldi 2018-11-19 17:59:03 -08:00
parent 397f523e22
commit 27ffec7ba7
15 changed files with 455 additions and 65 deletions

View File

@ -2813,3 +2813,43 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under License for the specific language governing permissions and limitations under
the License. the License.
--------------------------------------------------------------------------------
Jline 3.9.0
The binary distribution of this product bundles these dependencies under the
following license:
Copyright (c) 2002-2018, the original author or authors.
All rights reserved.
http://www.opensource.org/licenses/bsd-license.php
Redistribution and use in source and binary forms, with or
without modification, are permitted provided that the following
conditions are met:
Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with
the distribution.
Neither the name of JLine nor the names of its contributors
may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -614,10 +614,17 @@ which has the following notices:
Expert Group and released to the public domain, as explained at Expert Group and released to the public domain, as explained at
http://creativecommons.org/publicdomain/zero/1.0/ http://creativecommons.org/publicdomain/zero/1.0/
The source and binary distribution of this product bundles modified version of The source and binary distribution of this product bundles modified version of
github.com/awslabs/aws-js-s3-explorer licensed under Apache 2.0 license github.com/awslabs/aws-js-s3-explorer licensed under Apache 2.0 license
with the following notice: with the following notice:
AWS JavaScript S3 Explorer AWS JavaScript S3 Explorer
Copyright 2014-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. Copyright 2014-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
The binary distribution of this product bundles binaries of
jline 3.9.0 (https://github.com/jline/jline3)
* LICENSE:
* license/LICENSE.jline3.txt (BSD License)
* HOMEPAGE:
* https://github.com/jline/jline3

View File

@ -788,7 +788,19 @@
<filter> <filter>
<artifact>org.eclipse.jetty.websocket:javax-websocket-server-impl</artifact> <artifact>org.eclipse.jetty.websocket:javax-websocket-server-impl</artifact>
<excludes> <excludes>
<exclude>*</exclude> <exclude>*/**</exclude>
</excludes>
</filter>
<filter>
<artifact>org.eclipse.jetty.websocket:websocket-client</artifact>
<excludes>
<exclude>*/**</exclude>
</excludes>
</filter>
<filter>
<artifact>org.eclipse.jetty:jetty-io</artifact>
<excludes>
<exclude>*/**</exclude>
</excludes> </excludes>
</filter> </filter>
</filters> </filters>

View File

@ -70,6 +70,7 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.ShellContainerCommand;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -560,4 +561,10 @@ public Map<String, Set<NodeAttribute>> getNodeToAttributes(
Set<String> hostNames) throws YarnException, IOException { Set<String> hostNames) throws YarnException, IOException {
return client.getNodeToAttributes(hostNames); return client.getNodeToAttributes(hostNames);
} }
@Override
public void shellToContainer(ContainerId containerId,
ShellContainerCommand command) throws IOException {
throw new IOException("Operation is not supported.");
}
} }

View File

@ -160,6 +160,7 @@
<junit.jupiter.version>5.3.1</junit.jupiter.version> <junit.jupiter.version>5.3.1</junit.jupiter.version>
<junit.vintage.version>5.3.1</junit.vintage.version> <junit.vintage.version>5.3.1</junit.vintage.version>
<junit.platform.version>1.3.1</junit.platform.version> <junit.platform.version>1.3.1</junit.platform.version>
<jline.version>3.9.0</jline.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -696,6 +697,11 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency> <dependency>
<groupId>javax.servlet.jsp</groupId> <groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId> <artifactId>jsp-api</artifactId>
@ -1177,6 +1183,11 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.jline</groupId>
<artifactId>jline</artifactId>
<version>${jline.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.hsqldb</groupId> <groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId> <artifactId>hsqldb</artifactId>

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
/**
* Enumeration of various signal container commands.
*/
@Public
@Evolving
public enum ShellContainerCommand {
BASH,
SH
}

View File

@ -21,8 +21,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -40,13 +38,12 @@
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Component;
@ -60,11 +57,6 @@
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.util.RMHAUtils; import org.apache.hadoop.yarn.util.RMHAUtils;
import org.eclipse.jetty.util.UrlEncoded; import org.eclipse.jetty.util.UrlEncoded;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -92,54 +84,6 @@ public class ApiServiceClient extends AppAdminClient {
super.serviceInit(configuration); super.serviceInit(configuration);
} }
/**
* Generate SPNEGO challenge request token.
*
* @param server - hostname to contact
* @throws IOException
* @throws InterruptedException
*/
String generateToken(String server) throws IOException, InterruptedException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.debug("The user credential is {}", currentUser);
String challenge = currentUser
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
try {
// This Oid for Kerberos GSS-API mechanism.
Oid mechOid = KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID");
GSSManager manager = GSSManager.getInstance();
// GSS name for server
GSSName serverName = manager.createName("HTTP@" + server,
GSSName.NT_HOSTBASED_SERVICE);
// Create a GSSContext for authentication with the service.
// We're passing client credentials as null since we want them to
// be read from the Subject.
GSSContext gssContext = manager.createContext(
serverName.canonicalize(mechOid), mechOid, null,
GSSContext.DEFAULT_LIFETIME);
gssContext.requestMutualAuth(true);
gssContext.requestCredDeleg(true);
// Establish context
byte[] inToken = new byte[0];
byte[] outToken = gssContext.initSecContext(inToken, 0,
inToken.length);
gssContext.dispose();
// Base64 encoded and stringified token for server
LOG.debug("Got valid challenge for host {}", serverName);
return new String(BASE_64_CODEC.encode(outToken),
StandardCharsets.US_ASCII);
} catch (GSSException | IllegalAccessException
| NoSuchFieldException | ClassNotFoundException e) {
LOG.error("Error: {}", e);
throw new AuthenticationException(e);
}
}
});
return challenge;
}
/** /**
* Calculate Resource Manager address base on working REST API. * Calculate Resource Manager address base on working REST API.
*/ */
@ -177,7 +121,7 @@ String getRMWebAddress() {
.resource(sb.toString()).type(MediaType.APPLICATION_JSON); .resource(sb.toString()).type(MediaType.APPLICATION_JSON);
if (useKerberos) { if (useKerberos) {
String[] server = host.split(":"); String[] server = host.split(":");
String challenge = generateToken(server[0]); String challenge = YarnClientUtils.generateToken(server[0]);
builder.header(HttpHeaders.AUTHORIZATION, "Negotiate " + builder.header(HttpHeaders.AUTHORIZATION, "Negotiate " +
challenge); challenge);
LOG.debug("Authorization: Negotiate {}", challenge); LOG.debug("Authorization: Negotiate {}", challenge);
@ -289,7 +233,7 @@ private Builder getApiClient(String requestPath)
if (conf.get("hadoop.http.authentication.type").equals("kerberos")) { if (conf.get("hadoop.http.authentication.type").equals("kerberos")) {
try { try {
URI url = new URI(requestPath); URI url = new URI(requestPath);
String challenge = generateToken(url.getHost()); String challenge = YarnClientUtils.generateToken(url.getHost());
builder.header(HttpHeaders.AUTHORIZATION, "Negotiate " + challenge); builder.header(HttpHeaders.AUTHORIZATION, "Negotiate " + challenge);
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
@ -169,8 +170,7 @@ public void tearDown() throws Exception {
public void testHttpSpnegoChallenge() throws Exception { public void testHttpSpnegoChallenge() throws Exception {
UserGroupInformation.loginUserFromKeytab(clientPrincipal, keytabFile UserGroupInformation.loginUserFromKeytab(clientPrincipal, keytabFile
.getCanonicalPath()); .getCanonicalPath());
asc = new ApiServiceClient(); String challenge = YarnClientUtils.generateToken("localhost");
String challenge = asc.generateToken("localhost");
assertNotNull(challenge); assertNotNull(challenge);
} }

View File

@ -54,6 +54,10 @@
<groupId>log4j</groupId> <groupId>log4j</groupId>
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency> <dependency>
@ -127,6 +131,10 @@
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.jline</groupId>
<artifactId>jline</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.jline.terminal.Terminal;
import org.jline.terminal.TerminalBuilder;
import org.jline.reader.LineReader;
import org.jline.reader.LineReaderBuilder;
import org.jline.reader.impl.LineReaderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Web socket for establishing interactive command shell connection through
* Node Manage to container executor.
*/
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce", "YARN" })
@InterfaceStability.Unstable
@WebSocket
public class ContainerShellWebSocket {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerShellWebSocket.class);
private Session mySession;
private Terminal terminal;
private LineReader reader;
@OnWebSocketMessage
public void onText(Session session, String message) throws IOException {
terminal.output().write(message.getBytes(Charset.forName("UTF-8")));
terminal.output().flush();
}
@OnWebSocketConnect
public void onConnect(Session s) {
initTerminal(s);
LOG.info(s.getRemoteAddress().getHostString() + " connected!");
}
@OnWebSocketClose
public void onClose(Session session, int status, String reason) {
if (status==1000) {
LOG.info(session.getRemoteAddress().getHostString() +
" closed, status: " + status);
} else {
LOG.warn(session.getRemoteAddress().getHostString() +
" closed, status: " + status + " Reason: " + reason);
}
}
public void run() {
try {
Reader consoleReader = new Reader();
Thread inputThread = new Thread(consoleReader, "consoleReader");
inputThread.start();
while (mySession.isOpen()) {
mySession.getRemote().flush();
if (consoleReader.hasData()) {
String message = consoleReader.read();
mySession.getRemote().sendString(message);
mySession.getRemote().sendString("\r");
}
String message = "1{}";
mySession.getRemote().sendString(message);
Thread.sleep(100);
mySession.getRemote().flush();
}
inputThread.join();
} catch (IOException | InterruptedException e) {
try {
mySession.disconnect();
} catch (IOException e1) {
LOG.error("Error closing connection: ", e1);
}
}
}
protected void initTerminal(final Session session) {
try {
this.mySession = session;
try {
terminal = TerminalBuilder.builder()
.system(true)
.build();
} catch (IOException t) {
terminal = TerminalBuilder.builder()
.system(false)
.streams(System.in, (OutputStream) System.out)
.build();
}
reader = LineReaderBuilder.builder()
.terminal(terminal)
.build();
} catch (IOException e) {
session.close(1002, e.getMessage());
}
}
class Reader implements Runnable {
private StringBuilder sb = new StringBuilder();
private boolean hasData = false;
public String read() {
try {
return sb.toString();
} finally {
hasData = false;
sb.setLength(0);
}
}
public boolean hasData() {
return hasData;
}
@Override
public void run() {
while (true) {
int c = ((LineReaderImpl) reader).readCharacter();
if (c == 10 || c == 13) {
hasData = true;
continue;
}
sb.append(new String(Character.toChars(c)));
}
}
}
}

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.ShellContainerCommand;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -958,4 +959,18 @@ List<NodeToAttributeValue>> getAttributesToNodes(
public abstract Map<String, Set<NodeAttribute>> getNodeToAttributes( public abstract Map<String, Set<NodeAttribute>> getNodeToAttributes(
Set<String> hostNames) throws YarnException, IOException; Set<String> hostNames) throws YarnException, IOException;
/**
* <p>
* The interface used by client to get a shell to a container.
* </p>
*
* @param containerId Container ID
* @param command Shell type
* @throws IOException if connection fails.
*/
@Public
@Unstable
public abstract void shellToContainer(ContainerId containerId,
ShellContainerCommand command) throws IOException;
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl; package org.apache.hadoop.yarn.client.api.impl;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
@ -27,6 +28,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -111,15 +113,18 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.ShellContainerCommand;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AHSClient; import org.apache.hadoop.yarn.client.api.AHSClient;
import org.apache.hadoop.yarn.client.api.ContainerShellWebSocket;
import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException; import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
@ -132,6 +137,10 @@
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -1074,4 +1083,53 @@ public Map<String, Set<NodeAttribute>> getNodeToAttributes(
GetNodesToAttributesRequest.newInstance(hostNames); GetNodesToAttributesRequest.newInstance(hostNames);
return rmClient.getNodesToAttributes(request).getNodeToAttributes(); return rmClient.getNodesToAttributes(request).getNodeToAttributes();
} }
@Override
public void shellToContainer(ContainerId containerId,
ShellContainerCommand command) throws IOException {
try {
GetContainerReportRequest request = Records
.newRecord(GetContainerReportRequest.class);
request.setContainerId(containerId);
GetContainerReportResponse response = rmClient
.getContainerReport(request);
URI nodeHttpAddress = new URI(response.getContainerReport()
.getNodeHttpAddress());
String host = nodeHttpAddress.getHost();
int port = nodeHttpAddress.getPort();
String scheme = nodeHttpAddress.getScheme();
String protocol = "ws://";
if (scheme.equals("https")) {
protocol = "wss://";
}
WebSocketClient client = new WebSocketClient();
URI uri = URI.create(protocol + host + ":" + port + "/container/" +
containerId);
try {
client.start();
// The socket that receives events
ContainerShellWebSocket socket = new ContainerShellWebSocket();
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
if (UserGroupInformation.isSecurityEnabled()) {
String challenge = YarnClientUtils.generateToken(host);
upgradeRequest.setHeader("Authorization", "Negotiate " + challenge);
}
// Attempt Connect
Future<Session> fut = client.connect(socket, uri, upgradeRequest);
// Wait for Connect
Session session = fut.get();
// Send a message
session.getRemote().sendString("stty -echo");
session.getRemote().sendString("\r");
session.getRemote().flush();
socket.run();
} finally {
client.stop();
}
} catch (WebSocketException e) {
LOG.debug("Websocket exception: " + e.getMessage());
} catch (Throwable t) {
LOG.error("Fail to shell to container: " + t.getMessage());
}
}
} }

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ShellContainerCommand;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.client.api.AppAdminClient;
@ -111,6 +112,7 @@ public class ApplicationCLI extends YarnCLI {
public static final String COMPONENTS = "components"; public static final String COMPONENTS = "components";
public static final String VERSION = "version"; public static final String VERSION = "version";
public static final String STATES = "states"; public static final String STATES = "states";
public static final String SHELL_CMD = "shell";
private static String firstArg = null; private static String firstArg = null;
@ -311,6 +313,8 @@ public int run(String[] args) throws Exception {
opts.getOption(LIST_CMD).setArgName("Application ID"); opts.getOption(LIST_CMD).setArgName("Application ID");
opts.getOption(FAIL_CMD).setArgName("Application Attempt ID"); opts.getOption(FAIL_CMD).setArgName("Application Attempt ID");
} else if (title != null && title.equalsIgnoreCase(CONTAINER)) { } else if (title != null && title.equalsIgnoreCase(CONTAINER)) {
opts.addOption(SHELL_CMD, true,
"Run a shell in the container.");
opts.addOption(STATUS_CMD, true, opts.addOption(STATUS_CMD, true,
"Prints the status of the container."); "Prints the status of the container.");
opts.addOption(LIST_CMD, true, opts.addOption(LIST_CMD, true,
@ -323,6 +327,7 @@ public int run(String[] args) throws Exception {
"app version, -components to filter instances based on component " + "app version, -components to filter instances based on component " +
"names, -states to filter instances based on instance state."); "names, -states to filter instances based on instance state.");
opts.addOption(HELP_CMD, false, "Displays help for all commands."); opts.addOption(HELP_CMD, false, "Displays help for all commands.");
opts.getOption(SHELL_CMD).setArgName("Container ID");
opts.getOption(STATUS_CMD).setArgName("Container ID"); opts.getOption(STATUS_CMD).setArgName("Container ID");
opts.getOption(LIST_CMD).setArgName("Application Name or Attempt ID"); opts.getOption(LIST_CMD).setArgName("Application Name or Attempt ID");
opts.addOption(APP_TYPE_CMD, true, "Works with -list to " + opts.addOption(APP_TYPE_CMD, true, "Works with -list to " +
@ -552,6 +557,19 @@ public int run(String[] args) throws Exception {
command = SignalContainerCommand.valueOf(signalArgs[1]); command = SignalContainerCommand.valueOf(signalArgs[1]);
} }
signalToContainer(containerId, command); signalToContainer(containerId, command);
} else if (cliParser.hasOption(SHELL_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, SHELL_CMD)) {
printUsage(title, opts);
return exitCode;
}
final String[] shellArgs = cliParser.getOptionValues(SHELL_CMD);
final String containerId = shellArgs[0];
ShellContainerCommand command =
ShellContainerCommand.BASH;
if (shellArgs.length == 2) {
command = ShellContainerCommand.valueOf(shellArgs[1]);
}
shellToContainer(containerId, command);
} else if (cliParser.hasOption(LAUNCH_CMD)) { } else if (cliParser.hasOption(LAUNCH_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, LAUNCH_CMD, APP_TYPE_CMD, if (hasAnyOtherCLIOptions(cliParser, opts, LAUNCH_CMD, APP_TYPE_CMD,
UPDATE_LIFETIME, CHANGE_APPLICATION_QUEUE)) { UPDATE_LIFETIME, CHANGE_APPLICATION_QUEUE)) {
@ -806,7 +824,7 @@ private void updateApplicationTimeout(String applicationId,
} }
/** /**
* Signals the containerId * Signals the containerId.
* *
* @param containerIdStr the container id * @param containerIdStr the container id
* @param command the signal command * @param command the signal command
@ -819,6 +837,20 @@ private void signalToContainer(String containerIdStr,
client.signalToContainer(containerId, command); client.signalToContainer(containerId, command);
} }
/**
* Shell to the containerId.
*
* @param containerIdStr the container id
* @param command the shell command
* @throws YarnException
*/
private void shellToContainer(String containerIdStr,
ShellContainerCommand command) throws YarnException, IOException {
ContainerId containerId = ContainerId.fromString(containerIdStr);
sysout.println("Shelling to container " + containerIdStr);
client.shellToContainer(containerId, command);
}
/** /**
* It prints the usage of the command * It prints the usage of the command
* *

View File

@ -19,15 +19,29 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This class is a container for utility methods that are useful when creating * This class is a container for utility methods that are useful when creating
@ -35,6 +49,9 @@
*/ */
public abstract class YarnClientUtils { public abstract class YarnClientUtils {
private static final Logger LOG =
LoggerFactory.getLogger(YarnClientUtils.class);
private static final Base64 BASE_64_CODEC = new Base64(0);
private static final String ADD_LABEL_FORMAT_ERR_MSG = private static final String ADD_LABEL_FORMAT_ERR_MSG =
"Input format for adding node-labels is not correct, it should be " "Input format for adding node-labels is not correct, it should be "
+ "labelName1[(exclusive=true/false)],LabelName2[] .."; + "labelName1[(exclusive=true/false)],LabelName2[] ..";
@ -187,4 +204,54 @@ static YarnConfiguration getYarnConfWithRmHaId(Configuration conf)
return yarnConf; return yarnConf;
} }
/**
* Generate SPNEGO challenge request token.
*
* @param server - hostname to contact
* @throws IOException thrown if doAs failed
* @throws InterruptedException thrown if doAs is interrupted
* @return SPNEGO token challenge
*/
public static String generateToken(String server) throws IOException,
InterruptedException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
LOG.debug("The user credential is {}", currentUser);
String challenge = currentUser
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
try {
// This Oid for Kerberos GSS-API mechanism.
Oid mechOid = KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID");
GSSManager manager = GSSManager.getInstance();
// GSS name for server
GSSName serverName = manager.createName("HTTP@" + server,
GSSName.NT_HOSTBASED_SERVICE);
// Create a GSSContext for authentication with the service.
// We're passing client credentials as null since we want them to
// be read from the Subject.
GSSContext gssContext = manager.createContext(
serverName.canonicalize(mechOid), mechOid, null,
GSSContext.DEFAULT_LIFETIME);
gssContext.requestMutualAuth(true);
gssContext.requestCredDeleg(true);
// Establish context
byte[] inToken = new byte[0];
byte[] outToken = gssContext.initSecContext(inToken, 0,
inToken.length);
gssContext.dispose();
// Base64 encoded and stringified token for server
LOG.debug("Got valid challenge for host {}", serverName);
return new String(BASE_64_CODEC.encode(outToken),
StandardCharsets.US_ASCII);
} catch (GSSException | IllegalAccessException
| NoSuchFieldException | ClassNotFoundException e) {
LOG.error("Error: {}", e);
throw new AuthenticationException(e);
}
}
});
return challenge;
}
} }

View File

@ -2317,6 +2317,7 @@ private String createContainerCLIHelpMessage() throws IOException {
pw.println(" -components <arg> Works with -list to filter instances based on input comma-separated list of component names."); pw.println(" -components <arg> Works with -list to filter instances based on input comma-separated list of component names.");
pw.println(" -help Displays help for all commands."); pw.println(" -help Displays help for all commands.");
pw.println(" -list <Application Name or Attempt ID> List containers for application attempt when application attempt ID is provided. When application name is provided, then it finds the instances of the application based on app's own implementation, and -appTypes option must be specified unless it is the default yarn-service type. With app name, it supports optional use of -version to filter instances based on app version, -components to filter instances based on component names, -states to filter instances based on instance state."); pw.println(" -list <Application Name or Attempt ID> List containers for application attempt when application attempt ID is provided. When application name is provided, then it finds the instances of the application based on app's own implementation, and -appTypes option must be specified unless it is the default yarn-service type. With app name, it supports optional use of -version to filter instances based on app version, -components to filter instances based on component names, -states to filter instances based on instance state.");
pw.println(" -shell <Container ID> Run a shell in the container.");
pw.println(" -signal <container ID [signal command]> Signal the container."); pw.println(" -signal <container ID [signal command]> Signal the container.");
pw.println("The available signal commands are "); pw.println("The available signal commands are ");
pw.println(java.util.Arrays.asList(SignalContainerCommand.values())); pw.println(java.util.Arrays.asList(SignalContainerCommand.values()));