HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1329319 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanjay Radia 2012-04-23 16:34:21 +00:00
parent 1eb1d0ae4f
commit 589c68ae09
32 changed files with 220 additions and 245 deletions

View File

@ -63,6 +63,8 @@ Trunk (unreleased changes)
HADOOP-8290. Remove remaining references to hadoop.native.lib (harsh) HADOOP-8290. Remove remaining references to hadoop.native.lib (harsh)
HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
BUG FIXES BUG FIXES
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName. HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.

View File

@ -281,9 +281,14 @@
<Match> <Match>
<!-- protobuf generated code --> <!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/> <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcPayloadHeaderProtos.*"/>
</Match> </Match>
<Match> <Match>
<!-- protobuf generated code --> <!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/> <Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
</Match> </Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -50,8 +50,9 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
@ -163,10 +164,10 @@ private class Call {
final Writable rpcRequest; // the serialized rpc request - RpcPayload final Writable rpcRequest; // the serialized rpc request - RpcPayload
Writable rpcResponse; // null if rpc has error Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success IOException error; // exception, null if success
final RpcKind rpcKind; // Rpc EngineKind final RPC.RpcKind rpcKind; // Rpc EngineKind
boolean done; // true when call is done boolean done; // true when call is done
protected Call(RpcKind rpcKind, Writable param) { protected Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind; this.rpcKind = rpcKind;
this.rpcRequest = param; this.rpcRequest = param;
synchronized (Client.this) { synchronized (Client.this) {
@ -613,7 +614,7 @@ public Boolean run() throws IOException {
this.in = new DataInputStream(new BufferedInputStream(inStream)); this.in = new DataInputStream(new BufferedInputStream(inStream));
} }
this.out = new DataOutputStream(new BufferedOutputStream(outStream)); this.out = new DataOutputStream(new BufferedOutputStream(outStream));
writeHeader(); writeConnectionContext();
// update last activity time // update last activity time
touch(); touch();
@ -704,16 +705,17 @@ private void writeConnectionHeader(OutputStream outStream)
out.flush(); out.flush();
} }
/* Write the protocol header for each connection /* Write the connection context header for each connection
* Out is not synchronized because only the first thread does this. * Out is not synchronized because only the first thread does this.
*/ */
private void writeHeader() throws IOException { private void writeConnectionContext() throws IOException {
// Write out the ConnectionHeader // Write out the ConnectionHeader
DataOutputBuffer buf = new DataOutputBuffer(); DataOutputBuffer buf = new DataOutputBuffer();
connectionContext.writeTo(buf); connectionContext.writeTo(buf);
// Write out the payload length // Write out the payload length
int bufLen = buf.getLength(); int bufLen = buf.getLength();
out.writeInt(bufLen); out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen); out.write(buf.getData(), 0, bufLen);
} }
@ -806,21 +808,22 @@ public void sendParam(Call call) {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id); LOG.debug(getName() + " sending #" + call.id);
//for serializing the // Serializing the data to be written.
//data to be written // Format:
// 0) Length of rest below (1 + 2)
// 1) PayloadHeader - is serialized Delimited hence contains length
// 2) the Payload - the RpcRequest
//
d = new DataOutputBuffer(); d = new DataOutputBuffer();
d.writeInt(0); // placeholder for data length RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
RpcPayloadHeader header = new RpcPayloadHeader( call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id); header.writeDelimitedTo(d);
header.write(d);
call.rpcRequest.write(d); call.rpcRequest.write(d);
byte[] data = d.getData(); byte[] data = d.getData();
int dataLength = d.getLength() - 4;
data[0] = (byte)((dataLength >>> 24) & 0xff); int totalLength = d.getLength();
data[1] = (byte)((dataLength >>> 16) & 0xff); out.writeInt(totalLength); // Total Length
data[2] = (byte)((dataLength >>> 8) & 0xff); out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
data[3] = (byte)(dataLength & 0xff);
out.write(data, 0, dataLength + 4);//write the data
out.flush(); out.flush();
} }
} catch(IOException e) { } catch(IOException e) {
@ -937,7 +940,7 @@ private class ParallelCall extends Call {
private int index; private int index;
public ParallelCall(Writable param, ParallelResults results, int index) { public ParallelCall(Writable param, ParallelResults results, int index) {
super(RpcKind.RPC_WRITABLE, param); super(RPC.RpcKind.RPC_WRITABLE, param);
this.results = results; this.results = results;
this.index = index; this.index = index;
} }
@ -1022,22 +1025,22 @@ public void stop() {
} }
/** /**
* Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)} * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)}
* for RPC_BUILTIN * for RPC_BUILTIN
*/ */
public Writable call(Writable param, InetSocketAddress address) public Writable call(Writable param, InetSocketAddress address)
throws InterruptedException, IOException { throws InterruptedException, IOException {
return call(RpcKind.RPC_BUILTIN, param, address); return call(RPC.RpcKind.RPC_BUILTIN, param, address);
} }
/** Make a call, passing <code>param</code>, to the IPC server running at /** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are * <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception. * network problems or if the remote code threw an exception.
* @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, * @deprecated Use {@link #call(RPC.RpcKind, Writable,
* ConnectionId)} instead * ConnectionId)} instead
*/ */
@Deprecated @Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address) public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address)
throws InterruptedException, IOException { throws InterruptedException, IOException {
return call(rpcKind, param, address, null); return call(rpcKind, param, address, null);
} }
@ -1047,11 +1050,11 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address)
* the value. * the value.
* Throws exceptions if there are network problems or if the remote code * Throws exceptions if there are network problems or if the remote code
* threw an exception. * threw an exception.
* @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, * @deprecated Use {@link #call(RPC.RpcKind, Writable,
* ConnectionId)} instead * ConnectionId)} instead
*/ */
@Deprecated @Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
UserGroupInformation ticket) UserGroupInformation ticket)
throws InterruptedException, IOException { throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0, ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
@ -1065,11 +1068,11 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
* timeout, returning the value. * timeout, returning the value.
* Throws exceptions if there are network problems or if the remote code * Throws exceptions if there are network problems or if the remote code
* threw an exception. * threw an exception.
* @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, * @deprecated Use {@link #call(RPC.RpcKind, Writable,
* ConnectionId)} instead * ConnectionId)} instead
*/ */
@Deprecated @Deprecated
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout) int rpcTimeout)
throws InterruptedException, IOException { throws InterruptedException, IOException {
@ -1080,7 +1083,7 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
/** /**
* Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)} * Class, UserGroupInformation, int, Configuration)}
* except that rpcKind is writable. * except that rpcKind is writable.
*/ */
@ -1090,7 +1093,7 @@ public Writable call(Writable param, InetSocketAddress addr,
throws InterruptedException, IOException { throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf); ticket, rpcTimeout, conf);
return call(RpcKind.RPC_BUILTIN, param, remoteId); return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
} }
/** /**
@ -1101,7 +1104,7 @@ public Writable call(Writable param, InetSocketAddress addr,
* value. Throws exceptions if there are network problems or if the remote * value. Throws exceptions if there are network problems or if the remote
* code threw an exception. * code threw an exception.
*/ */
public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket, Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf) int rpcTimeout, Configuration conf)
throws InterruptedException, IOException { throws InterruptedException, IOException {
@ -1111,12 +1114,12 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr,
} }
/** /**
* Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)} * Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)}
* except the rpcKind is RPC_BUILTIN * except the rpcKind is RPC_BUILTIN
*/ */
public Writable call(Writable param, ConnectionId remoteId) public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException { throws InterruptedException, IOException {
return call(RpcKind.RPC_BUILTIN, param, remoteId); return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
} }
/** /**
@ -1130,7 +1133,7 @@ public Writable call(Writable param, ConnectionId remoteId)
* Throws exceptions if there are network problems or if the remote code * Throws exceptions if there are network problems or if the remote code
* threw an exception. * threw an exception.
*/ */
public Writable call(RpcKind rpcKind, Writable rpcRequest, public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId) throws InterruptedException, IOException { ConnectionId remoteId) throws InterruptedException, IOException {
Call call = new Call(rpcKind, rpcRequest); Call call = new Call(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call); Connection connection = getConnection(remoteId, call);

View File

@ -38,7 +38,6 @@
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -61,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine {
static { // Register the rpcRequest deserializer for WritableRpcEngine static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine( org.apache.hadoop.ipc.Server.registerProtocolEngine(
RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class,
new Server.ProtoBufRpcInvoker()); new Server.ProtoBufRpcInvoker());
} }
@ -182,7 +181,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args); HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args);
RpcResponseWritable val = null; RpcResponseWritable val = null;
try { try {
val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER, val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWritable(rpcRequest), remoteId); new RpcRequestWritable(rpcRequest), remoteId);
} catch (Throwable e) { } catch (Throwable e) {
throw new ServiceException(e); throw new ServiceException(e);
@ -351,7 +350,7 @@ public Server(Class<?> protocolClass, Object protocolImpl,
numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
.getClass().getName()), secretManager, portRangeConfig); .getClass().getName()), secretManager, portRangeConfig);
this.verbose = verbose; this.verbose = verbose;
registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl); protocolImpl);
} }
@ -363,10 +362,10 @@ private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
String protoName, long version) throws IOException { String protoName, long version) throws IOException {
ProtoNameVer pv = new ProtoNameVer(protoName, version); ProtoNameVer pv = new ProtoNameVer(protoName, version);
ProtoClassProtoImpl impl = ProtoClassProtoImpl impl =
server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv); server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
if (impl == null) { // no match for Protocol AND Version if (impl == null) { // no match for Protocol AND Version
VerProtocolImpl highest = VerProtocolImpl highest =
server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
protoName); protoName);
if (highest == null) { if (highest == null) {
throw new IOException("Unknown protocol: " + protoName); throw new IOException("Unknown protocol: " + protoName);

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl; import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto;
@ -49,7 +48,7 @@ public GetProtocolVersionsResponseProto getProtocolVersions(
String protocol = request.getProtocol(); String protocol = request.getProtocol();
GetProtocolVersionsResponseProto.Builder builder = GetProtocolVersionsResponseProto.Builder builder =
GetProtocolVersionsResponseProto.newBuilder(); GetProtocolVersionsResponseProto.newBuilder();
for (RpcKind r : RpcKind.values()) { for (RPC.RpcKind r : RPC.RpcKind.values()) {
long[] versions; long[] versions;
try { try {
versions = getProtocolVersionForRpcKind(r, protocol); versions = getProtocolVersionForRpcKind(r, protocol);
@ -78,7 +77,7 @@ public GetProtocolSignatureResponseProto getProtocolSignature(
String rpcKind = request.getRpcKind(); String rpcKind = request.getRpcKind();
long[] versions; long[] versions;
try { try {
versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind), versions = getProtocolVersionForRpcKind(RPC.RpcKind.valueOf(rpcKind),
protocol); protocol);
} catch (ClassNotFoundException e1) { } catch (ClassNotFoundException e1) {
throw new ServiceException(e1); throw new ServiceException(e1);
@ -104,7 +103,7 @@ public GetProtocolSignatureResponseProto getProtocolSignature(
return builder.build(); return builder.build();
} }
private long[] getProtocolVersionForRpcKind(RpcKind rpcKind, private long[] getProtocolVersionForRpcKind(RPC.RpcKind rpcKind,
String protocol) throws ClassNotFoundException { String protocol) throws ClassNotFoundException {
Class<?> protocolClass = Class.forName(protocol); Class<?> protocolClass = Class.forName(protocol);
String protocolName = RPC.getProtocolName(protocolClass); String protocolName = RPC.getProtocolName(protocolClass);

View File

@ -42,7 +42,6 @@
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer;
@ -73,6 +72,18 @@
* the protocol instance is transmitted. * the protocol instance is transmitted.
*/ */
public class RPC { public class RPC {
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
private static final short FIRST_INDEX = RPC_BUILTIN.value;
public final short value; //TODO make it private
RpcKind(short val) {
this.value = val;
}
}
interface RpcInvoker { interface RpcInvoker {
/** /**
@ -777,7 +788,7 @@ static class ProtoClassProtoImpl {
ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray =
new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX); new ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>>(RpcKind.MAX_INDEX);
Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RpcKind rpcKind) { Map<ProtoNameVer, ProtoClassProtoImpl> getProtocolImplMap(RPC.RpcKind rpcKind) {
if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds
for (int i=0; i <= RpcKind.MAX_INDEX; ++i) { for (int i=0; i <= RpcKind.MAX_INDEX; ++i) {
protocolImplMapArray.add( protocolImplMapArray.add(
@ -821,7 +832,7 @@ static class VerProtocolImpl {
@SuppressWarnings("unused") // will be useful later. @SuppressWarnings("unused") // will be useful later.
VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind, VerProtocolImpl[] getSupportedProtocolVersions(RPC.RpcKind rpcKind,
String protocolName) { String protocolName) {
VerProtocolImpl[] resultk = VerProtocolImpl[] resultk =
new VerProtocolImpl[getProtocolImplMap(rpcKind).size()]; new VerProtocolImpl[getProtocolImplMap(rpcKind).size()];
@ -900,7 +911,7 @@ public Server addProtocol(RpcKind rpcKind, Class<?> protocolClass,
} }
@Override @Override
public Writable call(RpcKind rpcKind, String protocol, public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception { Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime); receiveTime);

View File

@ -27,7 +27,6 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
@ -107,7 +106,7 @@ private static Map<Long, ProtocolSignature> getVersionSignatureMap(
* @throws IOException * @throws IOException
*/ */
public static boolean isMethodSupported(Object rpcProxy, Class<?> protocol, public static boolean isMethodSupported(Object rpcProxy, Class<?> protocol,
RpcKind rpcKind, long version, String methodName) throws IOException { RPC.RpcKind rpcKind, long version, String methodName) throws IOException {
InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy); InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy);
Map<Long, ProtocolSignature> versionMap = getVersionSignatureMap( Map<Long, ProtocolSignature> versionMap = getVersionSignatureMap(
serverAddress, protocol.getName(), rpcKind.toString()); serverAddress, protocol.getName(), rpcKind.toString());

View File

@ -1,118 +0,0 @@
package org.apache.hadoop.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
* This is the rpc payload header. It is sent with every rpc call
* <pre>
* The format of RPC call is as follows:
* +---------------------------------------------------+
* | Rpc length in bytes (header + payload length) |
* +---------------------------------------------------+
* | Rpc Header | Rpc Payload |
* +---------------------------------------------------+
*
* The format of Rpc Header is:
* +----------------------------------+
* | RpcKind (1 bytes) |
* +----------------------------------+
* | RpcPayloadOperation (1 bytes) |
* +----------------------------------+
* | Call ID (4 bytes) |
* +----------------------------------+
*
* {@link RpcKind} determines the type of serialization used for Rpc Payload.
* </pre>
* <p>
* <b>Note this header does NOT have its own version number,
* it used the version number from the connection header. </b>
*/
public class RpcPayloadHeader implements Writable {
public enum RpcPayloadOperation {
RPC_FINAL_PAYLOAD ((short)1),
RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet
RPC_CLOSE_CONNECTION ((short)3); // close the rpc connection
private final short code;
private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code;
RpcPayloadOperation(short val) {
this.code = val;
}
public void write(DataOutput out) throws IOException {
out.writeByte(code);
}
static RpcPayloadOperation readFields(DataInput in) throws IOException {
short inValue = in.readByte();
return RpcPayloadOperation.values()[inValue - FIRST_INDEX];
}
}
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
private static final short FIRST_INDEX = RPC_BUILTIN.value;
private final short value;
RpcKind(short val) {
this.value = val;
}
public void write(DataOutput out) throws IOException {
out.writeByte(value);
}
static RpcKind readFields(DataInput in) throws IOException {
short inValue = in.readByte();
return RpcKind.values()[inValue - FIRST_INDEX];
}
}
private RpcKind kind;
private RpcPayloadOperation operation;
private int callId;
public RpcPayloadHeader() {
kind = RpcKind.RPC_WRITABLE;
operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION;
}
public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) {
this.kind = kind;
this.operation = op;
this.callId = callId;
}
int getCallId() {
return callId;
}
RpcKind getkind() {
return kind;
}
RpcPayloadOperation getOperation() {
return operation;
}
@Override
public void write(DataOutput out) throws IOException {
kind.write(out);
operation.write(out);
out.writeInt(callId);
}
@Override
public void readFields(DataInput in) throws IOException {
kind = RpcKind.readFields(in);
operation = RpcPayloadOperation.readFields(in);
this.callId = in.readInt();
}
}

View File

@ -72,11 +72,10 @@
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SaslRpcServer;
@ -170,8 +169,8 @@ static class RpcKindMapValue {
this.rpcRequestWrapperClass = rpcRequestWrapperClass; this.rpcRequestWrapperClass = rpcRequestWrapperClass;
} }
} }
static Map<RpcKind, RpcKindMapValue> rpcKindMap = new static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new
HashMap<RpcKind, RpcKindMapValue>(4); HashMap<RPC.RpcKind, RpcKindMapValue>(4);
@ -185,7 +184,7 @@ static class RpcKindMapValue {
* @param rpcInvoker - use to process the calls on SS. * @param rpcInvoker - use to process the calls on SS.
*/ */
public static void registerProtocolEngine(RpcKind rpcKind, public static void registerProtocolEngine(RPC.RpcKind rpcKind,
Class<? extends Writable> rpcRequestWrapperClass, Class<? extends Writable> rpcRequestWrapperClass,
RpcInvoker rpcInvoker) { RpcInvoker rpcInvoker) {
RpcKindMapValue old = RpcKindMapValue old =
@ -201,14 +200,14 @@ public static void registerProtocolEngine(RpcKind rpcKind,
} }
public Class<? extends Writable> getRpcRequestWrapper( public Class<? extends Writable> getRpcRequestWrapper(
RpcKind rpcKind) { RpcKindProto rpcKind) {
if (rpcRequestClass != null) if (rpcRequestClass != null)
return rpcRequestClass; return rpcRequestClass;
RpcKindMapValue val = rpcKindMap.get(rpcKind); RpcKindMapValue val = rpcKindMap.get(ProtoUtil.convert(rpcKind));
return (val == null) ? null : val.rpcRequestWrapperClass; return (val == null) ? null : val.rpcRequestWrapperClass;
} }
public static RpcInvoker getRpcInvoker(RpcKind rpcKind) { public static RpcInvoker getRpcInvoker(RPC.RpcKind rpcKind) {
RpcKindMapValue val = rpcKindMap.get(rpcKind); RpcKindMapValue val = rpcKindMap.get(rpcKind);
return (val == null) ? null : val.rpcInvoker; return (val == null) ? null : val.rpcInvoker;
} }
@ -403,12 +402,12 @@ private static class Call {
private long timestamp; // time received when response is null private long timestamp; // time received when response is null
// time served when response is not null // time served when response is not null
private ByteBuffer rpcResponse; // the response for this call private ByteBuffer rpcResponse; // the response for this call
private final RpcKind rpcKind; private final RPC.RpcKind rpcKind;
public Call(int id, Writable param, Connection connection) { public Call(int id, Writable param, Connection connection) {
this( id, param, connection, RpcKind.RPC_BUILTIN ); this( id, param, connection, RPC.RpcKind.RPC_BUILTIN );
} }
public Call(int id, Writable param, Connection connection, RpcKind kind) { public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) {
this.callId = id; this.callId = id;
this.rpcRequest = param; this.rpcRequest = param;
this.connection = connection; this.connection = connection;
@ -1366,7 +1365,6 @@ public int readAndProcess() throws IOException, InterruptedException {
if (data == null) { if (data == null) {
dataLengthBuffer.flip(); dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt(); dataLength = dataLengthBuffer.getInt();
if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) { if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
// covers the !useSasl too // covers the !useSasl too
dataLengthBuffer.clear(); dataLengthBuffer.clear();
@ -1555,22 +1553,27 @@ private void processOneRpc(byte[] buf) throws IOException,
private void processData(byte[] buf) throws IOException, InterruptedException { private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis = DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf)); new DataInputStream(new ByteArrayInputStream(buf));
RpcPayloadHeader header = new RpcPayloadHeader(); RpcPayloadHeaderProto header = RpcPayloadHeaderProto.parseDelimitedFrom(dis);
header.readFields(dis); // Read the RpcPayload header
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(" got #" + header.getCallId()); LOG.debug(" got #" + header.getCallId());
if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) { if (!header.hasRpcOp()) {
throw new IOException(" IPC Server: No rpc op in rpcPayloadHeader");
}
if (header.getRpcOp() != RpcPayloadOperationProto.RPC_FINAL_PAYLOAD) {
throw new IOException("IPC Server does not implement operation" + throw new IOException("IPC Server does not implement operation" +
header.getOperation()); header.getRpcOp());
} }
// If we know the rpc kind, get its class so that we can deserialize // If we know the rpc kind, get its class so that we can deserialize
// (Note it would make more sense to have the handler deserialize but // (Note it would make more sense to have the handler deserialize but
// we continue with this original design. // we continue with this original design.
if (!header.hasRpcKind()) {
throw new IOException(" IPC Server: No rpc kind in rpcPayloadHeader");
}
Class<? extends Writable> rpcRequestClass = Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getkind()); getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) { if (rpcRequestClass == null) {
LOG.warn("Unknown rpc kind " + header.getkind() + LOG.warn("Unknown rpc kind " + header.getRpcKind() +
" from client " + getHostAddress()); " from client " + getHostAddress());
final Call readParamsFailedCall = final Call readParamsFailedCall =
new Call(header.getCallId(), null, this); new Call(header.getCallId(), null, this);
@ -1578,7 +1581,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null,
IOException.class.getName(), IOException.class.getName(),
"Unknown rpc kind " + header.getkind()); "Unknown rpc kind " + header.getRpcKind());
responder.doRespond(readParamsFailedCall); responder.doRespond(readParamsFailedCall);
return; return;
} }
@ -1589,7 +1592,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " + LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " + getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getkind(), t); this.protocolName + " for rpcKind " + header.getRpcKind(), t);
final Call readParamsFailedCall = final Call readParamsFailedCall =
new Call(header.getCallId(), null, this); new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@ -1601,7 +1604,8 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
return; return;
} }
Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind()); Call call = new Call(header.getCallId(), rpcRequest, this,
ProtoUtil.convert(header.getRpcKind()));
callQueue.put(call); // queue the call; maybe blocked here callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count incRpcCount(); // Increment the rpc count
} }
@ -1991,11 +1995,11 @@ public synchronized InetSocketAddress getListenerAddress() {
*/ */
@Deprecated @Deprecated
public Writable call(Writable param, long receiveTime) throws Exception { public Writable call(Writable param, long receiveTime) throws Exception {
return call(RpcKind.RPC_BUILTIN, null, param, receiveTime); return call(RPC.RpcKind.RPC_BUILTIN, null, param, receiveTime);
} }
/** Called for each call. */ /** Called for each call. */
public abstract Writable call(RpcKind rpcKind, String protocol, public abstract Writable call(RPC.RpcKind rpcKind, String protocol,
Writable param, long receiveTime) throws Exception; Writable param, long receiveTime) throws Exception;
/** /**

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.io.*; import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
@ -75,7 +74,7 @@ public static synchronized void ensureInitialized() {
* Register the rpcRequest deserializer for WritableRpcEngine * Register the rpcRequest deserializer for WritableRpcEngine
*/ */
private static synchronized void initialize() { private static synchronized void initialize() {
org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE, org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE,
Invocation.class, new Server.WritableRpcInvoker()); Invocation.class, new Server.WritableRpcInvoker());
isInitialized = true; isInitialized = true;
} }
@ -223,7 +222,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
} }
ObjectWritable value = (ObjectWritable) ObjectWritable value = (ObjectWritable)
client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime; long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime); LOG.debug("Call: " + method.getName() + " " + callTime);
@ -412,12 +411,12 @@ public Server(Class<?> protocolClass, Object protocolImpl,
protocolImpl.getClass()); protocolImpl.getClass());
} }
// register protocol class and its super interfaces // register protocol class and its super interfaces
registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl);
protocols = RPC.getProtocolInterfaces(protocolClass); protocols = RPC.getProtocolInterfaces(protocolClass);
} }
for (Class<?> p : protocols) { for (Class<?> p : protocols) {
if (!p.equals(VersionedProtocol.class)) { if (!p.equals(VersionedProtocol.class)) {
registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl); registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl);
} }
} }
@ -461,7 +460,7 @@ public Writable call(org.apache.hadoop.ipc.RPC.Server server,
// registered directly. // registered directly.
// Send the call to the highest protocol version // Send the call to the highest protocol version
VerProtocolImpl highest = server.getHighestSupportedProtocol( VerProtocolImpl highest = server.getHighestSupportedProtocol(
RpcKind.RPC_WRITABLE, protocolName); RPC.RpcKind.RPC_WRITABLE, protocolName);
if (highest == null) { if (highest == null) {
throw new IOException("Unknown protocol: " + protocolName); throw new IOException("Unknown protocol: " + protocolName);
} }
@ -473,10 +472,10 @@ public Writable call(org.apache.hadoop.ipc.RPC.Server server,
ProtoNameVer pv = ProtoNameVer pv =
new ProtoNameVer(call.declaringClassProtocolName, clientVersion); new ProtoNameVer(call.declaringClassProtocolName, clientVersion);
protocolImpl = protocolImpl =
server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv); server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv);
if (protocolImpl == null) { // no match for Protocol AND Version if (protocolImpl == null) { // no match for Protocol AND Version
VerProtocolImpl highest = VerProtocolImpl highest =
server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE,
protoName); protoName);
if (highest == null) { if (highest == null) {
throw new IOException("Unknown protocol: " + protoName); throw new IOException("Unknown protocol: " + protoName);

View File

@ -21,8 +21,10 @@
import java.io.DataInput; import java.io.DataInput;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -135,4 +137,30 @@ public static UserGroupInformation getUgi(UserInformationProto userInfo) {
} }
return ugi; return ugi;
} }
static RpcKindProto convert(RPC.RpcKind kind) {
switch (kind) {
case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN;
case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE;
case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER;
}
return null;
}
public static RPC.RpcKind convert( RpcKindProto kind) {
switch (kind) {
case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN;
case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE;
case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER;
}
return null;
}
public static RpcPayloadHeaderProto makeRpcPayloadHeader(RPC.RpcKind rpcKind,
RpcPayloadOperationProto operation, int callId) {
RpcPayloadHeaderProto.Builder result = RpcPayloadHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
return result.build();
}
} }

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "RpcPayloadHeaderProtos";
option java_generate_equals_and_hash = true;
/**
* This is the rpc payload header. It is sent with every rpc call.
*
* The format of RPC call is as follows:
* +-----------------------------------------------------+
* | Rpc length in bytes |
* +-----------------------------------------------------+
* | RpcPayloadHeader - serialized delimited ie has len |
* +-----------------------------------------------------+
* | RpcRequest Payload |
* +-----------------------------------------------------+
*
*/
/**
* RpcKind determine the rpcEngine and the serialization of the rpc payload
*/
enum RpcKindProto {
RPC_BUILTIN = 0; // Used for built in calls by tests
RPC_WRITABLE = 1; // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine
}
enum RpcPayloadOperationProto {
RPC_FINAL_PAYLOAD = 0; // The final payload
RPC_CONTINUATION_PAYLOAD = 1; // not implemented yet
RPC_CLOSE_CONNECTION = 2; // close the rpc connection
}
message RpcPayloadHeaderProto { // the header for the RpcRequest
optional RpcKindProto rpcKind = 1;
optional RpcPayloadOperationProto rpcOp = 2;
optional uint32 callId = 3; // each rpc has a callId that is also used in response
}

