diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 5caa5dcdff..09a48e1986 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -63,6 +63,8 @@ Trunk (unreleased changes) HADOOP-8290. Remove remaining references to hadoop.native.lib (harsh) + HADOOP-8285 Use ProtoBuf for RpcPayLoadHeader (sanjay radia) + BUG FIXES HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName. diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index 855b028453..44092c0014 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -281,9 +281,14 @@ + + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index cb999f3c41..083141311b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -50,8 +50,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.RpcPayloadHeader.*; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.RpcPayloadOperationProto; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -163,10 +164,10 @@ private class Call { final Writable rpcRequest; // the serialized rpc request - RpcPayload Writable rpcResponse; // null if rpc has error IOException error; // exception, null if success - final RpcKind rpcKind; // Rpc EngineKind + final RPC.RpcKind rpcKind; // Rpc EngineKind boolean done; // true when call is done - protected Call(RpcKind rpcKind, Writable param) { + protected Call(RPC.RpcKind rpcKind, Writable param) { this.rpcKind = rpcKind; this.rpcRequest = param; synchronized (Client.this) { @@ -613,7 +614,7 @@ public Boolean run() throws IOException { this.in = new DataInputStream(new BufferedInputStream(inStream)); } this.out = new DataOutputStream(new BufferedOutputStream(outStream)); - writeHeader(); + writeConnectionContext(); // update last activity time touch(); @@ -704,16 +705,17 @@ private void writeConnectionHeader(OutputStream outStream) out.flush(); } - /* Write the protocol header for each connection + /* Write the connection context header for each connection * Out is not synchronized because only the first thread does this. */ - private void writeHeader() throws IOException { + private void writeConnectionContext() throws IOException { // Write out the ConnectionHeader DataOutputBuffer buf = new DataOutputBuffer(); connectionContext.writeTo(buf); // Write out the payload length int bufLen = buf.getLength(); + out.writeInt(bufLen); out.write(buf.getData(), 0, bufLen); } @@ -806,21 +808,22 @@ public void sendParam(Call call) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); - //for serializing the - //data to be written + // Serializing the data to be written. + // Format: + // 0) Length of rest below (1 + 2) + // 1) PayloadHeader - is serialized Delimited hence contains length + // 2) the Payload - the RpcRequest + // d = new DataOutputBuffer(); - d.writeInt(0); // placeholder for data length - RpcPayloadHeader header = new RpcPayloadHeader( - call.rpcKind, RpcPayloadOperation.RPC_FINAL_PAYLOAD, call.id); - header.write(d); + RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader( + call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id); + header.writeDelimitedTo(d); call.rpcRequest.write(d); byte[] data = d.getData(); - int dataLength = d.getLength() - 4; - data[0] = (byte)((dataLength >>> 24) & 0xff); - data[1] = (byte)((dataLength >>> 16) & 0xff); - data[2] = (byte)((dataLength >>> 8) & 0xff); - data[3] = (byte)(dataLength & 0xff); - out.write(data, 0, dataLength + 4);//write the data + + int totalLength = d.getLength(); + out.writeInt(totalLength); // Total Length + out.write(data, 0, totalLength);//PayloadHeader + RpcRequest out.flush(); } } catch(IOException e) { @@ -937,7 +940,7 @@ private class ParallelCall extends Call { private int index; public ParallelCall(Writable param, ParallelResults results, int index) { - super(RpcKind.RPC_WRITABLE, param); + super(RPC.RpcKind.RPC_WRITABLE, param); this.results = results; this.index = index; } @@ -1022,22 +1025,22 @@ public void stop() { } /** - * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)} + * Same as {@link #call(RPC.RpcKind, Writable, ConnectionId)} * for RPC_BUILTIN */ public Writable call(Writable param, InetSocketAddress address) throws InterruptedException, IOException { - return call(RpcKind.RPC_BUILTIN, param, address); + return call(RPC.RpcKind.RPC_BUILTIN, param, address); } /** Make a call, passing param, to the IPC server running at * address, returning the value. Throws exceptions if there are * network problems or if the remote code threw an exception. - * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, + * @deprecated Use {@link #call(RPC.RpcKind, Writable, * ConnectionId)} instead */ @Deprecated - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address) + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address) throws InterruptedException, IOException { return call(rpcKind, param, address, null); } @@ -1047,11 +1050,11 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress address) * the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, + * @deprecated Use {@link #call(RPC.RpcKind, Writable, * ConnectionId)} instead */ @Deprecated - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, UserGroupInformation ticket) throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0, @@ -1065,11 +1068,11 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, * timeout, returning the value. * Throws exceptions if there are network problems or if the remote code * threw an exception. - * @deprecated Use {@link #call(RpcPayloadHeader.RpcKind, Writable, + * @deprecated Use {@link #call(RPC.RpcKind, Writable, * ConnectionId)} instead */ @Deprecated - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout) throws InterruptedException, IOException { @@ -1080,7 +1083,7 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, /** - * Same as {@link #call(RpcPayloadHeader.RpcKind, Writable, InetSocketAddress, + * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, * Class, UserGroupInformation, int, Configuration)} * except that rpcKind is writable. */ @@ -1090,7 +1093,7 @@ public Writable call(Writable param, InetSocketAddress addr, throws InterruptedException, IOException { ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, conf); - return call(RpcKind.RPC_BUILTIN, param, remoteId); + return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); } /** @@ -1101,7 +1104,7 @@ public Writable call(Writable param, InetSocketAddress addr, * value. Throws exceptions if there are network problems or if the remote * code threw an exception. */ - public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, + public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr, Class protocol, UserGroupInformation ticket, int rpcTimeout, Configuration conf) throws InterruptedException, IOException { @@ -1111,12 +1114,12 @@ public Writable call(RpcKind rpcKind, Writable param, InetSocketAddress addr, } /** - * Same as {link {@link #call(RpcPayloadHeader.RpcKind, Writable, ConnectionId)} + * Same as {link {@link #call(RPC.RpcKind, Writable, ConnectionId)} * except the rpcKind is RPC_BUILTIN */ public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { - return call(RpcKind.RPC_BUILTIN, param, remoteId); + return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); } /** @@ -1130,7 +1133,7 @@ public Writable call(Writable param, ConnectionId remoteId) * Throws exceptions if there are network problems or if the remote code * threw an exception. */ - public Writable call(RpcKind rpcKind, Writable rpcRequest, + public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(rpcKind, rpcRequest); Connection connection = getConnection(remoteId, call); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index befc8f70e0..2d3f91e5e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -38,7 +38,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; import org.apache.hadoop.security.UserGroupInformation; @@ -61,7 +60,7 @@ public class ProtobufRpcEngine implements RpcEngine { static { // Register the rpcRequest deserializer for WritableRpcEngine org.apache.hadoop.ipc.Server.registerProtocolEngine( - RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWritable.class, new Server.ProtoBufRpcInvoker()); } @@ -182,7 +181,7 @@ public Object invoke(Object proxy, Method method, Object[] args) HadoopRpcRequestProto rpcRequest = constructRpcRequest(method, args); RpcResponseWritable val = null; try { - val = (RpcResponseWritable) client.call(RpcKind.RPC_PROTOCOL_BUFFER, + val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, new RpcRequestWritable(rpcRequest), remoteId); } catch (Throwable e) { throw new ServiceException(e); @@ -351,7 +350,7 @@ public Server(Class protocolClass, Object protocolImpl, numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl .getClass().getName()), secretManager, portRangeConfig); this.verbose = verbose; - registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, + registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl); } @@ -363,10 +362,10 @@ private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, String protoName, long version) throws IOException { ProtoNameVer pv = new ProtoNameVer(protoName, version); ProtoClassProtoImpl impl = - server.getProtocolImplMap(RpcKind.RPC_PROTOCOL_BUFFER).get(pv); + server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv); if (impl == null) { // no match for Protocol AND Version VerProtocolImpl highest = - server.getHighestSupportedProtocol(RpcKind.RPC_PROTOCOL_BUFFER, + server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protoName); if (highest == null) { throw new IOException("Unknown protocol: " + protoName); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java index aaf71f8a4e..d9d80a84d8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ipc; import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto; @@ -49,7 +48,7 @@ public GetProtocolVersionsResponseProto getProtocolVersions( String protocol = request.getProtocol(); GetProtocolVersionsResponseProto.Builder builder = GetProtocolVersionsResponseProto.newBuilder(); - for (RpcKind r : RpcKind.values()) { + for (RPC.RpcKind r : RPC.RpcKind.values()) { long[] versions; try { versions = getProtocolVersionForRpcKind(r, protocol); @@ -78,7 +77,7 @@ public GetProtocolSignatureResponseProto getProtocolSignature( String rpcKind = request.getRpcKind(); long[] versions; try { - versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind), + versions = getProtocolVersionForRpcKind(RPC.RpcKind.valueOf(rpcKind), protocol); } catch (ClassNotFoundException e1) { throw new ServiceException(e1); @@ -104,7 +103,7 @@ public GetProtocolSignatureResponseProto getProtocolSignature( return builder.build(); } - private long[] getProtocolVersionForRpcKind(RpcKind rpcKind, + private long[] getProtocolVersionForRpcKind(RPC.RpcKind rpcKind, String protocol) throws ClassNotFoundException { Class protocolClass = Class.forName(protocol); String protocolName = RPC.getProtocolName(protocolClass); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index d0f268ec5d..56fbd7d5a1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -42,7 +42,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.Client.ConnectionId; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; @@ -73,6 +72,18 @@ * the protocol instance is transmitted. */ public class RPC { + public enum RpcKind { + RPC_BUILTIN ((short) 1), // Used for built in calls by tests + RPC_WRITABLE ((short) 2), // Use WritableRpcEngine + RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine + final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size + private static final short FIRST_INDEX = RPC_BUILTIN.value; + public final short value; //TODO make it private + + RpcKind(short val) { + this.value = val; + } + } interface RpcInvoker { /** @@ -777,7 +788,7 @@ static class ProtoClassProtoImpl { ArrayList> protocolImplMapArray = new ArrayList>(RpcKind.MAX_INDEX); - Map getProtocolImplMap(RpcKind rpcKind) { + Map getProtocolImplMap(RPC.RpcKind rpcKind) { if (protocolImplMapArray.size() == 0) {// initialize for all rpc kinds for (int i=0; i <= RpcKind.MAX_INDEX; ++i) { protocolImplMapArray.add( @@ -821,7 +832,7 @@ static class VerProtocolImpl { @SuppressWarnings("unused") // will be useful later. - VerProtocolImpl[] getSupportedProtocolVersions(RpcKind rpcKind, + VerProtocolImpl[] getSupportedProtocolVersions(RPC.RpcKind rpcKind, String protocolName) { VerProtocolImpl[] resultk = new VerProtocolImpl[getProtocolImplMap(rpcKind).size()]; @@ -900,7 +911,7 @@ public Server addProtocol(RpcKind rpcKind, Class protocolClass, } @Override - public Writable call(RpcKind rpcKind, String protocol, + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime) throws Exception { return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java index cdbc034ea2..2623f9ede5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; @@ -107,7 +106,7 @@ private static Map getVersionSignatureMap( * @throws IOException */ public static boolean isMethodSupported(Object rpcProxy, Class protocol, - RpcKind rpcKind, long version, String methodName) throws IOException { + RPC.RpcKind rpcKind, long version, String methodName) throws IOException { InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy); Map versionMap = getVersionSignatureMap( serverAddress, protocol.getName(), rpcKind.toString()); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java deleted file mode 100644 index 6e97159fb4..0000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcPayloadHeader.java +++ /dev/null @@ -1,118 +0,0 @@ -package org.apache.hadoop.ipc; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Writable; - -/** - * This is the rpc payload header. It is sent with every rpc call - *
- * The format of RPC call is as follows:
- * +---------------------------------------------------+
- * |  Rpc length in bytes (header + payload length)    |
- * +---------------------------------------------------+
- * |      Rpc Header       |       Rpc Payload         |
- * +---------------------------------------------------+
- * 
- * The format of Rpc Header is:
- * +----------------------------------+
- * |  RpcKind (1 bytes)               |      
- * +----------------------------------+
- * |  RpcPayloadOperation (1 bytes)   |      
- * +----------------------------------+
- * |  Call ID (4 bytes)               |      
- * +----------------------------------+
- * 
- * {@link RpcKind} determines the type of serialization used for Rpc Payload.
- * 
- *

- * Note this header does NOT have its own version number, - * it used the version number from the connection header. - */ -public class RpcPayloadHeader implements Writable { - public enum RpcPayloadOperation { - RPC_FINAL_PAYLOAD ((short)1), - RPC_CONTINUATION_PAYLOAD ((short)2), // not implemented yet - RPC_CLOSE_CONNECTION ((short)3); // close the rpc connection - - private final short code; - private static final short FIRST_INDEX = RPC_FINAL_PAYLOAD.code; - RpcPayloadOperation(short val) { - this.code = val; - } - - public void write(DataOutput out) throws IOException { - out.writeByte(code); - } - - static RpcPayloadOperation readFields(DataInput in) throws IOException { - short inValue = in.readByte(); - return RpcPayloadOperation.values()[inValue - FIRST_INDEX]; - } - } - - public enum RpcKind { - RPC_BUILTIN ((short) 1), // Used for built in calls by tests - RPC_WRITABLE ((short) 2), // Use WritableRpcEngine - RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine - final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size - private static final short FIRST_INDEX = RPC_BUILTIN.value; - private final short value; - - RpcKind(short val) { - this.value = val; - } - - public void write(DataOutput out) throws IOException { - out.writeByte(value); - } - - static RpcKind readFields(DataInput in) throws IOException { - short inValue = in.readByte(); - return RpcKind.values()[inValue - FIRST_INDEX]; - } - } - - private RpcKind kind; - private RpcPayloadOperation operation; - private int callId; - - public RpcPayloadHeader() { - kind = RpcKind.RPC_WRITABLE; - operation = RpcPayloadOperation.RPC_CLOSE_CONNECTION; - } - - public RpcPayloadHeader(RpcKind kind, RpcPayloadOperation op, int callId) { - this.kind = kind; - this.operation = op; - this.callId = callId; - } - - int getCallId() { - return callId; - } - - RpcKind getkind() { - return kind; - } - - RpcPayloadOperation getOperation() { - return operation; - } - - @Override - public void write(DataOutput out) throws IOException { - kind.write(out); - operation.write(out); - out.writeInt(callId); - } - - @Override - public void readFields(DataInput in) throws IOException { - kind = RpcKind.readFields(in); - operation = RpcPayloadOperation.readFields(in); - this.callId = in.readInt(); - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index d9ac47eb66..80ec243a2c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -72,11 +72,10 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.VersionMismatch; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation; import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SaslRpcServer; @@ -170,8 +169,8 @@ static class RpcKindMapValue { this.rpcRequestWrapperClass = rpcRequestWrapperClass; } } - static Map rpcKindMap = new - HashMap(4); + static Map rpcKindMap = new + HashMap(4); @@ -185,7 +184,7 @@ static class RpcKindMapValue { * @param rpcInvoker - use to process the calls on SS. */ - public static void registerProtocolEngine(RpcKind rpcKind, + public static void registerProtocolEngine(RPC.RpcKind rpcKind, Class rpcRequestWrapperClass, RpcInvoker rpcInvoker) { RpcKindMapValue old = @@ -201,14 +200,14 @@ public static void registerProtocolEngine(RpcKind rpcKind, } public Class getRpcRequestWrapper( - RpcKind rpcKind) { + RpcKindProto rpcKind) { if (rpcRequestClass != null) return rpcRequestClass; - RpcKindMapValue val = rpcKindMap.get(rpcKind); + RpcKindMapValue val = rpcKindMap.get(ProtoUtil.convert(rpcKind)); return (val == null) ? null : val.rpcRequestWrapperClass; } - public static RpcInvoker getRpcInvoker(RpcKind rpcKind) { + public static RpcInvoker getRpcInvoker(RPC.RpcKind rpcKind) { RpcKindMapValue val = rpcKindMap.get(rpcKind); return (val == null) ? null : val.rpcInvoker; } @@ -403,12 +402,12 @@ private static class Call { private long timestamp; // time received when response is null // time served when response is not null private ByteBuffer rpcResponse; // the response for this call - private final RpcKind rpcKind; + private final RPC.RpcKind rpcKind; public Call(int id, Writable param, Connection connection) { - this( id, param, connection, RpcKind.RPC_BUILTIN ); + this( id, param, connection, RPC.RpcKind.RPC_BUILTIN ); } - public Call(int id, Writable param, Connection connection, RpcKind kind) { + public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) { this.callId = id; this.rpcRequest = param; this.connection = connection; @@ -1366,7 +1365,6 @@ public int readAndProcess() throws IOException, InterruptedException { if (data == null) { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); - if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) { // covers the !useSasl too dataLengthBuffer.clear(); @@ -1555,22 +1553,27 @@ private void processOneRpc(byte[] buf) throws IOException, private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); - RpcPayloadHeader header = new RpcPayloadHeader(); - header.readFields(dis); // Read the RpcPayload header + RpcPayloadHeaderProto header = RpcPayloadHeaderProto.parseDelimitedFrom(dis); if (LOG.isDebugEnabled()) LOG.debug(" got #" + header.getCallId()); - if (header.getOperation() != RpcPayloadOperation.RPC_FINAL_PAYLOAD) { + if (!header.hasRpcOp()) { + throw new IOException(" IPC Server: No rpc op in rpcPayloadHeader"); + } + if (header.getRpcOp() != RpcPayloadOperationProto.RPC_FINAL_PAYLOAD) { throw new IOException("IPC Server does not implement operation" + - header.getOperation()); + header.getRpcOp()); } // If we know the rpc kind, get its class so that we can deserialize // (Note it would make more sense to have the handler deserialize but // we continue with this original design. + if (!header.hasRpcKind()) { + throw new IOException(" IPC Server: No rpc kind in rpcPayloadHeader"); + } Class rpcRequestClass = - getRpcRequestWrapper(header.getkind()); + getRpcRequestWrapper(header.getRpcKind()); if (rpcRequestClass == null) { - LOG.warn("Unknown rpc kind " + header.getkind() + + LOG.warn("Unknown rpc kind " + header.getRpcKind() + " from client " + getHostAddress()); final Call readParamsFailedCall = new Call(header.getCallId(), null, this); @@ -1578,7 +1581,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException { setupResponse(responseBuffer, readParamsFailedCall, Status.FATAL, null, IOException.class.getName(), - "Unknown rpc kind " + header.getkind()); + "Unknown rpc kind " + header.getRpcKind()); responder.doRespond(readParamsFailedCall); return; } @@ -1589,7 +1592,7 @@ private void processData(byte[] buf) throws IOException, InterruptedException { } catch (Throwable t) { LOG.warn("Unable to read call parameters for client " + getHostAddress() + "on connection protocol " + - this.protocolName + " for rpcKind " + header.getkind(), t); + this.protocolName + " for rpcKind " + header.getRpcKind(), t); final Call readParamsFailedCall = new Call(header.getCallId(), null, this); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); @@ -1601,7 +1604,8 @@ private void processData(byte[] buf) throws IOException, InterruptedException { return; } - Call call = new Call(header.getCallId(), rpcRequest, this, header.getkind()); + Call call = new Call(header.getCallId(), rpcRequest, this, + ProtoUtil.convert(header.getRpcKind())); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -1991,11 +1995,11 @@ public synchronized InetSocketAddress getListenerAddress() { */ @Deprecated public Writable call(Writable param, long receiveTime) throws Exception { - return call(RpcKind.RPC_BUILTIN, null, param, receiveTime); + return call(RPC.RpcKind.RPC_BUILTIN, null, param, receiveTime); } /** Called for each call. */ - public abstract Writable call(RpcKind rpcKind, String protocol, + public abstract Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws Exception; /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index e4cd9b9e08..2ebf42a9aa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -33,7 +33,6 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; @@ -75,7 +74,7 @@ public static synchronized void ensureInitialized() { * Register the rpcRequest deserializer for WritableRpcEngine */ private static synchronized void initialize() { - org.apache.hadoop.ipc.Server.registerProtocolEngine(RpcKind.RPC_WRITABLE, + org.apache.hadoop.ipc.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE, Invocation.class, new Server.WritableRpcInvoker()); isInitialized = true; } @@ -223,7 +222,7 @@ public Object invoke(Object proxy, Method method, Object[] args) } ObjectWritable value = (ObjectWritable) - client.call(RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); + client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -412,12 +411,12 @@ public Server(Class protocolClass, Object protocolImpl, protocolImpl.getClass()); } // register protocol class and its super interfaces - registerProtocolAndImpl(RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); + registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); protocols = RPC.getProtocolInterfaces(protocolClass); } for (Class p : protocols) { if (!p.equals(VersionedProtocol.class)) { - registerProtocolAndImpl(RpcKind.RPC_WRITABLE, p, protocolImpl); + registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl); } } @@ -461,7 +460,7 @@ public Writable call(org.apache.hadoop.ipc.RPC.Server server, // registered directly. // Send the call to the highest protocol version VerProtocolImpl highest = server.getHighestSupportedProtocol( - RpcKind.RPC_WRITABLE, protocolName); + RPC.RpcKind.RPC_WRITABLE, protocolName); if (highest == null) { throw new IOException("Unknown protocol: " + protocolName); } @@ -473,10 +472,10 @@ public Writable call(org.apache.hadoop.ipc.RPC.Server server, ProtoNameVer pv = new ProtoNameVer(call.declaringClassProtocolName, clientVersion); protocolImpl = - server.getProtocolImplMap(RpcKind.RPC_WRITABLE).get(pv); + server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv); if (protocolImpl == null) { // no match for Protocol AND Version VerProtocolImpl highest = - server.getHighestSupportedProtocol(RpcKind.RPC_WRITABLE, + server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, protoName); if (highest == null) { throw new IOException("Unknown protocol: " + protoName); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 3ee306b629..0618f0631c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -21,8 +21,10 @@ import java.io.DataInput; import java.io.IOException; +import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto; +import org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos.*; import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation; @@ -135,4 +137,30 @@ public static UserGroupInformation getUgi(UserInformationProto userInfo) { } return ugi; } + + static RpcKindProto convert(RPC.RpcKind kind) { + switch (kind) { + case RPC_BUILTIN: return RpcKindProto.RPC_BUILTIN; + case RPC_WRITABLE: return RpcKindProto.RPC_WRITABLE; + case RPC_PROTOCOL_BUFFER: return RpcKindProto.RPC_PROTOCOL_BUFFER; + } + return null; + } + + + public static RPC.RpcKind convert( RpcKindProto kind) { + switch (kind) { + case RPC_BUILTIN: return RPC.RpcKind.RPC_BUILTIN; + case RPC_WRITABLE: return RPC.RpcKind.RPC_WRITABLE; + case RPC_PROTOCOL_BUFFER: return RPC.RpcKind.RPC_PROTOCOL_BUFFER; + } + return null; + } + + public static RpcPayloadHeaderProto makeRpcPayloadHeader(RPC.RpcKind rpcKind, + RpcPayloadOperationProto operation, int callId) { + RpcPayloadHeaderProto.Builder result = RpcPayloadHeaderProto.newBuilder(); + result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId); + return result.build(); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto new file mode 100644 index 0000000000..42dea3bde3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcPayloadHeader.proto @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.ipc.protobuf"; +option java_outer_classname = "RpcPayloadHeaderProtos"; +option java_generate_equals_and_hash = true; + + +/** + * This is the rpc payload header. It is sent with every rpc call. + * + * The format of RPC call is as follows: + * +-----------------------------------------------------+ + * | Rpc length in bytes | + * +-----------------------------------------------------+ + * | RpcPayloadHeader - serialized delimited ie has len | + * +-----------------------------------------------------+ + * | RpcRequest Payload | + * +-----------------------------------------------------+ + * + */ + + + +/** + * RpcKind determine the rpcEngine and the serialization of the rpc payload + */ +enum RpcKindProto { + RPC_BUILTIN = 0; // Used for built in calls by tests + RPC_WRITABLE = 1; // Use WritableRpcEngine + RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine +} + +enum RpcPayloadOperationProto { + RPC_FINAL_PAYLOAD = 0; // The final payload + RPC_CONTINUATION_PAYLOAD = 1; // not implemented yet + RPC_CLOSE_CONNECTION = 2; // close the rpc connection +} + +message RpcPayloadHeaderProto { // the header for the RpcRequest + optional RpcKindProto rpcKind = 1; + optional RpcPayloadOperationProto rpcOp = 2; + optional uint32 callId = 3; // each rpc has a callId that is also used in response +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index efb2dc1126..5797bb524b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -25,7 +25,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.net.NetUtils; @@ -99,7 +98,7 @@ public TestServer(int handlerCount, boolean sleep, } @Override - public Writable call(RpcKind rpcKind, String protocol, Writable param, + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { // sleep a bit diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java index 5675cbfddf..5f5cc1bcd3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPCServerResponder.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; /** @@ -73,7 +72,7 @@ public TestServer(final int handlerCount, final boolean sleep) } @Override - public Writable call(RpcKind rpcKind, String protocol, Writable param, + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { try { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java index f5acd93eb2..0446b42508 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestMultipleProtocolServer.java @@ -23,7 +23,6 @@ import org.junit.Assert; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.TestProtoBufRpc.PBServerImpl; import org.apache.hadoop.ipc.TestProtoBufRpc.TestRpcService; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; @@ -178,9 +177,9 @@ public void setUp() throws Exception { // create a server with two handlers server = RPC.getServer(Foo0.class, new Foo0Impl(), ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); - server.addProtocol(RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); - server.addProtocol(RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Foo1.class, new Foo1Impl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Bar.class, new BarImpl()); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, Mixin.class, new BarImpl()); // Add Protobuf server @@ -189,7 +188,7 @@ public void setUp() throws Exception { new PBServerImpl(); BlockingService service = TestProtobufRpcProto .newReflectiveBlockingService(pbServerImpl); - server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class, service); server.start(); addr = NetUtils.getConnectAddress(server); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index 3b9140afc4..9e7b269441 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -24,7 +24,6 @@ import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; @@ -122,7 +121,7 @@ public void setUp() throws IOException { // Setup server for both protocols BlockingService service2 = TestProtobufRpc2Proto .newReflectiveBlockingService(server2Impl); - server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, service2); server.start(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index aca33ef25b..50ae210ea9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -31,7 +31,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; @@ -134,7 +133,7 @@ public void testVersion0ClientVersion1Server() throws Exception { TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -201,7 +200,7 @@ public void testVersion2ClientVersion1Server() throws Exception { TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -222,7 +221,7 @@ public void testVersion2ClientVersion2Server() throws Exception { TestImpl2 impl = new TestImpl2(); server = RPC.getServer(TestProtocol2.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); addr = NetUtils.getConnectAddress(server); @@ -316,11 +315,11 @@ public void testIsMethodSupported() throws IOException { TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, TestProtocol2.versionID, addr, conf); boolean supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RpcKind.RPC_WRITABLE, + TestProtocol2.class, RPC.RpcKind.RPC_WRITABLE, RPC.getProtocolVersion(TestProtocol2.class), "echo"); Assert.assertTrue(supported); supported = RpcClientUtil.isMethodSupported(proxy, - TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER, + TestProtocol2.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(TestProtocol2.class), "echo"); Assert.assertFalse(supported); } @@ -334,7 +333,7 @@ public void testProtocolMetaInfoSSTranslatorPB() throws Exception { TestImpl1 impl = new TestImpl1(); server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, conf, null); - server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.addProtocol(RPC.RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); server.start(); ProtocolMetaInfoServerSideTranslatorPB xlator = @@ -343,13 +342,13 @@ public void testProtocolMetaInfoSSTranslatorPB() throws Exception { GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( null, createGetProtocolSigRequestProto(TestProtocol1.class, - RpcKind.RPC_PROTOCOL_BUFFER)); + RPC.RpcKind.RPC_PROTOCOL_BUFFER)); //No signatures should be found Assert.assertEquals(0, resp.getProtocolSignatureCount()); resp = xlator.getProtocolSignature( null, createGetProtocolSigRequestProto(TestProtocol1.class, - RpcKind.RPC_WRITABLE)); + RPC.RpcKind.RPC_WRITABLE)); Assert.assertEquals(1, resp.getProtocolSignatureCount()); ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); @@ -366,7 +365,7 @@ public void testProtocolMetaInfoSSTranslatorPB() throws Exception { } private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( - Class protocol, RpcKind rpcKind) { + Class protocol, RPC.RpcKind rpcKind) { GetProtocolSignatureRequestProto.Builder builder = GetProtocolSignatureRequestProto.newBuilder(); builder.setProtocol(protocol.getName()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8c29b120d7..98488c3c7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -70,6 +70,8 @@ Trunk (unreleased changes) HDFS-3282. Expose getFileLength API. (umamahesh) + HADOOP-8285 HDFS changes for Use ProtoBuf for RpcPayLoadHeader (sanjay radia) + OPTIMIZATIONS HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 1c1a2d5edd..9bf74821b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; @@ -1010,7 +1009,7 @@ public static URI createUri(String scheme, InetSocketAddress address) { public static void addPBProtocol(Configuration conf, Class protocol, BlockingService service, RPC.Server server) throws IOException { RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class); - server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, protocol, service); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java index 80dbeb79cd..d28dbff10b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java @@ -46,7 +46,6 @@ import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -193,7 +192,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, - ClientDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, + ClientDatanodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 66b7c2f868..0a046041f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -109,7 +109,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; @@ -812,7 +811,7 @@ public void setBalancerBandwidth(long bandwidth) throws IOException { @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, - ClientNamenodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, + ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 3a1833498b..92563d265d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -69,7 +69,6 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -308,7 +307,7 @@ public void commitBlockSynchronization(ExtendedBlock block, public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, DatanodeProtocolPB.class, - RpcKind.RPC_PROTOCOL_BUFFER, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(DatanodeProtocolPB.class), methodName); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java index b2c6dd2cb2..881f796406 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java @@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.tools.GetUserMappingsProtocol; import com.google.protobuf.RpcController; @@ -65,7 +64,7 @@ public String[] getGroupsForUser(String user) throws IOException { @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, - GetUserMappingsProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, + GetUserMappingsProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(GetUserMappingsProtocolPB.class), methodName); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java index 547ca5c21b..4e518c35bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java @@ -39,7 +39,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.security.UserGroupInformation; import com.google.protobuf.RpcController; @@ -119,7 +118,7 @@ public String updateReplicaUnderRecovery(ExtendedBlock oldBlock, @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, - InterDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, + InterDatanodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(InterDatanodeProtocolPB.class), methodName); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java index d14e4e22fe..49fdf5dc95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java @@ -33,7 +33,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -109,7 +108,7 @@ public FenceResponse fence(JournalInfo journalInfo, long epoch, @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, JournalProtocolPB.class, - RpcKind.RPC_PROTOCOL_BUFFER, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(JournalProtocolPB.class), methodName); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index a8aba30c84..b8024579d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -47,7 +47,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -209,7 +208,7 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId) @Override public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, NamenodeProtocolPB.class, - RpcKind.RPC_PROTOCOL_BUFFER, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(NamenodeProtocolPB.class), methodName); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java index ab06a88785..e87e97ff8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java @@ -26,7 +26,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import com.google.protobuf.RpcController; @@ -64,7 +63,7 @@ public void refreshServiceAcl() throws IOException { public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil.isMethodSupported(rpcProxy, RefreshAuthorizationPolicyProtocolPB.class, - RpcKind.RPC_PROTOCOL_BUFFER, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(RefreshAuthorizationPolicyProtocolPB.class), methodName); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java index 1cb780e3d4..bed2b99604 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java @@ -27,7 +27,6 @@ import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import com.google.protobuf.RpcController; @@ -76,7 +75,7 @@ public void refreshSuperUserGroupsConfiguration() throws IOException { public boolean isMethodSupported(String methodName) throws IOException { return RpcClientUtil .isMethodSupported(rpcProxy, RefreshUserMappingsProtocolPB.class, - RpcKind.RPC_PROTOCOL_BUFFER, + RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(RefreshUserMappingsProtocolPB.class), methodName); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index f3b980f5fb..86bef8e1ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -63,7 +63,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.test.GenericTestUtils; import org.mockito.internal.stubbing.answers.ThrowsException; @@ -100,7 +99,7 @@ public TestServer(int handlerCount, boolean sleep, } @Override - public Writable call(RpcKind rpcKind, String protocol, Writable param, long receiveTime) + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { // sleep a bit diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index c1167a4094..b7195a3432 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -52,7 +52,6 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; import org.junit.Assert; @@ -86,7 +85,7 @@ public TestServer(int handlerCount, boolean sleep, } @Override - public Writable call(RpcKind rpcKind, String protocol, Writable param, long receiveTime) + public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param, long receiveTime) throws IOException { if (sleep) { // sleep a bit diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9ab6529aef..0f48354858 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -50,6 +50,8 @@ Trunk (unreleased changes) MAPREDUCE-3935. Annotate Counters.Counter and Counters.Group as @Public. (tomwhite) + HADOOP-8285 MR changes for Use ProtoBuf for RpcPayLoadHeader (sanjay radia) + BUG FIXES MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java index 0ea9d1c65e..9993583a4a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/RpcServerFactoryPBImpl.java @@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.token.SecretManager; @@ -162,7 +161,7 @@ private Server createServer(Class pbProtocol, InetSocketAddress addr, Configu addr.getHostName(), addr.getPort(), numHandlers, false, conf, secretManager); LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server"); - server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService); return server; } }