HADOOP-7965. Support for protocol version and signature in PB.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1236444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jitendra Nath Pandey 2012-01-26 23:23:03 +00:00
parent 3cdc100369
commit 12289efa2b
16 changed files with 698 additions and 21 deletions

View File

@ -74,6 +74,8 @@ Trunk (unreleased changes)
HADOOP-7987. Support setting the run-as user in unsecure mode. (jitendra)
HADOOP-7965. Support for protocol version and signature in PB. (jitendra)
BUGS
HADOOP-7851. Configuration.getClasses() never returns the default value.

View File

@ -274,4 +274,8 @@
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.HadoopRpcProtos.*"/>
</Match>
<Match>
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
</Match>
</FindBugsFilter>

View File

@ -17,20 +17,20 @@
*/
package org.apache.hadoop.io.retry;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RpcInvocationHandler;
class RetryInvocationHandler implements InvocationHandler, Closeable {
class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private FailoverProxyProvider proxyProvider;
@ -135,4 +135,11 @@ class RetryInvocationHandler implements InvocationHandler, Closeable {
proxyProvider.close();
}
@Override //RpcInvocationHandler
public ConnectionId getConnectionId() {
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
.getInvocationHandler(currentProxy);
return inv.getConnectionId();
}
}

View File

@ -18,11 +18,9 @@
package org.apache.hadoop.ipc;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
@ -37,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto;
@ -51,7 +50,6 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
@ -80,8 +78,19 @@ public class ProtobufRpcEngine implements RpcEngine {
.getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
addr, ticket, conf, factory, rpcTimeout)), false);
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
(ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
new Class[] { protocol }, new Invoker(protocol, connId, conf,
factory)), false);
}
private static class Invoker implements InvocationHandler, Closeable {
private static class Invoker implements RpcInvocationHandler {
private final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private boolean isClosed = false;
@ -93,12 +102,20 @@ public class ProtobufRpcEngine implements RpcEngine {
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory,
RpcResponseWritable.class);
this.clientProtocolVersion = RPC.getProtocolVersion(protocol);
this(protocol, Client.ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf), conf, factory);
}
/**
* This constructor takes a connectionId, instead of creating a new one.
*/
public Invoker(Class<?> protocol, Client.ConnectionId connId,
Configuration conf, SocketFactory factory) {
this.remoteId = connId;
this.client = CLIENTS.getClient(conf, factory, RpcResponseWritable.class);
this.protocolName = RPC.getProtocolName(protocol);
this.clientProtocolVersion = RPC
.getProtocolVersion(protocol);
}
private HadoopRpcRequestProto constructRpcRequest(Method method,
@ -222,6 +239,11 @@ public class ProtobufRpcEngine implements RpcEngine {
returnTypes.put(method.getName(), prototype);
return prototype;
}
@Override //RpcInvocationHandler
public ConnectionId getConnectionId() {
return remoteId;
}
}
@Override

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
/**
* Protocol to get versions and signatures for supported protocols from the
* server.
*
* Note: This extends the protocolbuffer service based interface to
* add annotations.
*/
@ProtocolInfo(
protocolName = "org.apache.hadoop.ipc.ProtocolMetaInfoPB",
protocolVersion = 1)
public interface ProtocolMetaInfoPB extends
ProtocolInfoService.BlockingInterface {
}

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.ipc.RPC.Server.VerProtocolImpl;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolVersionsResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolVersionProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class serves the requests for protocol versions and signatures by
* looking them up in the server registry.
*/
public class ProtocolMetaInfoServerSideTranslatorPB implements
ProtocolMetaInfoPB {
RPC.Server server;
public ProtocolMetaInfoServerSideTranslatorPB(RPC.Server server) {
this.server = server;
}
@Override
public GetProtocolVersionsResponseProto getProtocolVersions(
RpcController controller, GetProtocolVersionsRequestProto request)
throws ServiceException {
String protocol = request.getProtocol();
GetProtocolVersionsResponseProto.Builder builder =
GetProtocolVersionsResponseProto.newBuilder();
for (RpcKind r : RpcKind.values()) {
long[] versions;
try {
versions = getProtocolVersionForRpcKind(r, protocol);
} catch (ClassNotFoundException e) {
throw new ServiceException(e);
}
ProtocolVersionProto.Builder b = ProtocolVersionProto.newBuilder();
if (versions != null) {
b.setRpcKind(r.toString());
for (long v : versions) {
b.addVersions(v);
}
}
builder.addProtocolVersions(b.build());
}
return builder.build();
}
@Override
public GetProtocolSignatureResponseProto getProtocolSignature(
RpcController controller, GetProtocolSignatureRequestProto request)
throws ServiceException {
GetProtocolSignatureResponseProto.Builder builder = GetProtocolSignatureResponseProto
.newBuilder();
String protocol = request.getProtocol();
String rpcKind = request.getRpcKind();
long[] versions;
try {
versions = getProtocolVersionForRpcKind(RpcKind.valueOf(rpcKind),
protocol);
} catch (ClassNotFoundException e1) {
throw new ServiceException(e1);
}
if (versions == null) {
return builder.build();
}
for (long v : versions) {
ProtocolSignatureProto.Builder sigBuilder = ProtocolSignatureProto
.newBuilder();
sigBuilder.setVersion(v);
try {
ProtocolSignature signature = ProtocolSignature.getProtocolSignature(
protocol, v);
for (int m : signature.getMethods()) {
sigBuilder.addMethods(m);
}
} catch (ClassNotFoundException e) {
throw new ServiceException(e);
}
builder.addProtocolSignature(sigBuilder.build());
}
return builder.build();
}
private long[] getProtocolVersionForRpcKind(RpcKind rpcKind,
String protocol) throws ClassNotFoundException {
Class<?> protocolClass = Class.forName(protocol);
String protocolName = RPC.getProtocolName(protocolClass);
VerProtocolImpl[] vers = server.getSupportedProtocolVersions(rpcKind,
protocolName);
if (vers == null) {
return null;
}
long [] versions = new long[vers.length];
for (int i=0; i<versions.length; i++) {
versions[i] = vers[i].version;
}
return versions;
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This interface is implemented by the client side translators and can be used
* to obtain information about underlying protocol e.g. to check if a method is
* supported on the server side.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public interface ProtocolMetaInterface {
/**
* Checks whether the given method name is supported by the server.
* It is assumed that all method names are unique for a protocol.
* @param methodName The name of the method
* @return true if method is supported, otherwise false.
* @throws IOException
*/
public boolean isMethodSupported(String methodName) throws IOException;
}

View File

@ -183,7 +183,7 @@ public class ProtocolSignature implements Writable {
* @return its signature and finger print
*/
private static ProtocolSigFingerprint getSigFingerprint(
Class <? extends VersionedProtocol> protocol, long serverVersion) {
Class <?> protocol, long serverVersion) {
String protocolName = RPC.getProtocolName(protocol);
synchronized (PROTOCOL_FINGERPRINT_CACHE) {
ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
@ -221,6 +221,12 @@ public class ProtocolSignature implements Writable {
return sig.signature;
}
public static ProtocolSignature getProtocolSignature(String protocolName,
long version) throws ClassNotFoundException {
Class<?> protocol = Class.forName(protocolName);
return getSigFingerprint(protocol, version).signature;
}
/**
* Get a server protocol's signature
*

View File

@ -41,6 +41,7 @@ import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolInfoService;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
@ -49,6 +50,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.BlockingService;
/** A simple RPC mechanism.
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
@ -177,8 +180,8 @@ public class RPC {
}
// return the RpcEngine configured to handle a protocol
private static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
@ -522,7 +525,16 @@ public class RPC {
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
/**
* Returns the server address for a given proxy.
*/
public static InetSocketAddress getServerAddress(Object proxy) {
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
.getInvocationHandler(proxy);
return inv.getConnectionId().getAddress();
}
/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server
@ -817,6 +829,19 @@ public class RPC {
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager);
initProtocolMetaInfo(conf);
}
private void initProtocolMetaInfo(Configuration conf)
throws IOException {
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(this);
BlockingService protocolInfoBlockingService = ProtocolInfoService
.newReflectiveBlockingService(xlator);
addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, ProtocolMetaInfoPB.class,
protocolInfoBlockingService);
}
/**

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.net.NetUtils;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class maintains a cache of protocol versions and corresponding protocol
* signatures, keyed by server address, protocol and rpc kind.
* The cache is lazily populated.
*/
public class RpcClientUtil {
private static RpcController NULL_CONTROLLER = null;
private static final int PRIME = 16777619;
private static class ProtoSigCacheKey {
private InetSocketAddress serverAddress;
private String protocol;
private String rpcKind;
ProtoSigCacheKey(InetSocketAddress addr, String p, String rk) {
this.serverAddress = addr;
this.protocol = p;
this.rpcKind = rk;
}
@Override //Object
public int hashCode() {
int result = 1;
result = PRIME * result
+ ((serverAddress == null) ? 0 : serverAddress.hashCode());
result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
result = PRIME * result + ((rpcKind == null) ? 0 : rpcKind.hashCode());
return result;
}
@Override //Object
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (other instanceof ProtoSigCacheKey) {
ProtoSigCacheKey otherKey = (ProtoSigCacheKey) other;
return (serverAddress.equals(otherKey.serverAddress) &&
protocol.equals(otherKey.protocol) &&
rpcKind.equals(otherKey.rpcKind));
}
return false;
}
}
private static ConcurrentHashMap<ProtoSigCacheKey, Map<Long, ProtocolSignature>>
signatureMap = new ConcurrentHashMap<ProtoSigCacheKey, Map<Long, ProtocolSignature>>();
private static void putVersionSignatureMap(InetSocketAddress addr,
String protocol, String rpcKind, Map<Long, ProtocolSignature> map) {
signatureMap.put(new ProtoSigCacheKey(addr, protocol, rpcKind), map);
}
private static Map<Long, ProtocolSignature> getVersionSignatureMap(
InetSocketAddress addr, String protocol, String rpcKind) {
return signatureMap.get(new ProtoSigCacheKey(addr, protocol, rpcKind));
}
/**
* Returns whether the given method is supported or not.
* The protocol signatures are fetched and cached. The connection id for the
* proxy provided is re-used.
* @param rpcProxy Proxy which provides an existing connection id.
* @param protocol Protocol for which the method check is required.
* @param rpcKind The RpcKind for which the method check is required.
* @param version The version at the client.
* @param methodName Name of the method.
* @return true if the method is supported, false otherwise.
* @throws IOException
*/
public static boolean isMethodSupported(Object rpcProxy, Class<?> protocol,
RpcKind rpcKind, long version, String methodName) throws IOException {
InetSocketAddress serverAddress = RPC.getServerAddress(rpcProxy);
Map<Long, ProtocolSignature> versionMap = getVersionSignatureMap(
serverAddress, protocol.getName(), rpcKind.toString());
if (versionMap == null) {
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, ProtocolMetaInfoPB.class,
ProtobufRpcEngine.class);
ProtocolMetaInfoPB protocolInfoProxy = getProtocolMetaInfoProxy(rpcProxy,
conf);
GetProtocolSignatureRequestProto.Builder builder =
GetProtocolSignatureRequestProto.newBuilder();
builder.setProtocol(protocol.getName());
builder.setRpcKind(rpcKind.toString());
GetProtocolSignatureResponseProto resp;
try {
resp = protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER,
builder.build());
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
versionMap = convertProtocolSignatureProtos(resp
.getProtocolSignatureList());
putVersionSignatureMap(serverAddress, protocol.getName(),
rpcKind.toString(), versionMap);
}
// Assuming unique method names.
Method desiredMethod;
Method[] allMethods = protocol.getMethods();
desiredMethod = null;
for (Method m : allMethods) {
if (m.getName().equals(methodName)) {
desiredMethod = m;
break;
}
}
if (desiredMethod == null) {
return false;
}
int methodHash = ProtocolSignature.getFingerprint(desiredMethod);
return methodExists(methodHash, version, versionMap);
}
private static Map<Long, ProtocolSignature>
convertProtocolSignatureProtos(List<ProtocolSignatureProto> protoList) {
Map<Long, ProtocolSignature> map = new TreeMap<Long, ProtocolSignature>();
for (ProtocolSignatureProto p : protoList) {
int [] methods = new int[p.getMethodsList().size()];
int index=0;
for (int m : p.getMethodsList()) {
methods[index++] = m;
}
map.put(p.getVersion(), new ProtocolSignature(p.getVersion(), methods));
}
return map;
}
private static boolean methodExists(int methodHash, long version,
Map<Long, ProtocolSignature> versionMap) {
ProtocolSignature sig = versionMap.get(version);
if (sig != null) {
for (int m : sig.getMethods()) {
if (m == methodHash) {
return true;
}
}
}
return false;
}
// The proxy returned re-uses the underlying connection. This is a special
// mechanism for ProtocolMetaInfoPB.
// Don't do this for any other protocol, it might cause a security hole.
private static ProtocolMetaInfoPB getProtocolMetaInfoProxy(Object proxy,
Configuration conf) throws IOException {
RpcInvocationHandler inv = (RpcInvocationHandler) Proxy
.getInvocationHandler(proxy);
return RPC
.getProtocolEngine(ProtocolMetaInfoPB.class, conf)
.getProtocolMetaInfoProxy(inv.getConnectionId(), conf,
NetUtils.getDefaultSocketFactory(conf)).getProxy();
}
}

View File

@ -26,6 +26,7 @@ import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -54,4 +55,16 @@ public interface RpcEngine {
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException;
/**
* Returns a proxy for ProtocolMetaInfoPB, which uses the given connection
* id.
* @param connId, ConnectionId to be used for the proxy.
* @param conf, Configuration.
* @param factory, Socket factory.
* @return Proxy object.
* @throws IOException
*/
ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException;
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.Closeable;
import java.lang.reflect.InvocationHandler;
import org.apache.hadoop.ipc.Client.ConnectionId;
/**
* This interface must be implemented by all InvocationHandler
* implementations.
*/
public interface RpcInvocationHandler extends InvocationHandler, Closeable {
/**
* Returns the connection id associated with the InvocationHandler instance.
* @return ConnectionId
*/
ConnectionId getConnectionId();
}

View File

@ -21,18 +21,17 @@ package org.apache.hadoop.ipc;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.io.*;
import java.io.Closeable;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.VersionedProtocol;
@ -202,7 +201,7 @@ public class WritableRpcEngine implements RpcEngine {
private static ClientCache CLIENTS=new ClientCache();
private static class Invoker implements InvocationHandler, Closeable {
private static class Invoker implements RpcInvocationHandler {
private Client.ConnectionId remoteId;
private Client client;
private boolean isClosed = false;
@ -239,6 +238,11 @@ public class WritableRpcEngine implements RpcEngine {
CLIENTS.stopClient(client);
}
}
@Override
public ConnectionId getConnectionId() {
return remoteId;
}
}
// for unit testing only
@ -524,4 +528,11 @@ public class WritableRpcEngine implements RpcEngine {
}
}
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "ProtocolInfoProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
/**
* Request to get protocol versions for all supported rpc kinds.
*/
message GetProtocolVersionsRequestProto {
required string protocol = 1; // Protocol name
}
/**
* Protocol version with corresponding rpc kind.
*/
message ProtocolVersionProto {
required string rpcKind = 1; //RPC kind
repeated uint64 versions = 2; //Protocol version corresponding to the rpc kind.
}
/**
* Get protocol version response.
*/
message GetProtocolVersionsResponseProto {
repeated ProtocolVersionProto protocolVersions = 1;
}
/**
* Get protocol signature request.
*/
message GetProtocolSignatureRequestProto {
required string protocol = 1; // Protocol name
required string rpcKind = 2; // RPC kind
}
/**
* Get protocol signature response.
*/
message GetProtocolSignatureResponseProto {
repeated ProtocolSignatureProto protocolSignature = 1;
}
message ProtocolSignatureProto {
required uint64 version = 1;
repeated uint32 methods = 2;
}
/**
* Protocol to get information about protocols.
*/
service ProtocolInfoService {
/**
* Return protocol version corresponding to protocol interface for each
* supported rpc kind.
*/
rpc getProtocolVersions(GetProtocolVersionsRequestProto)
returns (GetProtocolVersionsResponseProto);
/**
* Return protocol version corresponding to protocol interface.
*/
rpc getProtocolSignature(GetProtocolSignatureRequestProto)
returns (GetProtocolSignatureResponseProto);
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
@ -259,7 +260,13 @@ public class TestRPC {
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
return null;
}
@Override
public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}
}
/**

View File

@ -32,6 +32,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureRequestProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.net.NetUtils;
import org.junit.After;
import org.junit.Test;
@ -302,4 +305,72 @@ System.out.println("echo int is NOT supported");
ex.getMessage().contains("VersionMismatch"));
}
}
@Test
public void testIsMethodSupported() throws IOException {
server = RPC.getServer(TestProtocol2.class, new TestImpl2(), ADDRESS, 0, 2,
false, conf, null);
server.start();
addr = NetUtils.getConnectAddress(server);
TestProtocol2 proxy = RPC.getProxy(TestProtocol2.class,
TestProtocol2.versionID, addr, conf);
boolean supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RpcKind.RPC_WRITABLE,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertTrue(supported);
supported = RpcClientUtil.isMethodSupported(proxy,
TestProtocol2.class, RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(TestProtocol2.class), "echo");
Assert.assertFalse(supported);
}
/**
* Verify that ProtocolMetaInfoServerSideTranslatorPB correctly looks up
* the server registry to extract protocol signatures and versions.
*/
@Test
public void testProtocolMetaInfoSSTranslatorPB() throws Exception {
TestImpl1 impl = new TestImpl1();
server = RPC.getServer(TestProtocol1.class, impl, ADDRESS, 0, 2, false,
conf, null);
server.addProtocol(RpcKind.RPC_WRITABLE, TestProtocol0.class, impl);
server.start();
ProtocolMetaInfoServerSideTranslatorPB xlator =
new ProtocolMetaInfoServerSideTranslatorPB(server);
GetProtocolSignatureResponseProto resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RpcKind.RPC_PROTOCOL_BUFFER));
//No signatures should be found
Assert.assertEquals(0, resp.getProtocolSignatureCount());
resp = xlator.getProtocolSignature(
null,
createGetProtocolSigRequestProto(TestProtocol1.class,
RpcKind.RPC_WRITABLE));
Assert.assertEquals(1, resp.getProtocolSignatureCount());
ProtocolSignatureProto sig = resp.getProtocolSignatureList().get(0);
Assert.assertEquals(TestProtocol1.versionID, sig.getVersion());
boolean found = false;
int expected = ProtocolSignature.getFingerprint(TestProtocol1.class
.getMethod("echo", String.class));
for (int m : sig.getMethodsList()) {
if (expected == m) {
found = true;
break;
}
}
Assert.assertTrue(found);
}
private GetProtocolSignatureRequestProto createGetProtocolSigRequestProto(
Class<?> protocol, RpcKind rpcKind) {
GetProtocolSignatureRequestProto.Builder builder =
GetProtocolSignatureRequestProto.newBuilder();
builder.setProtocol(protocol.getName());
builder.setRpcKind(rpcKind.toString());
return builder.build();
}
}