HADOOP-7557 Make IPC header be extensible (sanjay radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1295261 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2012-02-29 20:43:21 +00:00
parent 46d0d47f07
commit 7ae04652a6
8 changed files with 267 additions and 177 deletions

View File

@ -55,6 +55,8 @@ Trunk (unreleased changes)
HADOOP-7994. Remove getProtocolVersion and getProtocolSignature from the
client side translator and server side implementation. (jitendra)
HADOOP-7557 Make IPC header be extensible (sanjay radia)
BUG FIXES
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc

View File

@ -277,5 +277,9 @@
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
</Match>
</FindBugsFilter>

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@ -66,6 +67,7 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.security.token.TokenInfo;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
@ -211,7 +213,7 @@ public synchronized Writable getRpcResult() {
private class Connection extends Thread {
private InetSocketAddress server; // server ip:port
private String serverPrincipal; // server's krb5 principal name
private ConnectionHeader header; // connection header
private IpcConnectionContextProto connectionContext; // connection context
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
private boolean useSasl;
@ -292,8 +294,8 @@ public Connection(ConnectionId remoteId) throws IOException {
authMethod = AuthMethod.KERBEROS;
}
header =
new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod);
connectionContext = ProtoUtil.makeIpcConnectionContext(
RPC.getProtocolName(protocol), ticket, authMethod);
if (LOG.isDebugEnabled())
LOG.debug("Use " + authMethod + " authentication for protocol "
@ -563,7 +565,7 @@ private synchronized void setupIOstreams() throws InterruptedException {
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
writeRpcHeader(outStream);
writeConnectionHeader(outStream);
if (useSasl) {
final InputStream in2 = inStream;
final OutputStream out2 = outStream;
@ -597,8 +599,11 @@ public Boolean run() throws IOException {
} else {
// fall back to simple auth because server told us so.
authMethod = AuthMethod.SIMPLE;
header = new ConnectionHeader(header.getProtocol(), header
.getUgi(), authMethod);
// remake the connectionContext
connectionContext = ProtoUtil.makeIpcConnectionContext(
connectionContext.getProtocol(),
ProtoUtil.getUgi(connectionContext.getUserInfo()),
authMethod);
useSasl = false;
}
}
@ -678,13 +683,26 @@ private void handleConnectionFailure(
". Already tried " + curRetries + " time(s).");
}
/* Write the RPC header */
private void writeRpcHeader(OutputStream outStream) throws IOException {
/**
* Write the connection header - this is sent when connection is established
* +----------------------------------+
* | "hrpc" 4 bytes |
* +----------------------------------+
* | Version (1 bytes) |
* +----------------------------------+
* | Authmethod (1 byte) |
* +----------------------------------+
* | IpcSerializationType (1 byte) |
* +----------------------------------+
*/
private void writeConnectionHeader(OutputStream outStream)
throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
authMethod.write(out);
Server.IpcSerializationType.PROTOBUF.write(out);
out.flush();
}
@ -694,7 +712,7 @@ private void writeRpcHeader(OutputStream outStream) throws IOException {
private void writeHeader() throws IOException {
// Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer();
header.write(buf);
connectionContext.writeTo(buf);
// Write out the payload length
int bufLen = buf.getLength();
@ -1261,16 +1279,16 @@ private Connection getConnection(ConnectionId remoteId,
public static class ConnectionId {
InetSocketAddress address;
UserGroupInformation ticket;
Class<?> protocol;
final Class<?> protocol;
private static final int PRIME = 16777619;
private int rpcTimeout;
private String serverPrincipal;
private int maxIdleTime; //connections will be culled if it was idle for
private final int rpcTimeout;
private final String serverPrincipal;
private final int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private int maxRetries; //the max. no. of retries for socket connections
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private boolean doPing; //do we need to send ping message
private int pingInterval; // how often sends ping to the server in msecs
private final int maxRetries; //the max. no. of retries for socket connections
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean doPing; //do we need to send ping message
private final int pingInterval; // how often sends ping to the server in msecs
ConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket, int rpcTimeout,

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
/**
* The IPC connection header sent by the client to the server
* on connection establishment.
*/
class ConnectionHeader implements Writable {
public static final Log LOG = LogFactory.getLog(ConnectionHeader.class);
private String protocol;
private UserGroupInformation ugi = null;
private AuthMethod authMethod;
public ConnectionHeader() {}
/**
* Create a new {@link ConnectionHeader} with the given <code>protocol</code>
* and {@link UserGroupInformation}.
* @param protocol protocol used for communication between the IPC client
* and the server
* @param ugi {@link UserGroupInformation} of the client communicating with
* the server
*/
public ConnectionHeader(String protocol, UserGroupInformation ugi, AuthMethod authMethod) {
this.protocol = protocol;
this.ugi = ugi;
this.authMethod = authMethod;
}
@Override
public void readFields(DataInput in) throws IOException {
protocol = Text.readString(in);
if (protocol.isEmpty()) {
protocol = null;
}
boolean ugiUsernamePresent = in.readBoolean();
if (ugiUsernamePresent) {
String username = in.readUTF();
boolean realUserNamePresent = in.readBoolean();
if (realUserNamePresent) {
String realUserName = in.readUTF();
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(realUserName);
ugi = UserGroupInformation.createProxyUser(username, realUserUgi);
} else {
ugi = UserGroupInformation.createRemoteUser(username);
}
} else {
ugi = null;
}
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, (protocol == null) ? "" : protocol);
if (ugi != null) {
if (authMethod == AuthMethod.KERBEROS) {
// Send effective user for Kerberos auth
out.writeBoolean(true);
out.writeUTF(ugi.getUserName());
out.writeBoolean(false);
} else if (authMethod == AuthMethod.DIGEST) {
// Don't send user for token auth
out.writeBoolean(false);
} else {
//Send both effective user and real user for simple auth
out.writeBoolean(true);
out.writeUTF(ugi.getUserName());
if (ugi.getRealUser() != null) {
out.writeBoolean(true);
out.writeUTF(ugi.getRealUser().getUserName());
} else {
out.writeBoolean(false);
}
}
} else {
out.writeBoolean(false);
}
}
public String getProtocol() {
return protocol;
}
public UserGroupInformation getUgi() {
return ugi;
}
public String toString() {
return protocol + "-" + ugi;
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
/**
* IPC exception is thrown by IPC layer when the IPC
* connection cannot be established.
*/
public class IpcException extends IOException {
private static final long serialVersionUID = 1L;
final String errMsg;
public IpcException(final String err) {
errMsg = err;
}
}

View File

@ -21,6 +21,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
@ -74,6 +75,7 @@
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslRpcServer;
@ -90,6 +92,7 @@
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@ -110,6 +113,22 @@ public abstract class Server {
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
/**
* Serialization type for ConnectionContext and RpcPayloadHeader
*/
public enum IpcSerializationType {
// Add new serialization type to the end without affecting the enum order
PROTOBUF;
void write(DataOutput out) throws IOException {
out.writeByte(this.ordinal());
}
static IpcSerializationType fromByte(byte b) {
return IpcSerializationType.values()[b];
}
}
/**
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
* and send back a nicer response.
@ -133,7 +152,8 @@ public abstract class Server {
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
// in ObjectWritable to efficiently transmit arrays of primitives
// 6 : Made RPC payload header explicit
public static final byte CURRENT_VERSION = 6;
// 7 : Changed Ipc Connection Header to use Protocol buffers
public static final byte CURRENT_VERSION = 7;
/**
* Initial and max size of response buffer
@ -968,9 +988,9 @@ private synchronized void waitPending() throws InterruptedException {
/** Reads calls from a connection and queues them for handling. */
public class Connection {
private boolean rpcHeaderRead = false; // if initial rpc header is read
private boolean headerRead = false; //if the connection header that
//follows version is read.
private boolean connectionHeaderRead = false; // connection header is read?
private boolean connectionContextRead = false; //if connection context that
//follows connection header is read
private SocketChannel channel;
private ByteBuffer data;
@ -986,14 +1006,14 @@ public class Connection {
private int remotePort;
private InetAddress addr;
ConnectionHeader header = new ConnectionHeader();
IpcConnectionContextProto connectionContext;
String protocolName;
boolean useSasl;
SaslServer saslServer;
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
private ByteBuffer rpcHeaderBuffer;
private ByteBuffer connectionHeaderBuf = null;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
@ -1241,17 +1261,17 @@ public int readAndProcess() throws IOException, InterruptedException {
return count;
}
if (!rpcHeaderRead) {
if (!connectionHeaderRead) {
//Every connection is expected to send the header.
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
if (connectionHeaderBuf == null) {
connectionHeaderBuf = ByteBuffer.allocate(3);
}
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;
}
int version = rpcHeaderBuffer.get(0);
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
int version = connectionHeaderBuf.get(0);
byte[] method = new byte[] {connectionHeaderBuf.get(1)};
authMethod = AuthMethod.read(new DataInputStream(
new ByteArrayInputStream(method)));
dataLengthBuffer.flip();
@ -1273,6 +1293,14 @@ public int readAndProcess() throws IOException, InterruptedException {
setupBadVersionResponse(version);
return -1;
}
IpcSerializationType serializationType = IpcSerializationType
.fromByte(connectionHeaderBuf.get(2));
if (serializationType != IpcSerializationType.PROTOBUF) {
respondUnsupportedSerialization(serializationType);
return -1;
}
dataLengthBuffer.clear();
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
@ -1302,8 +1330,8 @@ public int readAndProcess() throws IOException, InterruptedException {
useSasl = true;
}
rpcHeaderBuffer = null;
rpcHeaderRead = true;
connectionHeaderBuf = null;
connectionHeaderRead = true;
continue;
}
@ -1334,7 +1362,7 @@ public int readAndProcess() throws IOException, InterruptedException {
skipInitialSaslHandshake = false;
continue;
}
boolean isHeaderRead = headerRead;
boolean isHeaderRead = connectionContextRead;
if (useSasl) {
saslReadAndProcess(data.array());
} else {
@ -1383,6 +1411,17 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
}
}
private void respondUnsupportedSerialization(IpcSerializationType st) throws IOException {
String errMsg = "Server IPC version " + CURRENT_VERSION
+ " do not support serilization " + st.toString();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
Call fakeCall = new Call(-1, null, this);
setupResponse(buffer, fakeCall, Status.FATAL, null,
IpcException.class.getName(), errMsg);
responder.doRespond(fakeCall);
}
private void setupHttpRequestOnIpcPortResponse() throws IOException {
Call fakeCall = new Call(0, null, this);
fakeCall.setResponse(ByteBuffer.wrap(
@ -1390,15 +1429,15 @@ private void setupHttpRequestOnIpcPortResponse() throws IOException {
responder.doRespond(fakeCall);
}
/// Reads the connection header following version
private void processHeader(byte[] buf) throws IOException {
/** Reads the connection context following the connection header */
private void processConnectionContext(byte[] buf) throws IOException {
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(buf));
header.readFields(in);
protocolName = header.getProtocol();
connectionContext = IpcConnectionContextProto.parseFrom(in);
protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null;
UserGroupInformation protocolUser = header.getUgi();
UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
if (!useSasl) {
user = protocolUser;
if (user != null) {
@ -1472,14 +1511,14 @@ private void processUnwrappedData(byte[] inBuf) throws IOException,
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (headerRead) {
if (connectionContextRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
processConnectionContext(buf);
connectionContextRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " for protocol " + connectionContext.getProtocol()
+ " is unauthorized for user " + user);
}
}
@ -1549,9 +1588,9 @@ private boolean authorizeConnection() throws IOException {
&& (authMethod != AuthMethod.DIGEST)) {
ProxyUsers.authorize(user, this.getHostAddress(), conf);
}
authorize(user, header, getHostInetAddress());
authorize(user, protocolName, getHostInetAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authorized " + header);
LOG.debug("Successfully authorized " + connectionContext);
}
rpcMetrics.incrAuthorizationSuccesses();
} catch (AuthorizationException ae) {
@ -1596,11 +1635,10 @@ public void run() {
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled())
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": has Call#" + call.callId +
"for RpcKind " + call.rpcKind + " from " + call.connection);
}
String errorClass = null;
String error = null;
Writable value = null;
@ -1921,21 +1959,22 @@ public abstract Writable call(RpcKind rpcKind, String protocol,
* Authorize the incoming client connection.
*
* @param user client user
* @param connection incoming connection
* @param protocolName - the protocol
* @param addr InetAddress of incoming connection
* @throws AuthorizationException when the client isn't authorized to talk the protocol
*/
public void authorize(UserGroupInformation user,
ConnectionHeader connection,
InetAddress addr
) throws AuthorizationException {
private void authorize(UserGroupInformation user, String protocolName,
InetAddress addr) throws AuthorizationException {
if (authorize) {
if (protocolName == null) {
throw new AuthorizationException("Null protocol not authorized");
}
Class<?> protocol = null;
try {
protocol = getProtocolClass(connection.getProtocol(), getConf());
protocol = getProtocolClass(protocolName, getConf());
} catch (ClassNotFoundException cfne) {
throw new AuthorizationException("Unknown protocol: " +
connection.getProtocol());
protocolName);
}
serviceAuthorizationManager.authorize(user, protocol, getConf(), addr);
}

View File

@ -21,6 +21,11 @@
import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
public abstract class ProtoUtil {
/**
@ -63,4 +68,71 @@ public static int readRawVarint32(DataInput in) throws IOException {
return result;
}
/**
* This method creates the connection context using exactly the same logic
* as the old connection context as was done for writable where
* the effective and real users are set based on the auth method.
*
*/
public static IpcConnectionContextProto makeIpcConnectionContext(
final String protocol,
final UserGroupInformation ugi, final AuthMethod authMethod) {
IpcConnectionContextProto.Builder result = IpcConnectionContextProto.newBuilder();
if (protocol != null) {
result.setProtocol(protocol);
}
UserInformationProto.Builder ugiProto = UserInformationProto.newBuilder();
if (ugi != null) {
/*
* In the connection context we send only additional user info that
* is not derived from the authentication done during connection setup.
*/
if (authMethod == AuthMethod.KERBEROS) {
// Real user was established as part of the connection.
// Send effective user only.
ugiProto.setEffectiveUser(ugi.getUserName());
} else if (authMethod == AuthMethod.DIGEST) {
// With token, the connection itself establishes
// both real and effective user. Hence send none in header.
} else { // Simple authentication
// No user info is established as part of the connection.
// Send both effective user and real user
ugiProto.setEffectiveUser(ugi.getUserName());
if (ugi.getRealUser() != null) {
ugiProto.setRealUser(ugi.getRealUser().getUserName());
}
}
}
result.setUserInfo(ugiProto);
return result.build();
}
public static UserGroupInformation getUgi(IpcConnectionContextProto context) {
if (context.hasUserInfo()) {
UserInformationProto userInfo = context.getUserInfo();
return getUgi(userInfo);
} else {
return null;
}
}
public static UserGroupInformation getUgi(UserInformationProto userInfo) {
UserGroupInformation ugi = null;
String effectiveUser = userInfo.hasEffectiveUser() ? userInfo
.getEffectiveUser() : null;
String realUser = userInfo.hasRealUser() ? userInfo.getRealUser() : null;
if (effectiveUser != null) {
if (realUser != null) {
UserGroupInformation realUserUgi = UserGroupInformation
.createRemoteUser(realUser);
ugi = UserGroupInformation
.createProxyUser(effectiveUser, realUserUgi);
} else {
ugi = org.apache.hadoop.security.UserGroupInformation
.createRemoteUser(effectiveUser);
}
}
return ugi;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "IpcConnectionContextProtos";
option java_generate_equals_and_hash = true;
/**
* Spec for UserInformationProto is specified in ProtoUtil#makeIpcConnectionContext
*/
message UserInformationProto {
optional string effectiveUser = 1;
optional string realUser = 2;
}
/**
* The connection context is sent as part of the connection establishment.
* It establishes the context for ALL Rpc calls within the connection.
*/
message IpcConnectionContextProto {
// UserInfo beyond what is determined as part of security handshake
// at connection time (kerberos, tokens etc).
optional UserInformationProto userInfo = 2;
// Protocol name for next rpc layer.
// The client created a proxy with this protocol name
optional string protocol = 3;
}