From 12289efa2bc0f286ededc4c979cee0d304fadb0b Mon Sep 17 00:00:00 2001 From: Jitendra Nath Pandey Date: Thu, 26 Jan 2012 23:23:03 +0000 Subject: [PATCH] HADOOP-7965. Support for protocol version and signature in PB. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1236444 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../dev-support/findbugsExcludeFile.xml | 4 + .../io/retry/RetryInvocationHandler.java | 15 +- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 40 +++- .../apache/hadoop/ipc/ProtocolMetaInfoPB.java | 34 +++ ...rotocolMetaInfoServerSideTranslatorPB.java | 122 +++++++++++ .../hadoop/ipc/ProtocolMetaInterface.java | 42 ++++ .../apache/hadoop/ipc/ProtocolSignature.java | 8 +- .../main/java/org/apache/hadoop/ipc/RPC.java | 31 ++- .../org/apache/hadoop/ipc/RpcClientUtil.java | 193 ++++++++++++++++++ .../java/org/apache/hadoop/ipc/RpcEngine.java | 13 ++ .../hadoop/ipc/RpcInvocationHandler.java | 36 ++++ .../apache/hadoop/ipc/WritableRpcEngine.java | 17 +- .../src/main/proto/ProtocolInfo.proto | 82 ++++++++ .../java/org/apache/hadoop/ipc/TestRPC.java | 9 +- .../hadoop/ipc/TestRPCCompatibility.java | 71 +++++++ 16 files changed, 698 insertions(+), 21 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoPB.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInterface.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java create mode 100644 hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b34432fc55..ed4bb945d4 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -74,6 +74,8 @@ Trunk (unreleased changes) HADOOP-7987. Support setting the run-as user in unsecure mode. (jitendra) + HADOOP-7965. Support for protocol version and signature in PB. (jitendra) + BUGS HADOOP-7851. Configuration.getClasses() never returns the default value. 
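
A minimal client-side usage sketch of the API this patch adds (ClientProtocol, its versionID, addr and conf are hypothetical stand-ins; the pattern mirrors testIsMethodSupported in TestRPCCompatibility further down in this patch):

    // Ask the server, through the new protocol meta-info mechanism, whether it
    // implements "echo" for this protocol and version before invoking it.
    ClientProtocol proxy = RPC.getProxy(ClientProtocol.class,
        ClientProtocol.versionID, addr, conf);
    boolean supported = RpcClientUtil.isMethodSupported(proxy,
        ClientProtocol.class, RpcKind.RPC_WRITABLE,
        RPC.getProtocolVersion(ClientProtocol.class), "echo");
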
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index 190677248e..115d7124ef 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -274,4 +274,8 @@ + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index f928760253..0dad53b59b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -17,20 +17,20 @@ */ package org.apache.hadoop.io.retry; -import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.Collections; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.RpcInvocationHandler; -class RetryInvocationHandler implements InvocationHandler, Closeable { +class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); private FailoverProxyProvider proxyProvider; @@ -135,4 +135,11 @@ class RetryInvocationHandler implements InvocationHandler, Closeable { proxyProvider.close(); } + @Override //RpcInvocationHandler + public ConnectionId getConnectionId() { + RpcInvocationHandler inv = (RpcInvocationHandler) Proxy + .getInvocationHandler(currentProxy); + return inv.getConnectionId(); + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index e0f921fd96..9b3862abdb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -18,11 +18,9 @@ package org.apache.hadoop.ipc; -import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; @@ -37,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto; @@ -51,7 +50,6 @@ import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.InvalidProtocolBufferException; 
import com.google.protobuf.Message; import com.google.protobuf.ServiceException; @@ -80,8 +78,19 @@ public class ProtobufRpcEngine implements RpcEngine { .getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)), false); } + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + Class protocol = ProtocolMetaInfoPB.class; + return new ProtocolProxy(protocol, + (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new Invoker(protocol, connId, conf, + factory)), false); + } - private static class Invoker implements InvocationHandler, Closeable { + private static class Invoker implements RpcInvocationHandler { private final Map returnTypes = new ConcurrentHashMap(); private boolean isClosed = false; @@ -93,12 +102,20 @@ public class ProtobufRpcEngine implements RpcEngine { public Invoker(Class protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol, - ticket, rpcTimeout, conf); - this.client = CLIENTS.getClient(conf, factory, - RpcResponseWritable.class); - this.clientProtocolVersion = RPC.getProtocolVersion(protocol); + this(protocol, Client.ConnectionId.getConnectionId(addr, protocol, + ticket, rpcTimeout, conf), conf, factory); + } + + /** + * This constructor takes a connectionId, instead of creating a new one. + */ + public Invoker(Class protocol, Client.ConnectionId connId, + Configuration conf, SocketFactory factory) { + this.remoteId = connId; + this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class); this.protocolName = RPC.getProtocolName(protocol); + this.clientProtocolVersion = RPC + .getProtocolVersion(protocol); } private HadoopRpcRequestProto constructRpcRequest(Method method, @@ -222,6 +239,11 @@ public class ProtobufRpcEngine implements RpcEngine { returnTypes.put(method.getName(), prototype); return prototype; } + + @Override //RpcInvocationHandler + public ConnectionId getConnectionId() { + return remoteId; + } } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoPB.java new file mode 100644 index 0000000000..968f3d0231 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoPB.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ipc; + +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; + +/** + * Protocol to get versions and signatures for supported protocols from the + * server. + * + * Note: This extends the protocolbuffer service based interface to + * add annotations. + */ +@ProtocolInfo( + protocolName = "org.apache.hadoop.ipc.ProtocolMetaInfoPB", + protocolVersion = 1) +public interface ProtocolMetaInfoPB extends + ProtocolInfoService.BlockingInterface { +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java new file mode 100644 index 0000000000..aaf71f8a4e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtocolMetaInfoServerSideTranslatorPB.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolVersionProto; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class serves the requests for protocol versions and signatures by + * looking them up in the server registry. 
+ */ +public class ProtocolMetaInfoServerSideTranslatorPB implements + ProtocolMetaInfoPB { + + RPC.Server server; + + public ProtocolMetaInfoServerSideTranslatorPB(RPC.Server server) { + this.server = server; + } + + @Override + public GetProtocolVersionsResponseProto getProtocolVersions( + RpcController controller, GetProtocolVersionsRequestProto request) + throws ServiceException { + String protocol = request.getProtocol(); + GetProtocolVersionsResponseProto.Builder builder = + GetProtocolVersionsResponseProto.newBuilder(); + for (RpcKind r : RpcKind.values()) { + long[] versions; + try { + versions = getProtocolVersionForRpcKind(r, protocol); + } catch (ClassNotFoundException e) { + throw new ServiceException(e); + } + ProtocolVersionProto.Builder b = ProtocolVersionProto.newBuilder(); + if (versions != null) { + b.setRpcKind(r.toString()); + for (long v : versions) { + b.addVersions(v); + } + } + builder.addProtocolVersions(b.build()); + } + return builder.build(); + } + + @Override + public GetProtocolSignatureResponseProto getProtocolSignature( + RpcController controller, GetProtocolSignatureRequestProto request) + throws ServiceException { + GetProtocolSignatureResponseProto.Builder builder = GetProtocolSignatureResponseProto + .newBuilder(); + String protocol = request.getProtocol(); + String rpcKind = request.getRpcKind(); + long[] versions; + try { + versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind), + protocol); + } catch (ClassNotFoundException e1) { + throw new ServiceException(e1); + } + if (versions == null) { + return builder.build(); + } + for (long v : versions) { + ProtocolSignatureProto.Builder sigBuilder = ProtocolSignatureProto + .newBuilder(); + sigBuilder.setVersion(v); + try { + ProtocolSignature signature = ProtocolSignature.getProtocolSignature( + protocol, v); + for (int m : signature.getMethods()) { + sigBuilder.addMethods(m); + } + } catch (ClassNotFoundException e) { + throw new ServiceException(e); + } + builder.addProtocolSignature(sigBuilder.build()); + } + return builder.build(); + } + + private long[] getProtocolVersionForRpcKind(RpcKind rpcKind, + String protocol) throws ClassNotFoundException { + Class protocolClass = Class.forName(protocol); + String protocolName = RPC.getProtocolName(protocolClass); + VerProtocolImpl[] vers = server.getSupportedProtocolVersions(rpcKind, + protocolName); + if (vers == null) { + return null; + } + long [] versions = new long[vers.length]; + for (int i=0; i protocol, long serverVersion) { + Class protocol, long serverVersion) { String protocolName = RPC.getProtocolName(protocol); synchronized (PROTOCOL_FINGERPRINT_CACHE) { ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName); @@ -221,6 +221,12 @@ public class ProtocolSignature implements Writable { return sig.signature; } + public static ProtocolSignature getProtocolSignature(String protocolName, + long version) throws ClassNotFoundException { + Class protocol = Class.forName(protocolName); + return getSigFingerprint(protocol, version).signature; + } + /** * Get a server protocol's signature * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 321c1d8ef3..4f85e905cd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -41,6 +41,7 @@ import org.apache.commons.logging.*; import 
org.apache.hadoop.io.*; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; @@ -49,6 +50,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.conf.*; import org.apache.hadoop.util.ReflectionUtils; +import com.google.protobuf.BlockingService; + /** A simple RPC mechanism. * * A protocol is a Java interface. All parameters and return types must @@ -177,8 +180,8 @@ public class RPC { } // return the RpcEngine configured to handle a protocol - private static synchronized RpcEngine getProtocolEngine(Class protocol, - Configuration conf) { + static synchronized RpcEngine getProtocolEngine(Class protocol, + Configuration conf) { RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), @@ -522,7 +525,16 @@ public class RPC { return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); } - + + /** + * Returns the server address for a given proxy. + */ + public static InetSocketAddress getServerAddress(Object proxy) { + RpcInvocationHandler inv = (RpcInvocationHandler) Proxy + .getInvocationHandler(proxy); + return inv.getConnectionId().getAddress(); + } + /** * Get a protocol proxy that contains a proxy connection to a remote server * and a set of methods that are supported by the server @@ -817,6 +829,19 @@ public class RPC { SecretManager secretManager) throws IOException { super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, conf, serverName, secretManager); + initProtocolMetaInfo(conf); + } + + private void initProtocolMetaInfo(Configuration conf) + throws IOException { + RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class, + ProtobufRpcEngine.class); + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(this); + BlockingService protocolInfoBlockingService = ProtocolInfoService + .newReflectiveBlockingService(xlator); + addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class, + protocolInfoBlockingService); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java new file mode 100644 index 0000000000..cdbc034ea2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; +import org.apache.hadoop.net.NetUtils; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class maintains a cache of protocol versions and corresponding protocol + * signatures, keyed by server address, protocol and rpc kind. + * The cache is lazily populated. + */ +public class RpcClientUtil { + private static RpcController NULL_CONTROLLER = null; + private static final int PRIME = 16777619; + + private static class ProtoSigCacheKey { + private InetSocketAddress serverAddress; + private String protocol; + private String rpcKind; + + ProtoSigCacheKey(InetSocketAddress addr, String p, String rk) { + this.serverAddress = addr; + this.protocol = p; + this.rpcKind = rk; + } + + @Override //Object + public int hashCode() { + int result = 1; + result = PRIME * result + + ((serverAddress == null) ? 0 : serverAddress.hashCode()); + result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode()); + result = PRIME * result + ((rpcKind == null) ? 0 : rpcKind.hashCode()); + return result; + } + + @Override //Object + public boolean equals(Object other) { + if (other == this) { + return true; + } + if (other instanceof ProtoSigCacheKey) { + ProtoSigCacheKey otherKey = (ProtoSigCacheKey) other; + return (serverAddress.equals(otherKey.serverAddress) && + protocol.equals(otherKey.protocol) && + rpcKind.equals(otherKey.rpcKind)); + } + return false; + } + } + + private static ConcurrentHashMap> + signatureMap = new ConcurrentHashMap>(); + + private static void putVersionSignatureMap(InetSocketAddress addr, + String protocol, String rpcKind, Map map) { + signatureMap.put(new ProtoSigCacheKey(addr, protocol, rpcKind), map); + } + + private static Map getVersionSignatureMap( + InetSocketAddress addr, String protocol, String rpcKind) { + return signatureMap.get(new ProtoSigCacheKey(addr, protocol, rpcKind)); + } + + /** + * Returns whether the given method is supported or not. + * The protocol signatures are fetched and cached. The connection id for the + * proxy provided is re-used. + * @param rpcProxy Proxy which provides an existing connection id. + * @param protocol Protocol for which the method check is required. + * @param rpcKind The RpcKind for which the method check is required. + * @param version The version at the client. + * @param methodName Name of the method. + * @return true if the method is supported, false otherwise. 
+ * @throws IOException + */ + public static boolean isMethodSupported(Object rpcProxy, Class protocol, + RpcKind rpcKind, long version, String methodName) throws IOException { + InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy); + Map versionMap = getVersionSignatureMap( + serverAddress, protocol.getName(), rpcKind.toString()); + + if (versionMap == null) { + Configuration conf = new Configuration(); + RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class, + ProtobufRpcEngine.class); + ProtocolMetaInfoPB protocolInfoProxy = getProtocolMetaInfoProxy(rpcProxy, + conf); + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + GetProtocolSignatureResponseProto resp; + try { + resp = protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER, + builder.build()); + } catch (ServiceException se) { + throw ProtobufHelper.getRemoteException(se); + } + versionMap = convertProtocolSignatureProtos(resp + .getProtocolSignatureList()); + putVersionSignatureMap(serverAddress, protocol.getName(), + rpcKind.toString(), versionMap); + } + // Assuming unique method names. + Method desiredMethod; + Method[] allMethods = protocol.getMethods(); + desiredMethod = null; + for (Method m : allMethods) { + if (m.getName().equals(methodName)) { + desiredMethod = m; + break; + } + } + if (desiredMethod == null) { + return false; + } + int methodHash = ProtocolSignature.getFingerprint(desiredMethod); + return methodExists(methodHash, version, versionMap); + } + + private static Map + convertProtocolSignatureProtos(List protoList) { + Map map = new TreeMap(); + for (ProtocolSignatureProto p : protoList) { + int [] methods = new int[p.getMethodsList().size()]; + int index=0; + for (int m : p.getMethodsList()) { + methods[index++] = m; + } + map.put(p.getVersion(), new ProtocolSignature(p.getVersion(), methods)); + } + return map; + } + + private static boolean methodExists(int methodHash, long version, + Map versionMap) { + ProtocolSignature sig = versionMap.get(version); + if (sig != null) { + for (int m : sig.getMethods()) { + if (m == methodHash) { + return true; + } + } + } + return false; + } + + // The proxy returned re-uses the underlying connection. This is a special + // mechanism for ProtocolMetaInfoPB. + // Don't do this for any other protocol, it might cause a security hole. 
+ private static ProtocolMetaInfoPB getProtocolMetaInfoProxy(Object proxy, + Configuration conf) throws IOException { + RpcInvocationHandler inv = (RpcInvocationHandler) Proxy + .getInvocationHandler(proxy); + return RPC + .getProtocolEngine(ProtocolMetaInfoPB.class, conf) + .getProtocolMetaInfoProxy(inv.getConnectionId(), conf, + NetUtils.getDefaultSocketFactory(conf)).getProxy(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index a9076e7d1e..0fc7d60bd3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -26,6 +26,7 @@ import javax.net.SocketFactory; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -54,4 +55,16 @@ public interface RpcEngine { SecretManager secretManager ) throws IOException; + /** + * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection + * id. + * @param connId, ConnectionId to be used for the proxy. + * @param conf, Configuration. + * @param factory, Socket factory. + * @return Proxy object. + * @throws IOException + */ + ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java new file mode 100644 index 0000000000..6bcd757357 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcInvocationHandler.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.io.Closeable; +import java.lang.reflect.InvocationHandler; + +import org.apache.hadoop.ipc.Client.ConnectionId; + +/** + * This interface must be implemented by all InvocationHandler + * implementations. + */ +public interface RpcInvocationHandler extends InvocationHandler, Closeable { + + /** + * Returns the connection id associated with the InvocationHandler instance. 
+ * @return ConnectionId + */ + ConnectionId getConnectionId(); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 19a496809b..fc0da0cf90 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -21,18 +21,17 @@ package org.apache.hadoop.ipc; import java.lang.reflect.Proxy; import java.lang.reflect.Method; import java.lang.reflect.Array; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.io.*; -import java.io.Closeable; import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; import org.apache.hadoop.ipc.VersionedProtocol; @@ -202,7 +201,7 @@ public class WritableRpcEngine implements RpcEngine { private static ClientCache CLIENTS=new ClientCache(); - private static class Invoker implements InvocationHandler, Closeable { + private static class Invoker implements RpcInvocationHandler { private Client.ConnectionId remoteId; private Client client; private boolean isClosed = false; @@ -239,6 +238,11 @@ public class WritableRpcEngine implements RpcEngine { CLIENTS.stopClient(client); } } + + @Override + public ConnectionId getConnectionId() { + return remoteId; + } } // for unit testing only @@ -524,4 +528,11 @@ public class WritableRpcEngine implements RpcEngine { } } } + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto b/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto new file mode 100644 index 0000000000..53046aaffd --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/proto/ProtocolInfo.proto @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.ipc.protobuf"; +option java_outer_classname = "ProtocolInfoProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +/** + * Request to get protocol versions for all supported rpc kinds. 
+ */ +message GetProtocolVersionsRequestProto { + required string protocol = 1; // Protocol name +} + +/** + * Protocol version with corresponding rpc kind. + */ +message ProtocolVersionProto { + required string rpcKind = 1; //RPC kind + repeated uint64 versions = 2; //Protocol version corresponding to the rpc kind. +} + +/** + * Get protocol version response. + */ +message GetProtocolVersionsResponseProto { + repeated ProtocolVersionProto protocolVersions = 1; +} + +/** + * Get protocol signature request. + */ +message GetProtocolSignatureRequestProto { + required string protocol = 1; // Protocol name + required string rpcKind = 2; // RPC kind +} + +/** + * Get protocol signature response. + */ +message GetProtocolSignatureResponseProto { + repeated ProtocolSignatureProto protocolSignature = 1; +} + +message ProtocolSignatureProto { + required uint64 version = 1; + repeated uint32 methods = 2; +} + +/** + * Protocol to get information about protocols. + */ +service ProtocolInfoService { + /** + * Return protocol version corresponding to protocol interface for each + * supported rpc kind. + */ + rpc getProtocolVersions(GetProtocolVersionsRequestProto) + returns (GetProtocolVersionsResponseProto); + + /** + * Return protocol version corresponding to protocol interface. + */ + rpc getProtocolSignature(GetProtocolSignatureRequestProto) + returns (GetProtocolSignatureResponseProto); +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 67fc608cb2..49e1ed6453 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -39,6 +39,7 @@ import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.AuthorizationException; @@ -259,7 +260,13 @@ public class TestRPC { SecretManager secretManager) throws IOException { return null; } - + + @Override + public ProtocolProxy getProtocolMetaInfoProxy( + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java index d38d823200..aca33ef25b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPCCompatibility.java @@ -32,6 +32,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto; +import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto; import org.apache.hadoop.net.NetUtils; import org.junit.After; import 
org.junit.Test; @@ -302,4 +305,72 @@ System.out.println("echo int is NOT supported"); ex.getMessage().contains("VersionMismatch")); } } + + @Test + public void testIsMethodSupported() throws IOException { + server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2, + false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class, + TestProtocol2.versionID, addr, conf); + boolean supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_WRITABLE, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertTrue(supported); + supported = RpcClientUtil.isMethodSupported(proxy, + TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(TestProtocol2.class), "echo"); + Assert.assertFalse(supported); + } + + /** + * Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up + * the server registry to extract protocol signatures and versions. + */ + @Test + public void testProtocolMetaInfoSSTranslatorPB() throws Exception { + TestImpl1 impl = new TestImpl1(); + server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false, + conf, null); + server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl); + server.start(); + + ProtocolMetaInfoServerSideTranslatorPB xlator = + new ProtocolMetaInfoServerSideTranslatorPB(server); + + GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_PROTOCOL_BUFFER)); + //No signatures should be found + Assert.assertEquals(0, resp.getProtocolSignatureCount()); + resp = xlator.getProtocolSignature( + null, + createGetProtocolSigRequestProto(TestProtocol1.class, + RpcKind.RPC_WRITABLE)); + Assert.assertEquals(1, resp.getProtocolSignatureCount()); + ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0); + Assert.assertEquals(TestProtocol1.versionID, sig.getVersion()); + boolean found = false; + int expected = ProtocolSignature.getFingerprint(TestProtocol1.class + .getMethod("echo", String.class)); + for (int m : sig.getMethodsList()) { + if (expected == m) { + found = true; + break; + } + } + Assert.assertTrue(found); + } + + private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto( + Class protocol, RpcKind rpcKind) { + GetProtocolSignatureRequestProto.Builder builder = + GetProtocolSignatureRequestProto.newBuilder(); + builder.setProtocol(protocol.getName()); + builder.setRpcKind(rpcKind.toString()); + return builder.build(); + } } \ No newline at end of file
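
For reference, a sketch of querying supported versions directly through the generated ProtocolInfoService blocking interface (metaInfoProxy is an assumed ProtocolMetaInfoPB instance, obtained the same way RpcClientUtil's private getProtocolMetaInfoProxy does; a null RpcController is passed, as RpcClientUtil itself does):

    // Build the request for a hypothetical protocol name and ask the server
    // which versions it supports for each rpc kind.
    GetProtocolVersionsRequestProto req = GetProtocolVersionsRequestProto
        .newBuilder()
        .setProtocol("org.apache.hadoop.ipc.ClientProtocol") // hypothetical name
        .build();
    try {
      GetProtocolVersionsResponseProto resp =
          metaInfoProxy.getProtocolVersions(null, req);
      for (ProtocolVersionProto pv : resp.getProtocolVersionsList()) {
        System.out.println(pv.getRpcKind() + " -> " + pv.getVersionsList());
      }
    } catch (ServiceException se) {
      throw ProtobufHelper.getRemoteException(se);
    }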