HDFS-15790. Make ProtobufRpcEngineProtos and ProtobufRpcEngineProtos2 Co-Exist (#2767)

(cherry picked from commit 2bbeae3240)

 Conflicts:
	hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
This commit is contained in:
Vinayakumar B 2021-05-24 15:15:39 +05:30 committed by Wei-Chiu Chuang
parent 86c28f0639
commit dbf1ef4aff
No known key found for this signature in database
GPG Key ID: B362E1C021854B9D
10 changed files with 13696 additions and 173 deletions

View File

@ -418,7 +418,12 @@
</execution> </execution>
<execution> <execution>
<id>src-test-compile-protoc</id> <id>src-test-compile-protoc</id>
<configuration><skip>false</skip></configuration> <configuration>
<skip>false</skip>
<excludes>
<exclude>*legacy.proto</exclude>
</excludes>
</configuration>
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
@ -439,6 +444,10 @@
<id>replace-generated-test-sources</id> <id>replace-generated-test-sources</id>
<configuration> <configuration>
<skip>false</skip> <skip>false</skip>
<excludes>
<exclude>**/TestProtosLegacy.java</exclude>
<exclude>**/TestRpcServiceProtosLegacy.java</exclude>
</excludes>
</configuration> </configuration>
</execution> </execution>
<execution> <execution>
@ -451,6 +460,7 @@
<exclude>**/RpcWritable.java</exclude> <exclude>**/RpcWritable.java</exclude>
<exclude>**/ProtobufRpcEngineCallback.java</exclude> <exclude>**/ProtobufRpcEngineCallback.java</exclude>
<exclude>**/ProtobufRpcEngine.java</exclude> <exclude>**/ProtobufRpcEngine.java</exclude>
<exclude>**/ProtobufRpcEngine2.java</exclude>
<exclude>**/ProtobufRpcEngineProtos.java</exclude> <exclude>**/ProtobufRpcEngineProtos.java</exclude>
</excludes> </excludes>
</configuration> </configuration>
@ -459,6 +469,9 @@
<id>replace-test-sources</id> <id>replace-test-sources</id>
<configuration> <configuration>
<skip>false</skip> <skip>false</skip>
<excludes>
<exclude>**/TestProtoBufRpc.java</exclude>
</excludes>
</configuration> </configuration>
</execution> </execution>
</executions> </executions>
@ -1050,6 +1063,18 @@
</sources> </sources>
</configuration> </configuration>
</execution> </execution>
<execution>
<id>add-test-source-legacy-protobuf</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/src/test/arm-java</source>
</sources>
</configuration>
</execution>
</executions> </executions>
</plugin> </plugin>
</plugins> </plugins>
@ -1091,6 +1116,28 @@
</includes> </includes>
</configuration> </configuration>
</execution> </execution>
<execution>
<id>src-test-compile-protoc-legacy</id>
<phase>generate-test-sources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<skip>false</skip>
<!--Generating with old protobuf version for backward compatibility-->
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<includeDependenciesInDescriptorSet>false</includeDependenciesInDescriptorSet>
<protoSourceRoot>${basedir}/src/test/proto</protoSourceRoot>
<outputDirectory>${project.build.directory}/generated-test-sources/java</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
<includes>
<include>test_legacy.proto</include>
<include>test_rpc_service_legacy.proto</include>
</includes>
</configuration>
</execution>
</executions> </executions>
</plugin> </plugin>
</plugins> </plugins>

View File

@ -31,8 +31,6 @@
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto; import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
@ -68,9 +66,8 @@ public class ProtobufRpcEngine implements RpcEngine {
ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for ProtobufRpcEngine static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine( //These will be used in server side, which is always ProtobufRpcEngine2
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class, ProtobufRpcEngine2.registerProtocolEngine();
new Server.ProtoBufRpcInvoker());
} }
private static final ClientCache CLIENTS = new ClientCache(); private static final ClientCache CLIENTS = new ClientCache();
@ -353,8 +350,6 @@ static Client getClient(Configuration conf) {
RpcWritable.Buffer.class); RpcWritable.Buffer.class);
} }
@Override @Override
public RPC.Server getServer(Class<?> protocol, Object protocolImpl, public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
String bindAddress, int port, int numHandlers, int numReaders, String bindAddress, int port, int numHandlers, int numReaders,
@ -367,24 +362,16 @@ public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
portRangeConfig, alignmentContext); portRangeConfig, alignmentContext);
} }
public static class Server extends RPC.Server { /**
* Server implementation is always ProtobufRpcEngine2 based implementation,
* supports backward compatibility for protobuf 2.5 based implementations,
* which uses non-shaded protobuf classes.
*/
public static class Server extends ProtobufRpcEngine2.Server {
static final ThreadLocal<ProtobufRpcEngineCallback> currentCallback = static final ThreadLocal<ProtobufRpcEngineCallback> currentCallback =
new ThreadLocal<>(); new ThreadLocal<>();
static final ThreadLocal<CallInfo> currentCallInfo = new ThreadLocal<>();
private static final RpcInvoker RPC_INVOKER = new ProtoBufRpcInvoker();
static class CallInfo {
private final RPC.Server server;
private final String methodName;
public CallInfo(RPC.Server server, String methodName) {
this.server = server;
this.methodName = methodName;
}
}
static class ProtobufRpcEngineCallbackImpl static class ProtobufRpcEngineCallbackImpl
implements ProtobufRpcEngineCallback { implements ProtobufRpcEngineCallback {
@ -394,9 +381,9 @@ static class ProtobufRpcEngineCallbackImpl
private final long setupTime; private final long setupTime;
public ProtobufRpcEngineCallbackImpl() { public ProtobufRpcEngineCallbackImpl() {
this.server = currentCallInfo.get().server; this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get(); this.call = Server.getCurCall().get();
this.methodName = currentCallInfo.get().methodName; this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now(); this.setupTime = Time.now();
} }
@ -443,144 +430,58 @@ public Server(Class<?> protocolClass, Object protocolImpl,
SecretManager<? extends TokenIdentifier> secretManager, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig, AlignmentContext alignmentContext) String portRangeConfig, AlignmentContext alignmentContext)
throws IOException { throws IOException {
super(bindAddress, port, null, numHandlers, super(protocolClass, protocolImpl, conf, bindAddress, port, numHandlers,
numReaders, queueSizePerHandler, conf, numReaders, queueSizePerHandler, verbose, secretManager,
serverNameFromClass(protocolImpl.getClass()), secretManager, portRangeConfig, alignmentContext);
portRangeConfig);
setAlignmentContext(alignmentContext);
this.verbose = verbose;
registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
protocolImpl);
}
@Override
protected RpcInvoker getServerRpcInvoker(RpcKind rpcKind) {
if (rpcKind == RpcKind.RPC_PROTOCOL_BUFFER) {
return RPC_INVOKER;
}
return super.getServerRpcInvoker(rpcKind);
} }
/** /**
* Protobuf invoker for {@link RpcInvoker} * This implementation is same as
* ProtobufRpcEngine2.Server.ProtobufInvoker#call(..)
* except this implementation uses non-shaded protobuf classes from legacy
* protobuf version (default 2.5.0).
*/ */
static class ProtoBufRpcInvoker implements RpcInvoker { static RpcWritable processCall(RPC.Server server,
private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, String connectionProtocolName, RpcWritable.Buffer request,
String protoName, long clientVersion) throws RpcServerException { String methodName, ProtoClassProtoImpl protocolImpl) throws Exception {
ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion); BlockingService service = (BlockingService) protocolImpl.protocolImpl;
ProtoClassProtoImpl impl = MethodDescriptor methodDescriptor = service.getDescriptorForType()
server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv); .findMethodByName(methodName);
if (impl == null) { // no match for Protocol AND Version if (methodDescriptor == null) {
VerProtocolImpl highest = String msg = "Unknown method " + methodName + " called on "
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + connectionProtocolName + " protocol.";
protoName); LOG.warn(msg);
if (highest == null) { throw new RpcNoSuchMethodException(msg);
throw new RpcNoSuchProtocolException(
"Unknown protocol: " + protoName);
}
// protocol supported but not the version that client wants
throw new RPC.VersionMismatch(protoName, clientVersion,
highest.version);
}
return impl;
} }
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = request.getValue(prototype);
@Override Message result;
/** Call currentCall = Server.getCurCall().get();
* This is a server side method, which is invoked over RPC. On success try {
* the return response has protobuf response payload. On failure, the server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
* exception name and the stack trace are returned in the response. CURRENT_CALL_INFO.set(new CallInfo(server, methodName));
* See {@link HadoopRpcResponseProto} currentCall.setDetailedMetricsName(methodName);
* result = service.callBlockingMethod(methodDescriptor, null, param);
* In this method there three types of exceptions possible and they are // Check if this needs to be a deferred response,
* returned in response as follows. // by checking the ThreadLocal callback being set
* <ol> if (currentCallback.get() != null) {
* <li> Exceptions encountered in this method that are returned currentCall.deferResponse();
* as {@link RpcServerException} </li> currentCallback.set(null);
* <li> Exceptions thrown by the service is wrapped in ServiceException. return null;
* In that this method returns in response the exception thrown by the
* service.</li>
* <li> Other exceptions thrown by the service. They are returned as
* it is.</li>
* </ol>
*/
public Writable call(RPC.Server server, String connectionProtocolName,
Writable writableRequest, long receiveTime) throws Exception {
RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
RequestHeaderProto rpcRequest = request.getRequestHeader();
String methodName = rpcRequest.getMethodName();
/**
* RPCs for a particular interface (ie protocol) are done using a
* IPC connection that is setup using rpcProxy.
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*
* Each Rpc call also sends a protocol name
* (called declaringClassprotocolName). This name is usually the same
* as the connection protocol name except in some cases.
* For example metaProtocols such ProtocolInfoProto which get info
* about the protocol reuse the connection but need to indicate that
* the actual protocol is different (i.e. the protocol is
* ProtocolInfoProto) since they reuse the connection; in this case
* the declaringClassProtocolName field is set to the ProtocolInfoProto.
*/
String declaringClassProtoName =
rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
return call(server, connectionProtocolName, request, receiveTime,
methodName, declaringClassProtoName, clientVersion);
}
protected Writable call(RPC.Server server, String connectionProtocolName,
RpcWritable.Buffer request, long receiveTime, String methodName,
String declaringClassProtoName, long clientVersion) throws Exception {
if (server.verbose)
LOG.info("Call: connectionProtocolName=" + connectionProtocolName +
", method=" + methodName);
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on "
+ connectionProtocolName + " protocol.";
LOG.warn(msg);
throw new RpcNoSuchMethodException(msg);
} }
Message prototype = service.getRequestPrototype(methodDescriptor); } catch (ServiceException e) {
Message param = request.getValue(prototype); Exception exception = (Exception) e.getCause();
currentCall
Message result; .setDetailedMetricsName(exception.getClass().getSimpleName());
Call currentCall = Server.getCurCall().get(); throw (Exception) e.getCause();
try { } catch (Exception e) {
server.rpcDetailedMetrics.init(protocolImpl.protocolClass); currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
currentCallInfo.set(new CallInfo(server, methodName)); throw e;
currentCall.setDetailedMetricsName(methodName); } finally {
result = service.callBlockingMethod(methodDescriptor, null, param); CURRENT_CALL_INFO.set(null);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
if (currentCallback.get() != null) {
currentCall.deferResponse();
currentCallback.set(null);
return null;
}
} catch (ServiceException e) {
Exception exception = (Exception) e.getCause();
currentCall.setDetailedMetricsName(
exception.getClass().getSimpleName());
throw (Exception) e.getCause();
} catch (Exception e) {
currentCall.setDetailedMetricsName(e.getClass().getSimpleName());
throw e;
} finally {
currentCallInfo.set(null);
}
return RpcWritable.wrap(result);
} }
return RpcWritable.wrap(result);
} }
} }

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.*;
import org.apache.hadoop.thirdparty.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -33,6 +30,12 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.thirdparty.protobuf.Message;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet; import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
@ -61,9 +64,16 @@ public class ProtobufRpcEngine2 implements RpcEngine {
ASYNC_RETURN_MESSAGE = new ThreadLocal<>(); ASYNC_RETURN_MESSAGE = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for ProtobufRpcEngine static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine( registerProtocolEngine();
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class, }
new Server.ProtoBufRpcInvoker());
static void registerProtocolEngine() {
if (Server.getRpcInvoker(RPC.RpcKind.RPC_PROTOCOL_BUFFER) == null) {
org.apache.hadoop.ipc.Server
.registerProtocolEngine(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
ProtobufRpcEngine2.RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
}
} }
private static final ClientCache CLIENTS = new ClientCache(); private static final ClientCache CLIENTS = new ClientCache();
@ -383,6 +393,14 @@ static class CallInfo {
this.server = server; this.server = server;
this.methodName = methodName; this.methodName = methodName;
} }
public RPC.Server getServer() {
return server;
}
public String getMethodName() {
return methodName;
}
} }
static class ProtobufRpcEngineCallbackImpl static class ProtobufRpcEngineCallbackImpl
@ -394,9 +412,9 @@ static class ProtobufRpcEngineCallbackImpl
private final long setupTime; private final long setupTime;
ProtobufRpcEngineCallbackImpl() { ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().server; this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get(); this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().methodName; this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now(); this.setupTime = Time.now();
} }
@ -417,7 +435,7 @@ public void error(Throwable t) {
} }
@InterfaceStability.Unstable @InterfaceStability.Unstable
public static ProtobufRpcEngineCallback2 registerForDeferredResponse() { public static ProtobufRpcEngineCallback2 registerForDeferredResponse2() {
ProtobufRpcEngineCallback2 callback = new ProtobufRpcEngineCallbackImpl(); ProtobufRpcEngineCallback2 callback = new ProtobufRpcEngineCallbackImpl();
CURRENT_CALLBACK.set(callback); CURRENT_CALLBACK.set(callback);
return callback; return callback;
@ -453,6 +471,17 @@ public Server(Class<?> protocolClass, Object protocolImpl,
protocolImpl); protocolImpl);
} }
//Use the latest protobuf rpc invoker itself as that is backward compatible.
private static final RpcInvoker RPC_INVOKER = new ProtoBufRpcInvoker();
@Override
protected RpcInvoker getServerRpcInvoker(RPC.RpcKind rpcKind) {
if (rpcKind == RPC.RpcKind.RPC_PROTOCOL_BUFFER) {
return RPC_INVOKER;
}
return super.getServerRpcInvoker(rpcKind);
}
/** /**
* Protobuf invoker for {@link RpcInvoker}. * Protobuf invoker for {@link RpcInvoker}.
*/ */
@ -524,6 +553,7 @@ public Writable call(RPC.Server server, String connectionProtocolName,
methodName, declaringClassProtoName, clientVersion); methodName, declaringClassProtoName, clientVersion);
} }
@SuppressWarnings("deprecation")
protected Writable call(RPC.Server server, String connectionProtocolName, protected Writable call(RPC.Server server, String connectionProtocolName,
RpcWritable.Buffer request, long receiveTime, String methodName, RpcWritable.Buffer request, long receiveTime, String methodName,
String declaringClassProtoName, long clientVersion) throws Exception { String declaringClassProtoName, long clientVersion) throws Exception {
@ -534,6 +564,21 @@ protected Writable call(RPC.Server server, String connectionProtocolName,
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion); declaringClassProtoName, clientVersion);
if (protocolImpl.isShadedPBImpl()) {
return call(server, connectionProtocolName, request, methodName,
protocolImpl);
}
//Legacy protobuf implementation. Handle using legacy (Non-shaded)
// protobuf classes.
return ProtobufRpcEngine.Server
.processCall(server, connectionProtocolName, request, methodName,
protocolImpl);
}
private RpcWritable call(RPC.Server server,
String connectionProtocolName, RpcWritable.Buffer request,
String methodName, ProtoClassProtoImpl protocolImpl)
throws Exception {
BlockingService service = (BlockingService) protocolImpl.protocolImpl; BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType() MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName); .findMethodByName(methodName);

