HADOOP-11252. RPC client does not time out by default. Contributed by Wilfred Spiegelenburg and Masatake Iwasaki.
This commit is contained in:
parent
f9e36dea96
commit
64ae85fd2e
@ -2365,6 +2365,9 @@ Release 2.6.4 - UNRELEASED
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-11252. RPC client does not time out by default.
|
||||
(Wilfred Spiegelenburg and Masatake Iwasaki via aajisaka)
|
||||
|
||||
Release 2.6.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -52,6 +52,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||
public static final String IPC_CLIENT_PING_KEY = "ipc.client.ping";
|
||||
/** Default value of IPC_CLIENT_PING_KEY */
|
||||
public static final boolean IPC_CLIENT_PING_DEFAULT = true;
|
||||
/** Timeout value for RPC client on waiting for response. */
|
||||
public static final String IPC_CLIENT_RPC_TIMEOUT_KEY =
|
||||
"ipc.client.rpc-timeout.ms";
|
||||
/** Default value for IPC_CLIENT_RPC_TIMEOUT_KEY. */
|
||||
public static final int IPC_CLIENT_RPC_TIMEOUT_DEFAULT = 0;
|
||||
/** Responses larger than this will be logged */
|
||||
public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
|
||||
"ipc.server.max.response.size";
|
||||
|
@ -215,7 +215,7 @@ synchronized ExecutorService unrefAndCleanup() {
|
||||
* @param conf Configuration
|
||||
* @param pingInterval the ping interval
|
||||
*/
|
||||
final public static void setPingInterval(Configuration conf, int pingInterval) {
|
||||
static final void setPingInterval(Configuration conf, int pingInterval) {
|
||||
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
|
||||
}
|
||||
|
||||
@ -226,7 +226,7 @@ final public static void setPingInterval(Configuration conf, int pingInterval) {
|
||||
* @param conf Configuration
|
||||
* @return the ping interval
|
||||
*/
|
||||
final public static int getPingInterval(Configuration conf) {
|
||||
static final int getPingInterval(Configuration conf) {
|
||||
return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
|
||||
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
|
||||
}
|
||||
|
@ -40,6 +40,7 @@
|
||||
import org.apache.commons.logging.*;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.io.*;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
@ -347,7 +348,8 @@ public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
|
||||
long clientVersion,
|
||||
InetSocketAddress addr, Configuration conf,
|
||||
long connTimeout) throws IOException {
|
||||
return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout);
|
||||
return waitForProtocolProxy(protocol, clientVersion, addr, conf,
|
||||
getRpcTimeout(conf), null, connTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -491,8 +493,8 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
|
||||
UserGroupInformation ticket,
|
||||
Configuration conf,
|
||||
SocketFactory factory) throws IOException {
|
||||
return getProtocolProxy(
|
||||
protocol, clientVersion, addr, ticket, conf, factory, 0, null);
|
||||
return getProtocolProxy(protocol, clientVersion, addr, ticket, conf,
|
||||
factory, getRpcTimeout(conf), null);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -686,6 +688,17 @@ public static void stopProxy(Object proxy) {
|
||||
+ "does not provide closeable invocation handler "
|
||||
+ proxy.getClass());
|
||||
}
|
||||
/**
|
||||
* Get the RPC time from configuration;
|
||||
* If not set in the configuration, return the default value.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return the RPC timeout (ms)
|
||||
*/
|
||||
public static int getRpcTimeout(Configuration conf) {
|
||||
return conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY,
|
||||
CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Class to construct instances of RPC server with specific options.
|
||||
|
@ -1060,6 +1060,35 @@ for ldap providers in the same way as above does.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ipc.client.ping</name>
|
||||
<value>true</value>
|
||||
<description>Send a ping to the server when timeout on reading the response,
|
||||
if set to true. If no failure is detected, the client retries until at least
|
||||
a byte is read.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ipc.ping.interval</name>
|
||||
<value>60000</value>
|
||||
<description>Timeout on waiting response from server, in milliseconds.
|
||||
The client will send ping when the interval is passed without receiving bytes,
|
||||
if ipc.client.ping is set to true.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ipc.client.rpc-timeout.ms</name>
|
||||
<value>0</value>
|
||||
<description>Timeout on waiting response from server, in milliseconds.
|
||||
Currently this timeout works only when ipc.client.ping is set to true
|
||||
because it uses the same facilities with IPC ping.
|
||||
The timeout overrides the ipc.ping.interval and client will throw exception
|
||||
instead of sending ping when the interval is passed.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ipc.server.listen.queue.size</name>
|
||||
<value>128</value>
|
||||
|
@ -39,6 +39,7 @@
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
@ -1146,6 +1147,35 @@ && countThreads(CallQueueManager.class.getName()) != 1) {
|
||||
assertTrue("RetriableException not received", succeeded);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test RPC timeout.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testClientRpcTimeout() throws Exception {
|
||||
final Server server = new RPC.Builder(conf)
|
||||
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||
.setBindAddress(ADDRESS).setPort(0)
|
||||
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||
.build();
|
||||
server.start();
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||
final TestProtocol proxy =
|
||||
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||
NetUtils.getConnectAddress(server), conf);
|
||||
|
||||
try {
|
||||
proxy.sleep(3000);
|
||||
fail("RPC should time out.");
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.info("got expected timeout.", e);
|
||||
} finally {
|
||||
server.stop();
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
|
||||
|
@ -103,10 +103,9 @@ public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
|
||||
private static DatanodeProtocolPB createNamenode(
|
||||
InetSocketAddress nameNodeAddr, Configuration conf,
|
||||
UserGroupInformation ugi) throws IOException {
|
||||
return RPC.getProtocolProxy(DatanodeProtocolPB.class,
|
||||
return RPC.getProxy(DatanodeProtocolPB.class,
|
||||
RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
|
||||
conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class),
|
||||
org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
|
||||
conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user