View File

@ -25,7 +25,6 @@
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -99,7 +98,7 @@ public TestServer(int handlerCount, boolean sleep,
} }
@Override @Override
public Writable call(RpcKind rpcKind, String protocol, Writable param, public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
long receiveTime) throws IOException { long receiveTime) throws IOException {
if (sleep) { if (sleep) {
// sleep a bit // sleep a bit

View File

@ -30,7 +30,6 @@
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
/** /**
@ -73,7 +72,7 @@ public TestServer(final int handlerCount, final boolean sleep)
} }
@Override @Override
public Writable call(RpcKind rpcKind, String protocol, Writable param, public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
long receiveTime) throws IOException { long receiveTime) throws IOException {
if (sleep) { if (sleep) {
try { try {

View File

@ -23,7 +23,6 @@
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl; import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl;
import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService; import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
@ -178,9 +177,9 @@ public void setUp() throws Exception {
// create a server with two handlers // create a server with two handlers
server = RPC.getServer(Foo0.class, server = RPC.getServer(Foo0.class,
new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); new Foo0Impl(), ADDRESS, 0, 2, false, conf, null);
server.addProtocol(RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl());
server.addProtocol(RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl());
server.addProtocol(RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl());
// Add Protobuf server // Add Protobuf server
@ -189,7 +188,7 @@ public void setUp() throws Exception {
new PBServerImpl(); new PBServerImpl();
BlockingService service = TestProtobufRpcProto BlockingService service = TestProtobufRpcProto
.newReflectiveBlockingService(pbServerImpl); .newReflectiveBlockingService(pbServerImpl);
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
service); service);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);

View File

@ -24,7 +24,6 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
@ -122,7 +121,7 @@ public void setUp() throws IOException { // Setup server for both protocols
BlockingService service2 = TestProtobufRpc2Proto BlockingService service2 = TestProtobufRpc2Proto
.newReflectiveBlockingService(server2Impl); .newReflectiveBlockingService(server2Impl);
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
service2); service2);
server.start(); server.start();
} }

View File

@ -31,7 +31,6 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
@ -134,7 +133,7 @@ public void testVersion0ClientVersion1Server() throws Exception {
TestImpl1 impl = new TestImpl1(); TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, server = RPC.getServer(TestProtocol1.class,
impl, ADDRESS, 0, 2, false, conf, null); impl, ADDRESS, 0, 2, false, conf, null);
server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -201,7 +200,7 @@ public void testVersion2ClientVersion1Server() throws Exception {
TestImpl1 impl = new TestImpl1(); TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, server = RPC.getServer(TestProtocol1.class,
impl, ADDRESS, 0, 2, false, conf, null); impl, ADDRESS, 0, 2, false, conf, null);
server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -222,7 +221,7 @@ public void testVersion2ClientVersion2Server() throws Exception {
TestImpl2 impl = new TestImpl2(); TestImpl2 impl = new TestImpl2();
server = RPC.getServer(TestProtocol2.class, server = RPC.getServer(TestProtocol2.class,
impl, ADDRESS, 0, 2, false, conf, null); impl, ADDRESS, 0, 2, false, conf, null);
server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
@ -316,11 +315,11 @@ public void testIsMethodSupported() throws IOException {
TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
TestProtocol2.versionID, addr, conf); TestProtocol2.versionID, addr, conf);
boolean supported = RpcClientUtil.isMethodSupported(proxy, boolean supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RpcKind.RPC_WRITABLE, TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE,
RPC.getProtocolVersion(TestProtocol2.class), "echo"); RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertTrue(supported); Assert.assertTrue(supported);
supported = RpcClientUtil.isMethodSupported(proxy, supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER, TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(TestProtocol2.class), "echo"); RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertFalse(supported); Assert.assertFalse(supported);
} }
@ -334,7 +333,7 @@ public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
TestImpl1 impl = new TestImpl1(); TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
conf, null); conf, null);
server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start(); server.start();
ProtocolMetaInfoServerSideTranslatorPB xlator = ProtocolMetaInfoServerSideTranslatorPB xlator =
@ -343,13 +342,13 @@ public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
null, null,
createGetProtocolSigRequestProto(TestProtocol1.class, createGetProtocolSigRequestProto(TestProtocol1.class,
RpcKind.RPC_PROTOCOL_BUFFER)); RPC.RpcKind.RPC_PROTOCOL_BUFFER));
//No signatures should be found //No signatures should be found
Assert.assertEquals(0, resp.getProtocolSignatureCount()); Assert.assertEquals(0, resp.getProtocolSignatureCount());
resp = xlator.getProtocolSignature( resp = xlator.getProtocolSignature(
null, null,
createGetProtocolSigRequestProto(TestProtocol1.class, createGetProtocolSigRequestProto(TestProtocol1.class,
RpcKind.RPC_WRITABLE)); RPC.RpcKind.RPC_WRITABLE));
Assert.assertEquals(1, resp.getProtocolSignatureCount()); Assert.assertEquals(1, resp.getProtocolSignatureCount());
ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
@ -366,7 +365,7 @@ public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
} }
private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
Class<?> protocol, RpcKind rpcKind) { Class<?> protocol, RPC.RpcKind rpcKind) {
GetProtocolSignatureRequestProto.Builder builder = GetProtocolSignatureRequestProto.Builder builder =
GetProtocolSignatureRequestProto.newBuilder(); GetProtocolSignatureRequestProto.newBuilder();
builder.setProtocol(protocol.getName()); builder.setProtocol(protocol.getName());

View File

@ -70,6 +70,8 @@ Trunk (unreleased changes)
HDFS-3282. Expose getFileLength API. (umamahesh) HDFS-3282. Expose getFileLength API. (umamahesh)
HADOOP-8285 HDFS changes for Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.

View File

@ -52,7 +52,6 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -1010,7 +1009,7 @@ public static URI createUri(String scheme, InetSocketAddress address) {
public static void addPBProtocol(Configuration conf, Class<?> protocol, public static void addPBProtocol(Configuration conf, Class<?> protocol,
BlockingService service, RPC.Server server) throws IOException { BlockingService service, RPC.Server server) throws IOException {
RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class); RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, protocol, service); server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
} }
/** /**

View File

@ -46,7 +46,6 @@
import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -193,7 +192,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, return RpcClientUtil.isMethodSupported(rpcProxy,
ClientDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, ClientDatanodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName); RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName);
} }

View File

@ -109,7 +109,6 @@
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -812,7 +811,7 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, return RpcClientUtil.isMethodSupported(rpcProxy,
ClientNamenodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName); RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
} }

View File

@ -69,7 +69,6 @@
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -308,7 +307,7 @@ public void commitBlockSynchronization(ExtendedBlock block,
public boolean isMethodSupported(String methodName) public boolean isMethodSupported(String methodName)
throws IOException { throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, DatanodeProtocolPB.class, return RpcClientUtil.isMethodSupported(rpcProxy, DatanodeProtocolPB.class,
RpcKind.RPC_PROTOCOL_BUFFER, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(DatanodeProtocolPB.class), methodName); RPC.getProtocolVersion(DatanodeProtocolPB.class), methodName);
} }
} }

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -65,7 +64,7 @@ public String[] getGroupsForUser(String user) throws IOException {
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, return RpcClientUtil.isMethodSupported(rpcProxy,
GetUserMappingsProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, GetUserMappingsProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(GetUserMappingsProtocolPB.class), methodName); RPC.getProtocolVersion(GetUserMappingsProtocolPB.class), methodName);
} }
} }

View File

@ -39,7 +39,6 @@
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -119,7 +118,7 @@ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, return RpcClientUtil.isMethodSupported(rpcProxy,
InterDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, InterDatanodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(InterDatanodeProtocolPB.class), methodName); RPC.getProtocolVersion(InterDatanodeProtocolPB.class), methodName);
} }
} }

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -109,7 +108,7 @@ public FenceResponse fence(JournalInfo journalInfo, long epoch,
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, JournalProtocolPB.class, return RpcClientUtil.isMethodSupported(rpcProxy, JournalProtocolPB.class,
RpcKind.RPC_PROTOCOL_BUFFER, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(JournalProtocolPB.class), methodName); RPC.getProtocolVersion(JournalProtocolPB.class), methodName);
} }
} }

View File

@ -47,7 +47,6 @@
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -209,7 +208,7 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, NamenodeProtocolPB.class, return RpcClientUtil.isMethodSupported(rpcProxy, NamenodeProtocolPB.class,
RpcKind.RPC_PROTOCOL_BUFFER, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(NamenodeProtocolPB.class), methodName); RPC.getProtocolVersion(NamenodeProtocolPB.class), methodName);
} }
} }

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -64,7 +63,7 @@ public void refreshServiceAcl() throws IOException {
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil.isMethodSupported(rpcProxy, return RpcClientUtil.isMethodSupported(rpcProxy,
RefreshAuthorizationPolicyProtocolPB.class, RefreshAuthorizationPolicyProtocolPB.class,
RpcKind.RPC_PROTOCOL_BUFFER, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(RefreshAuthorizationPolicyProtocolPB.class), RPC.getProtocolVersion(RefreshAuthorizationPolicyProtocolPB.class),
methodName); methodName);
} }

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -76,7 +75,7 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
return RpcClientUtil return RpcClientUtil
.isMethodSupported(rpcProxy, RefreshUserMappingsProtocolPB.class, .isMethodSupported(rpcProxy, RefreshUserMappingsProtocolPB.class,
RpcKind.RPC_PROTOCOL_BUFFER, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(RefreshUserMappingsProtocolPB.class), RPC.getProtocolVersion(RefreshUserMappingsProtocolPB.class),
methodName); methodName);
} }

View File

@ -63,7 +63,6 @@
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.internal.stubbing.answers.ThrowsException; import org.mockito.internal.stubbing.answers.ThrowsException;
@ -100,7 +99,7 @@ public TestServer(int handlerCount, boolean sleep,
} }
@Override @Override
public Writable call(RpcKind rpcKind, String protocol, Writable param, long receiveTime) public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime)
throws IOException { throws IOException {
if (sleep) { if (sleep) {
// sleep a bit // sleep a bit

View File

@ -52,7 +52,6 @@
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.junit.Assert; import org.junit.Assert;
@ -86,7 +85,7 @@ public TestServer(int handlerCount, boolean sleep,
} }
@Override @Override
public Writable call(RpcKind rpcKind, String protocol, Writable param, long receiveTime) public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime)
throws IOException { throws IOException {
if (sleep) { if (sleep) {
// sleep a bit // sleep a bit

View File

@ -50,6 +50,8 @@ Trunk (unreleased changes)
MAPREDUCE-3935. Annotate Counters.Counter and Counters.Group as @Public. MAPREDUCE-3935. Annotate Counters.Counter and Counters.Group as @Public.
(tomwhite) (tomwhite)
HADOOP-8285 MR changes for Use ProtoBuf for RpcPayLoadHeader (sanjay radia)
BUG FIXES BUG FIXES
MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for

View File

@ -30,7 +30,6 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
@ -162,7 +161,7 @@ private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configu
addr.getHostName(), addr.getPort(), numHandlers, false, conf, addr.getHostName(), addr.getPort(), numHandlers, false, conf,
secretManager); secretManager);
LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server"); LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService); server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
return server; return server;
} }
} }