View File

@ -937,11 +937,18 @@ public int hashCode() {
*/ */
static class ProtoClassProtoImpl { static class ProtoClassProtoImpl {
final Class<?> protocolClass; final Class<?> protocolClass;
final Object protocolImpl; final Object protocolImpl;
private final boolean shadedPBImpl;
ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) { ProtoClassProtoImpl(Class<?> protocolClass, Object protocolImpl) {
this.protocolClass = protocolClass; this.protocolClass = protocolClass;
this.protocolImpl = protocolImpl; this.protocolImpl = protocolImpl;
this.shadedPBImpl = protocolImpl instanceof BlockingService;
} }
public boolean isShadedPBImpl() {
return shadedPBImpl;
}
} }
ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray = ArrayList<Map<ProtoNameVer, ProtoClassProtoImpl>> protocolImplMapArray =

View File

@ -17,12 +17,10 @@
*/ */
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.ipc.protobuf.TestProtos;
@ -30,38 +28,71 @@
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyRequestProto;
import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EmptyResponseProto;
import org.apache.hadoop.ipc.protobuf.TestProtosLegacy;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpc2Proto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtosLegacy;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
/** /**
* Test for testing protocol buffer based RPC mechanism. * Test for testing protocol buffer based RPC mechanism.
* This test depends on test.proto definition of types in src/test/proto * This test depends on test.proto definition of types in src/test/proto
* and protobuf service definition from src/test/test_rpc_service.proto * and protobuf service definition from src/test/test_rpc_service.proto
*/ */
@RunWith(Parameterized.class)
public class TestProtoBufRpc extends TestRpcBase { public class TestProtoBufRpc extends TestRpcBase {
private static RPC.Server server; private static RPC.Server server;
private final static int SLEEP_DURATION = 1000; private final static int SLEEP_DURATION = 1000;
/**
* Test with legacy protobuf implementation in same server.
*/
private boolean testWithLegacy;
/**
* Test with legacy protobuf implementation loaded first while creating the
* RPC server.
*/
private boolean testWithLegacyFirst;
public TestProtoBufRpc(Boolean testWithLegacy, Boolean testWithLegacyFirst) {
this.testWithLegacy = testWithLegacy;
this.testWithLegacyFirst = testWithLegacyFirst;
}
@ProtocolInfo(protocolName = "testProto2", protocolVersion = 1) @ProtocolInfo(protocolName = "testProto2", protocolVersion = 1)
public interface TestRpcService2 extends public interface TestRpcService2 extends
TestProtobufRpc2Proto.BlockingInterface { TestProtobufRpc2Proto.BlockingInterface {
} }
@ProtocolInfo(protocolName="testProtoLegacy", protocolVersion = 1)
public interface TestRpcService2Legacy
extends TestRpcServiceProtosLegacy.
TestProtobufRpc2Proto.BlockingInterface {
}
public static class PBServer2Impl implements TestRpcService2 { public static class PBServer2Impl implements TestRpcService2 {
@Override @Override
@ -88,12 +119,58 @@ public TestProtos.SleepResponseProto sleep(RpcController controller,
} }
} }
public static class PBServer2ImplLegacy implements TestRpcService2Legacy {
@Override
public TestProtosLegacy.EmptyResponseProto ping2(
com.google.protobuf.RpcController unused,
TestProtosLegacy.EmptyRequestProto request)
throws com.google.protobuf.ServiceException {
return TestProtosLegacy.EmptyResponseProto.newBuilder().build();
}
@Override
public TestProtosLegacy.EchoResponseProto echo2(
com.google.protobuf.RpcController unused,
TestProtosLegacy.EchoRequestProto request)
throws com.google.protobuf.ServiceException {
return TestProtosLegacy.EchoResponseProto.newBuilder()
.setMessage(request.getMessage()).build();
}
@Override
public TestProtosLegacy.SleepResponseProto sleep(
com.google.protobuf.RpcController controller,
TestProtosLegacy.SleepRequestProto request)
throws com.google.protobuf.ServiceException {
try {
Thread.sleep(request.getMilliSeconds());
} catch (InterruptedException ex) {
}
return TestProtosLegacy.SleepResponseProto.newBuilder().build();
}
}
@Parameters
public static Collection<Object[]> params() {
Collection<Object[]> params = new ArrayList<Object[]>();
params.add(new Object[] {Boolean.TRUE, Boolean.TRUE });
params.add(new Object[] {Boolean.TRUE, Boolean.FALSE });
params.add(new Object[] {Boolean.FALSE, Boolean.FALSE });
return params;
}
@Before @Before
@SuppressWarnings("deprecation")
public void setUp() throws IOException { // Setup server for both protocols public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration(); conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024); conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024);
conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true); conf.setBoolean(CommonConfigurationKeys.IPC_SERVER_LOG_SLOW_RPC, true);
// Set RPC engine to protobuf RPC engine // Set RPC engine to protobuf RPC engine
if (testWithLegacy) {
RPC.setProtocolEngine(conf, TestRpcService2Legacy.class,
ProtobufRpcEngine.class);
}
RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine2.class); RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine2.class);
RPC.setProtocolEngine(conf, TestRpcService2.class, RPC.setProtocolEngine(conf, TestRpcService2.class,
ProtobufRpcEngine2.class); ProtobufRpcEngine2.class);
@ -103,9 +180,21 @@ public void setUp() throws IOException { // Setup server for both protocols
BlockingService service = TestProtobufRpcProto BlockingService service = TestProtobufRpcProto
.newReflectiveBlockingService(serverImpl); .newReflectiveBlockingService(serverImpl);
// Get RPC server for server side implementation if (testWithLegacy && testWithLegacyFirst) {
server = new RPC.Builder(conf).setProtocol(TestRpcService.class) PBServer2ImplLegacy server2ImplLegacy = new PBServer2ImplLegacy();
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build(); com.google.protobuf.BlockingService legacyService =
TestRpcServiceProtosLegacy.TestProtobufRpc2Proto
.newReflectiveBlockingService(server2ImplLegacy);
server = new RPC.Builder(conf).setProtocol(TestRpcService2Legacy.class)
.setInstance(legacyService).setBindAddress(ADDRESS).setPort(PORT)
.build();
server.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService.class,
service);
} else {
// Get RPC server for server side implementation
server = new RPC.Builder(conf).setProtocol(TestRpcService.class)
.setInstance(service).setBindAddress(ADDRESS).setPort(PORT).build();
}
addr = NetUtils.getConnectAddress(server); addr = NetUtils.getConnectAddress(server);
// now the second protocol // now the second protocol
@ -115,6 +204,16 @@ public void setUp() throws IOException { // Setup server for both protocols
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class, server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2.class,
service2); service2);
if (testWithLegacy && !testWithLegacyFirst) {
PBServer2ImplLegacy server2ImplLegacy = new PBServer2ImplLegacy();
com.google.protobuf.BlockingService legacyService =
TestRpcServiceProtosLegacy.TestProtobufRpc2Proto
.newReflectiveBlockingService(server2ImplLegacy);
server
.addProtocol(RpcKind.RPC_PROTOCOL_BUFFER, TestRpcService2Legacy.class,
legacyService);
}
server.start(); server.start();
} }
@ -128,6 +227,10 @@ private TestRpcService2 getClient2() throws IOException {
return RPC.getProxy(TestRpcService2.class, 0, addr, conf); return RPC.getProxy(TestRpcService2.class, 0, addr, conf);
} }
private TestRpcService2Legacy getClientLegacy() throws IOException {
return RPC.getProxy(TestRpcService2Legacy.class, 0, addr, conf);
}
@Test (timeout=5000) @Test (timeout=5000)
public void testProtoBufRpc() throws Exception { public void testProtoBufRpc() throws Exception {
TestRpcService client = getClient(addr, conf); TestRpcService client = getClient(addr, conf);
@ -179,10 +282,39 @@ public void testProtoBufRpc2() throws Exception {
MetricsRecordBuilder rpcDetailedMetrics = MetricsRecordBuilder rpcDetailedMetrics =
getMetrics(server.getRpcDetailedMetrics().name()); getMetrics(server.getRpcDetailedMetrics().name());
assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics); assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
if (testWithLegacy) {
testProtobufLegacy();
}
}
private void testProtobufLegacy()
throws IOException, com.google.protobuf.ServiceException {
TestRpcService2Legacy client = getClientLegacy();
// Test ping method
client.ping2(null, TestProtosLegacy.EmptyRequestProto.newBuilder().build());
// Test echo method
TestProtosLegacy.EchoResponseProto echoResponse = client.echo2(null,
TestProtosLegacy.EchoRequestProto.newBuilder().setMessage("hello")
.build());
assertThat(echoResponse.getMessage()).isEqualTo("hello");
// Ensure RPC metrics are updated
MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name());
assertCounterGt("RpcQueueTimeNumOps", 0L, rpcMetrics);
assertCounterGt("RpcProcessingTimeNumOps", 0L, rpcMetrics);
MetricsRecordBuilder rpcDetailedMetrics =
getMetrics(server.getRpcDetailedMetrics().name());
assertCounterGt("Echo2NumOps", 0L, rpcDetailedMetrics);
} }
@Test (timeout=5000) @Test (timeout=5000)
public void testProtoBufRandomException() throws Exception { public void testProtoBufRandomException() throws Exception {
//No test with legacy
assumeFalse(testWithLegacy);
TestRpcService client = getClient(addr, conf); TestRpcService client = getClient(addr, conf);
try { try {
@ -200,6 +332,8 @@ public void testProtoBufRandomException() throws Exception {
@Test(timeout=6000) @Test(timeout=6000)
public void testExtraLongRpc() throws Exception { public void testExtraLongRpc() throws Exception {
//No test with legacy
assumeFalse(testWithLegacy);
TestRpcService2 client = getClient2(); TestRpcService2 client = getClient2();
final String shortString = StringUtils.repeat("X", 4); final String shortString = StringUtils.repeat("X", 4);
// short message goes through // short message goes through
@ -219,6 +353,8 @@ public void testExtraLongRpc() throws Exception {
@Test(timeout = 12000) @Test(timeout = 12000)
public void testLogSlowRPC() throws IOException, ServiceException, public void testLogSlowRPC() throws IOException, ServiceException,
TimeoutException, InterruptedException { TimeoutException, InterruptedException {
//No test with legacy
assumeFalse(testWithLegacy);
TestRpcService2 client = getClient2(); TestRpcService2 client = getClient2();
// make 10 K fast calls // make 10 K fast calls
for (int x = 0; x < 10000; x++) { for (int x = 0; x < 10000; x++) {
@ -244,6 +380,8 @@ public void testLogSlowRPC() throws IOException, ServiceException,
@Test(timeout = 12000) @Test(timeout = 12000)
public void testEnsureNoLogIfDisabled() throws IOException, ServiceException { public void testEnsureNoLogIfDisabled() throws IOException, ServiceException {
//No test with legacy
assumeFalse(testWithLegacy);
// disable slow RPC logging // disable slow RPC logging
server.setLogSlowRPC(false); server.setLogSlowRPC(false);
TestRpcService2 client = getClient2(); TestRpcService2 client = getClient2();

View File

@ -145,7 +145,7 @@ public static class TestProtoBufRpcServerHandoffServer
ServiceException { ServiceException {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
final ProtobufRpcEngineCallback2 callback = final ProtobufRpcEngineCallback2 callback =
ProtobufRpcEngine2.Server.registerForDeferredResponse(); ProtobufRpcEngine2.Server.registerForDeferredResponse2();
final long sleepTime = request.getSleepTime(); final long sleepTime = request.getSleepTime();
new Thread() { new Thread() {
@Override @Override

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "TestProtosLegacy";
option java_generate_equals_and_hash = true;
package hadoop.common;
message EmptyRequestProto {
}
message EmptyResponseProto {
}
message EchoRequestProto {
required string message = 1;
}
message EchoResponseProto {
required string message = 1;
}
message OptRequestProto {
optional string message = 1;
}
message OptResponseProto {
optional string message = 1;
}
message SleepRequestProto{
required int32 milliSeconds = 1;
}
message SleepResponseProto{
}
message SlowPingRequestProto {
required bool shouldSlow = 1;
}
message EchoRequestProto2 {
repeated string message = 1;
}
message EchoResponseProto2 {
repeated string message = 1;
}
message AddRequestProto {
required int32 param1 = 1;
required int32 param2 = 2;
}
message AddRequestProto2 {
repeated int32 params = 1;
}
message AddResponseProto {
required int32 result = 1;
}
message ExchangeRequestProto {
repeated int32 values = 1;
}
message ExchangeResponseProto {
repeated int32 values = 1;
}
message AuthMethodResponseProto {
required int32 code = 1;
required string mechanismName = 2;
}
message UserResponseProto {
required string user = 1;
}
message SleepRequestProto2 {
optional int64 sleep_time = 1;
}
message SleepResponseProto2 {
optional int64 receive_time = 1;
optional int64 response_time = 2;
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
option java_package = "org.apache.hadoop.ipc.protobuf";
option java_outer_classname = "TestRpcServiceProtosLegacy";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.common;
import "test_legacy.proto";
/**
* A protobuf service for use in tests
*/
service TestProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EchoRequestProto) returns (EchoResponseProto);
rpc error(EmptyRequestProto) returns (EmptyResponseProto);
rpc error2(EmptyRequestProto) returns (EmptyResponseProto);
rpc slowPing(SlowPingRequestProto) returns (EmptyResponseProto);
rpc echo2(EchoRequestProto2) returns (EchoResponseProto2);
rpc add(AddRequestProto) returns (AddResponseProto);
rpc add2(AddRequestProto2) returns (AddResponseProto);
rpc testServerGet(EmptyRequestProto) returns (EmptyResponseProto);
rpc exchange(ExchangeRequestProto) returns (ExchangeResponseProto);
rpc sleep(SleepRequestProto) returns (EmptyResponseProto);
rpc lockAndSleep(SleepRequestProto) returns (EmptyResponseProto);
rpc getAuthMethod(EmptyRequestProto) returns (AuthMethodResponseProto);
rpc getAuthUser(EmptyRequestProto) returns (UserResponseProto);
rpc echoPostponed(EchoRequestProto) returns (EchoResponseProto);
rpc sendPostponed(EmptyRequestProto) returns (EmptyResponseProto);
rpc getCurrentUser(EmptyRequestProto) returns (UserResponseProto);
rpc getServerRemoteUser(EmptyRequestProto) returns (UserResponseProto);
}
service TestProtobufRpc2Proto {
rpc ping2(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo2(EchoRequestProto) returns (EchoResponseProto);
rpc sleep(SleepRequestProto) returns (SleepResponseProto);
}
service OldProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EmptyRequestProto) returns (EmptyResponseProto);
}
service NewProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(OptRequestProto) returns (OptResponseProto);
}
service NewerProtobufRpcProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
rpc echo(EmptyRequestProto) returns (EmptyResponseProto);
}
service CustomProto {
rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
}
service TestProtobufRpcHandoffProto {
rpc sleep(SleepRequestProto2) returns (SleepResponseProto2);
}