HADOOP-9756. Remove the deprecated getServer(..) methods from RPC. Contributed by Junping Du
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1507259 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7723b139d5
commit
a690a215db
@ -493,6 +493,9 @@ Release 2.1.0-beta - 2013-07-02
|
|||||||
HADOOP-9760. Move GSet and related classes to common from HDFS.
|
HADOOP-9760. Move GSet and related classes to common from HDFS.
|
||||||
(suresh)
|
(suresh)
|
||||||
|
|
||||||
|
HADOOP-9756. Remove the deprecated getServer(..) methods from RPC.
|
||||||
|
(Junping Du via szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
||||||
|
@ -2228,7 +2228,7 @@ private synchronized Document asXmlDocument() throws IOException {
|
|||||||
doc.appendChild(conf);
|
doc.appendChild(conf);
|
||||||
conf.appendChild(doc.createTextNode("\n"));
|
conf.appendChild(doc.createTextNode("\n"));
|
||||||
handleDeprecation(); //ensure properties is set and deprecation is handled
|
handleDeprecation(); //ensure properties is set and deprecation is handled
|
||||||
for (Enumeration e = properties.keys(); e.hasMoreElements();) {
|
for (Enumeration<Object> e = properties.keys(); e.hasMoreElements();) {
|
||||||
String name = (String)e.nextElement();
|
String name = (String)e.nextElement();
|
||||||
Object object = properties.get(name);
|
Object object = properties.get(name);
|
||||||
String value = null;
|
String value = null;
|
||||||
|
@ -646,104 +646,6 @@ public static void stopProxy(Object proxy) {
|
|||||||
+ proxy.getClass());
|
+ proxy.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Construct a server for a protocol implementation instance listening on a
|
|
||||||
* port and address.
|
|
||||||
* @deprecated Please use {@link Builder} to build the {@link Server}
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
|
|
||||||
throws IOException {
|
|
||||||
return getServer(instance, bindAddress, port, 1, false, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Construct a server for a protocol implementation instance listening on a
|
|
||||||
* port and address.
|
|
||||||
* @deprecated Please use {@link Builder} to build the {@link Server}
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static Server getServer(final Object instance, final String bindAddress, final int port,
|
|
||||||
final int numHandlers,
|
|
||||||
final boolean verbose, Configuration conf)
|
|
||||||
throws IOException {
|
|
||||||
return getServer(instance.getClass(), // use impl class for protocol
|
|
||||||
instance, bindAddress, port, numHandlers, false, conf, null,
|
|
||||||
null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Construct a server for a protocol implementation instance.
|
|
||||||
* @deprecated Please use {@link Builder} to build the {@link Server}
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static Server getServer(Class<?> protocol,
|
|
||||||
Object instance, String bindAddress,
|
|
||||||
int port, Configuration conf)
|
|
||||||
throws IOException {
|
|
||||||
return getServer(protocol, instance, bindAddress, port, 1, false, conf, null,
|
|
||||||
null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Construct a server for a protocol implementation instance.
|
|
||||||
* @deprecated Please use {@link Builder} to build the {@link Server}
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static Server getServer(Class<?> protocol,
|
|
||||||
Object instance, String bindAddress, int port,
|
|
||||||
int numHandlers,
|
|
||||||
boolean verbose, Configuration conf)
|
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
|
|
||||||
conf, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Construct a server for a protocol implementation instance.
|
|
||||||
* @deprecated Please use {@link Builder} to build the {@link Server}
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static Server getServer(Class<?> protocol,
|
|
||||||
Object instance, String bindAddress, int port,
|
|
||||||
int numHandlers,
|
|
||||||
boolean verbose, Configuration conf,
|
|
||||||
SecretManager<? extends TokenIdentifier> secretManager)
|
|
||||||
throws IOException {
|
|
||||||
return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
|
|
||||||
conf, secretManager, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @deprecated Please use {@link Builder} to build the {@link Server}
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static Server getServer(Class<?> protocol,
|
|
||||||
Object instance, String bindAddress, int port,
|
|
||||||
int numHandlers,
|
|
||||||
boolean verbose, Configuration conf,
|
|
||||||
SecretManager<? extends TokenIdentifier> secretManager,
|
|
||||||
String portRangeConfig)
|
|
||||||
throws IOException {
|
|
||||||
return getProtocolEngine(protocol, conf)
|
|
||||||
.getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
|
|
||||||
verbose, conf, secretManager, portRangeConfig);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Construct a server for a protocol implementation instance.
|
|
||||||
* @deprecated Please use {@link Builder} to build the {@link Server}
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static <PROTO extends VersionedProtocol, IMPL extends PROTO>
|
|
||||||
Server getServer(Class<PROTO> protocol,
|
|
||||||
IMPL instance, String bindAddress, int port,
|
|
||||||
int numHandlers, int numReaders, int queueSizePerHandler,
|
|
||||||
boolean verbose, Configuration conf,
|
|
||||||
SecretManager<? extends TokenIdentifier> secretManager)
|
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
return getProtocolEngine(protocol, conf)
|
|
||||||
.getServer(protocol, instance, bindAddress, port, numHandlers,
|
|
||||||
numReaders, queueSizePerHandler, verbose, conf, secretManager,
|
|
||||||
null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to construct instances of RPC server with specific options.
|
* Class to construct instances of RPC server with specific options.
|
||||||
*/
|
*/
|
||||||
|
@ -217,14 +217,14 @@ public ConnectionId getConnectionId() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSerial() throws Exception {
|
public void testSerial() throws IOException, InterruptedException {
|
||||||
testSerial(3, false, 2, 5, 100);
|
testSerial(3, false, 2, 5, 100);
|
||||||
testSerial(3, true, 2, 5, 10);
|
testSerial(3, true, 2, 5, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSerial(int handlerCount, boolean handlerSleep,
|
public void testSerial(int handlerCount, boolean handlerSleep,
|
||||||
int clientCount, int callerCount, int callCount)
|
int clientCount, int callerCount, int callCount)
|
||||||
throws Exception {
|
throws IOException, InterruptedException {
|
||||||
Server server = new TestServer(handlerCount, handlerSleep);
|
Server server = new TestServer(handlerCount, handlerSleep);
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
server.start();
|
server.start();
|
||||||
@ -250,7 +250,7 @@ public void testSerial(int handlerCount, boolean handlerSleep,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStandAloneClient() throws Exception {
|
public void testStandAloneClient() throws IOException {
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 10);
|
||||||
try {
|
try {
|
||||||
@ -350,7 +350,8 @@ private void doErrorTest(
|
|||||||
Class<? extends LongWritable> clientParamClass,
|
Class<? extends LongWritable> clientParamClass,
|
||||||
Class<? extends LongWritable> serverParamClass,
|
Class<? extends LongWritable> serverParamClass,
|
||||||
Class<? extends LongWritable> serverResponseClass,
|
Class<? extends LongWritable> serverResponseClass,
|
||||||
Class<? extends LongWritable> clientResponseClass) throws Exception {
|
Class<? extends LongWritable> clientResponseClass)
|
||||||
|
throws IOException, InstantiationException, IllegalAccessException {
|
||||||
|
|
||||||
// start server
|
// start server
|
||||||
Server server = new TestServer(1, false,
|
Server server = new TestServer(1, false,
|
||||||
@ -481,7 +482,7 @@ private static void assertExceptionContains(
|
|||||||
* to the client.
|
* to the client.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSocketFactoryException() throws Exception {
|
public void testSocketFactoryException() throws IOException {
|
||||||
SocketFactory mockFactory = mock(SocketFactory.class);
|
SocketFactory mockFactory = mock(SocketFactory.class);
|
||||||
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
|
doThrow(new IOException("Injected fault")).when(mockFactory).createSocket();
|
||||||
Client client = new Client(LongWritable.class, conf, mockFactory);
|
Client client = new Client(LongWritable.class, conf, mockFactory);
|
||||||
@ -503,7 +504,7 @@ public void testSocketFactoryException() throws Exception {
|
|||||||
* HADOOP-7428.
|
* HADOOP-7428.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testRTEDuringConnectionSetup() throws Exception {
|
public void testRTEDuringConnectionSetup() throws IOException {
|
||||||
// Set up a socket factory which returns sockets which
|
// Set up a socket factory which returns sockets which
|
||||||
// throw an RTE when setSoTimeout is called.
|
// throw an RTE when setSoTimeout is called.
|
||||||
SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
|
SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
|
||||||
@ -544,7 +545,7 @@ public Socket answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIpcTimeout() throws Exception {
|
public void testIpcTimeout() throws IOException {
|
||||||
// start server
|
// start server
|
||||||
Server server = new TestServer(1, true);
|
Server server = new TestServer(1, true);
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
@ -566,7 +567,7 @@ public void testIpcTimeout() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIpcConnectTimeout() throws Exception {
|
public void testIpcConnectTimeout() throws IOException {
|
||||||
// start server
|
// start server
|
||||||
Server server = new TestServer(1, true);
|
Server server = new TestServer(1, true);
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
@ -589,7 +590,7 @@ public void testIpcConnectTimeout() throws Exception {
|
|||||||
* Check service class byte in IPC header is correct on wire.
|
* Check service class byte in IPC header is correct on wire.
|
||||||
*/
|
*/
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testIpcWithServiceClass() throws Exception {
|
public void testIpcWithServiceClass() throws IOException {
|
||||||
// start server
|
// start server
|
||||||
Server server = new TestServer(5, false);
|
Server server = new TestServer(5, false);
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
@ -616,7 +617,7 @@ public void testIpcWithServiceClass() throws Exception {
|
|||||||
* Make a call from a client and verify if header info is changed in server side
|
* Make a call from a client and verify if header info is changed in server side
|
||||||
*/
|
*/
|
||||||
private void callAndVerify(Server server, InetSocketAddress addr,
|
private void callAndVerify(Server server, InetSocketAddress addr,
|
||||||
int serviceClass, boolean noChanged) throws Exception{
|
int serviceClass, boolean noChanged) throws IOException{
|
||||||
Client client = new Client(LongWritable.class, conf);
|
Client client = new Client(LongWritable.class, conf);
|
||||||
|
|
||||||
client.call(new LongWritable(RANDOM.nextLong()),
|
client.call(new LongWritable(RANDOM.nextLong()),
|
||||||
@ -650,7 +651,7 @@ public void testIpcAfterStopping() throws IOException {
|
|||||||
* and stopping IPC servers.
|
* and stopping IPC servers.
|
||||||
*/
|
*/
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testSocketLeak() throws Exception {
|
public void testSocketLeak() throws IOException {
|
||||||
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
|
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
|
||||||
|
|
||||||
long startFds = countOpenFileDescriptors();
|
long startFds = countOpenFileDescriptors();
|
||||||
@ -670,31 +671,31 @@ private long countOpenFileDescriptors() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIpcFromHadoop_0_18_13() throws Exception {
|
public void testIpcFromHadoop_0_18_13() throws IOException {
|
||||||
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
|
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
|
||||||
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
|
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIpcFromHadoop0_20_3() throws Exception {
|
public void testIpcFromHadoop0_20_3() throws IOException {
|
||||||
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
|
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
|
||||||
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
|
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIpcFromHadoop0_21_0() throws Exception {
|
public void testIpcFromHadoop0_21_0() throws IOException {
|
||||||
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
|
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
|
||||||
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
|
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHttpGetResponse() throws Exception {
|
public void testHttpGetResponse() throws IOException {
|
||||||
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
|
doIpcVersionTest("GET / HTTP/1.0\r\n\r\n".getBytes(),
|
||||||
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
|
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConnectionRetriesOnSocketTimeoutExceptions() throws Exception {
|
public void testConnectionRetriesOnSocketTimeoutExceptions() throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// set max retries to 0
|
// set max retries to 0
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
@ -720,7 +721,7 @@ private static class CallInfo {
|
|||||||
* (2) the rpc client receives the same call id/retry from the rpc server.
|
* (2) the rpc client receives the same call id/retry from the rpc server.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCallIdAndRetry() throws Exception {
|
public void testCallIdAndRetry() throws IOException {
|
||||||
final CallInfo info = new CallInfo();
|
final CallInfo info = new CallInfo();
|
||||||
|
|
||||||
// Override client to store the call info and check response
|
// Override client to store the call info and check response
|
||||||
@ -772,7 +773,7 @@ private interface DummyProtocol {
|
|||||||
* Test the retry count while used in a retry proxy.
|
* Test the retry count while used in a retry proxy.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testRetryProxy() throws Exception {
|
public void testRetryProxy() throws IOException {
|
||||||
final Client client = new Client(LongWritable.class, conf);
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
|
|
||||||
final TestServer server = new TestServer(1, false);
|
final TestServer server = new TestServer(1, false);
|
||||||
@ -807,7 +808,7 @@ public void run() {
|
|||||||
* Test if the rpc server gets the default retry count (0) from client.
|
* Test if the rpc server gets the default retry count (0) from client.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testInitialCallRetryCount() throws Exception {
|
public void testInitialCallRetryCount() throws IOException {
|
||||||
// Override client to store the call id
|
// Override client to store the call id
|
||||||
final Client client = new Client(LongWritable.class, conf);
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
|
|
||||||
@ -838,7 +839,7 @@ public void run() {
|
|||||||
* Test if the rpc server gets the retry count from client.
|
* Test if the rpc server gets the retry count from client.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCallRetryCount() throws Exception {
|
public void testCallRetryCount() throws IOException {
|
||||||
final int retryCount = 255;
|
final int retryCount = 255;
|
||||||
// Override client to store the call id
|
// Override client to store the call id
|
||||||
final Client client = new Client(LongWritable.class, conf);
|
final Client client = new Client(LongWritable.class, conf);
|
||||||
@ -870,9 +871,11 @@ public void run() {
|
|||||||
/**
|
/**
|
||||||
* Tests that client generates a unique sequential call ID for each RPC call,
|
* Tests that client generates a unique sequential call ID for each RPC call,
|
||||||
* even if multiple threads are using the same client.
|
* even if multiple threads are using the same client.
|
||||||
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testUniqueSequentialCallIds() throws Exception {
|
public void testUniqueSequentialCallIds()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
|
int serverThreads = 10, callerCount = 100, perCallerCallCount = 100;
|
||||||
TestServer server = new TestServer(serverThreads, false);
|
TestServer server = new TestServer(serverThreads, false);
|
||||||
|
|
||||||
@ -937,7 +940,7 @@ private void assertRetriesOnSocketTimeouts(Configuration conf,
|
|||||||
|
|
||||||
private void doIpcVersionTest(
|
private void doIpcVersionTest(
|
||||||
byte[] requestData,
|
byte[] requestData,
|
||||||
byte[] expectedResponse) throws Exception {
|
byte[] expectedResponse) throws IOException {
|
||||||
Server server = new TestServer(1, true);
|
Server server = new TestServer(1, true);
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
server.start();
|
server.start();
|
||||||
|
@ -115,7 +115,8 @@ public void run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testResponseBuffer() throws Exception {
|
public void testResponseBuffer()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
Server.INITIAL_RESP_BUF_SIZE = 1;
|
Server.INITIAL_RESP_BUF_SIZE = 1;
|
||||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
|
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
|
||||||
1);
|
1);
|
||||||
@ -123,7 +124,8 @@ public void testResponseBuffer() throws Exception {
|
|||||||
conf = new Configuration(); // reset configuration
|
conf = new Configuration(); // reset configuration
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testServerResponder() throws Exception {
|
public void testServerResponder()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
testServerResponder(10, true, 1, 10, 200);
|
testServerResponder(10, true, 1, 10, 200);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,7 +133,8 @@ public void testServerResponder(final int handlerCount,
|
|||||||
final boolean handlerSleep,
|
final boolean handlerSleep,
|
||||||
final int clientCount,
|
final int clientCount,
|
||||||
final int callerCount,
|
final int callerCount,
|
||||||
final int callCount) throws Exception {
|
final int callCount) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
Server server = new TestServer(handlerCount, handlerSleep);
|
Server server = new TestServer(handlerCount, handlerSleep);
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
|
@ -323,7 +323,7 @@ public int getCloseCalled() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConfRpc() throws Exception {
|
public void testConfRpc() throws IOException {
|
||||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
.setNumHandlers(1).setVerbose(false).build();
|
.setNumHandlers(1).setVerbose(false).build();
|
||||||
@ -350,7 +350,7 @@ public void testConfRpc() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProxyAddress() throws Exception {
|
public void testProxyAddress() throws IOException {
|
||||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
|
||||||
TestProtocol proxy = null;
|
TestProtocol proxy = null;
|
||||||
@ -372,7 +372,7 @@ public void testProxyAddress() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSlowRpc() throws Exception {
|
public void testSlowRpc() throws IOException {
|
||||||
System.out.println("Testing Slow RPC");
|
System.out.println("Testing Slow RPC");
|
||||||
// create a server with two handlers
|
// create a server with two handlers
|
||||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
@ -418,11 +418,11 @@ public void testSlowRpc() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCalls() throws Exception {
|
public void testCalls() throws IOException {
|
||||||
testCallsInternal(conf);
|
testCallsInternal(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testCallsInternal(Configuration conf) throws Exception {
|
private void testCallsInternal(Configuration conf) throws IOException {
|
||||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0).build();
|
||||||
TestProtocol proxy = null;
|
TestProtocol proxy = null;
|
||||||
@ -540,7 +540,7 @@ public Service[] getServices() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
|
private void doRPCs(Configuration conf, boolean expectFailure) throws IOException {
|
||||||
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||||
.setNumHandlers(5).setVerbose(true).build();
|
.setNumHandlers(5).setVerbose(true).build();
|
||||||
@ -599,7 +599,7 @@ public void testServerAddress() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAuthorization() throws Exception {
|
public void testAuthorization() throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
|
conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
|
||||||
true);
|
true);
|
||||||
@ -626,7 +626,7 @@ public void testAuthorization() throws Exception {
|
|||||||
* Switch off setting socketTimeout values on RPC sockets.
|
* Switch off setting socketTimeout values on RPC sockets.
|
||||||
* Verify that RPC calls still work ok.
|
* Verify that RPC calls still work ok.
|
||||||
*/
|
*/
|
||||||
public void testNoPings() throws Exception {
|
public void testNoPings() throws IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
||||||
conf.setBoolean("ipc.client.ping", false);
|
conf.setBoolean("ipc.client.ping", false);
|
||||||
@ -638,10 +638,10 @@ public void testNoPings() throws Exception {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Test stopping a non-registered proxy
|
* Test stopping a non-registered proxy
|
||||||
* @throws Exception
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Test(expected=HadoopIllegalArgumentException.class)
|
@Test(expected=HadoopIllegalArgumentException.class)
|
||||||
public void testStopNonRegisteredProxy() throws Exception {
|
public void testStopNonRegisteredProxy() throws IOException {
|
||||||
RPC.stopProxy(null);
|
RPC.stopProxy(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -650,7 +650,7 @@ public void testStopNonRegisteredProxy() throws Exception {
|
|||||||
* be stopped without error.
|
* be stopped without error.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testStopMockObject() throws Exception {
|
public void testStopMockObject() throws IOException {
|
||||||
RPC.stopProxy(MockitoUtil.mockProtocol(TestProtocol.class));
|
RPC.stopProxy(MockitoUtil.mockProtocol(TestProtocol.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -681,7 +681,7 @@ public void testWrappedStopProxy() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testErrorMsgForInsecureClient() throws Exception {
|
public void testErrorMsgForInsecureClient() throws IOException {
|
||||||
Configuration serverConf = new Configuration(conf);
|
Configuration serverConf = new Configuration(conf);
|
||||||
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
|
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
|
||||||
serverConf);
|
serverConf);
|
||||||
@ -766,7 +766,7 @@ private static int countThreads(String search) {
|
|||||||
* Test that server.stop() properly stops all threads
|
* Test that server.stop() properly stops all threads
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testStopsAllThreads() throws Exception {
|
public void testStopsAllThreads() throws IOException, InterruptedException {
|
||||||
int threadsBefore = countThreads("Server$Listener$Reader");
|
int threadsBefore = countThreads("Server$Listener$Reader");
|
||||||
assertEquals("Expect no Reader threads running before test",
|
assertEquals("Expect no Reader threads running before test",
|
||||||
0, threadsBefore);
|
0, threadsBefore);
|
||||||
@ -797,7 +797,7 @@ public void testStopsAllThreads() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRPCBuilder() throws Exception {
|
public void testRPCBuilder() throws IOException {
|
||||||
// Test mandatory field conf
|
// Test mandatory field conf
|
||||||
try {
|
try {
|
||||||
new RPC.Builder(null).setProtocol(TestProtocol.class)
|
new RPC.Builder(null).setProtocol(TestProtocol.class)
|
||||||
@ -833,11 +833,13 @@ public void testRPCBuilder() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=90000)
|
@Test(timeout=90000)
|
||||||
public void testRPCInterruptedSimple() throws Exception {
|
public void testRPCInterruptedSimple() throws IOException {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
Server server = RPC.getServer(
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS)
|
||||||
);
|
.setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.setSecretManager(null).build();
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
try {
|
try {
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
@ -866,9 +868,10 @@ TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
|
|||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testRPCInterrupted() throws IOException, InterruptedException {
|
public void testRPCInterrupted() throws IOException, InterruptedException {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
Server server = RPC.getServer(
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
|
.setInstance(new TestImpl()).setBindAddress(ADDRESS)
|
||||||
);
|
.setPort(0).setNumHandlers(5).setVerbose(true)
|
||||||
|
.setSecretManager(null).build();
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
try {
|
try {
|
||||||
@ -929,7 +932,7 @@ public void run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws IOException {
|
||||||
new TestRPC().testCallsInternal(conf);
|
new TestRPC().testCallsInternal(conf);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -278,7 +278,7 @@ public void testHashCode() throws Exception {
|
|||||||
TestProtocol3.class.getMethod("echo_alias", int.class));
|
TestProtocol3.class.getMethod("echo_alias", int.class));
|
||||||
assertFalse(intEchoHash == intEchoHashAlias);
|
assertFalse(intEchoHash == intEchoHashAlias);
|
||||||
|
|
||||||
// Make sure that methods with the same returninig type and method name but
|
// Make sure that methods with the same returning type and method name but
|
||||||
// larger number of parameter types have different hash code
|
// larger number of parameter types have different hash code
|
||||||
int intEchoHash2 = ProtocolSignature.getFingerprint(
|
int intEchoHash2 = ProtocolSignature.getFingerprint(
|
||||||
TestProtocol3.class.getMethod("echo", int.class, int.class));
|
TestProtocol3.class.getMethod("echo", int.class, int.class));
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
public class TestSocketFactory {
|
public class TestSocketFactory {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSocketFactoryAsKeyInMap() throws Exception {
|
public void testSocketFactoryAsKeyInMap() {
|
||||||
Map<SocketFactory, Integer> dummyCache = new HashMap<SocketFactory, Integer>();
|
Map<SocketFactory, Integer> dummyCache = new HashMap<SocketFactory, Integer>();
|
||||||
int toBeCached1 = 1;
|
int toBeCached1 = 1;
|
||||||
int toBeCached2 = 2;
|
int toBeCached2 = 2;
|
||||||
|
@ -344,7 +344,6 @@ private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Creates the Failover proxy provider instance*/
|
/** Creates the Failover proxy provider instance*/
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
|
private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
|
||||||
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
|
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
|
||||||
Class<T> xface, URI nameNodeUri) throws IOException {
|
Class<T> xface, URI nameNodeUri) throws IOException {
|
||||||
@ -354,9 +353,9 @@ private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
|
|||||||
try {
|
try {
|
||||||
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
|
Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass
|
||||||
.getConstructor(Configuration.class, URI.class, Class.class);
|
.getConstructor(Configuration.class, URI.class, Class.class);
|
||||||
FailoverProxyProvider<?> provider = ctor.newInstance(conf, nameNodeUri,
|
FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri,
|
||||||
xface);
|
xface);
|
||||||
return (FailoverProxyProvider<T>) provider;
|
return provider;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
|
String message = "Couldn't create proxy provider " + failoverProxyProviderClass;
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -63,7 +63,7 @@
|
|||||||
* This class HAS to be in this package to access package private
|
* This class HAS to be in this package to access package private
|
||||||
* methods/classes.
|
* methods/classes.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings({"unchecked" , "deprecation"})
|
@SuppressWarnings({"unchecked"})
|
||||||
public class TaskAttemptListenerImpl extends CompositeService
|
public class TaskAttemptListenerImpl extends CompositeService
|
||||||
implements TaskUmbilicalProtocol, TaskAttemptListener {
|
implements TaskUmbilicalProtocol, TaskAttemptListener {
|
||||||
|
|
||||||
@ -118,11 +118,14 @@ protected void registerHeartbeatHandler(Configuration conf) {
|
|||||||
protected void startRpcServer() {
|
protected void startRpcServer() {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
try {
|
try {
|
||||||
server =
|
server =
|
||||||
RPC.getServer(TaskUmbilicalProtocol.class, this, "0.0.0.0", 0,
|
new RPC.Builder(conf).setProtocol(TaskUmbilicalProtocol.class)
|
||||||
conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
|
.setInstance(this).setBindAddress("0.0.0.0")
|
||||||
MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT),
|
.setPort(0).setNumHandlers(
|
||||||
false, conf, jobTokenSecretManager);
|
conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
|
||||||
|
MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT))
|
||||||
|
.setVerbose(false).setSecretManager(jobTokenSecretManager)
|
||||||
|
.build();
|
||||||
|
|
||||||
// Enable service authorization?
|
// Enable service authorization?
|
||||||
if (conf.getBoolean(
|
if (conf.getBoolean(
|
||||||
|
@ -137,11 +137,12 @@ public void ping() {
|
|||||||
/**
|
/**
|
||||||
* Test {@link AuditLogger} with IP set.
|
* Test {@link AuditLogger} with IP set.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testAuditLoggerWithIP() throws Exception {
|
public void testAuditLoggerWithIP() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// start the IPC server
|
// start the IPC server
|
||||||
Server server = RPC.getServer(new MyTestRPCServer(), "0.0.0.0", 0, conf);
|
Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
|
||||||
|
.setInstance(new MyTestRPCServer()).setBindAddress("0.0.0.0")
|
||||||
|
.setPort(0).build();
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||||
|
Loading…
Reference in New Issue
Block a user