HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)
This commit is contained in:
parent
e01c6ea688
commit
682adc6ba9
@ -386,7 +386,7 @@ private class Connection extends Thread {
|
||||
private Socket socket = null; // connected socket
|
||||
private DataInputStream in;
|
||||
private DataOutputStream out;
|
||||
private int rpcTimeout;
|
||||
private final int rpcTimeout;
|
||||
private int maxIdleTime; //connections will be culled if it was idle for
|
||||
//maxIdleTime msecs
|
||||
private final RetryPolicy connectionRetryPolicy;
|
||||
@ -394,8 +394,9 @@ private class Connection extends Thread {
|
||||
private int maxRetriesOnSocketTimeouts;
|
||||
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||
private final boolean tcpLowLatency; // if T then use low-delay QoS
|
||||
private boolean doPing; //do we need to send ping message
|
||||
private int pingInterval; // how often sends ping to the server in msecs
|
||||
private final boolean doPing; //do we need to send ping message
|
||||
private final int pingInterval; // how often sends ping to the server
|
||||
private final int soTimeout; // used by ipc ping and rpc timeout
|
||||
private ByteArrayOutputStream pingRequest; // ping message
|
||||
|
||||
// currently active calls
|
||||
@ -434,6 +435,9 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
||||
pingHeader.writeDelimitedTo(pingRequest);
|
||||
}
|
||||
this.pingInterval = remoteId.getPingInterval();
|
||||
this.soTimeout =
|
||||
(rpcTimeout == 0 || (doPing && pingInterval < rpcTimeout))?
|
||||
this.pingInterval : this.rpcTimeout;
|
||||
this.serviceClass = serviceClass;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
|
||||
@ -484,12 +488,12 @@ protected PingInputStream(InputStream in) {
|
||||
|
||||
/* Process timeout exception
|
||||
* if the connection is not going to be closed or
|
||||
* is not configured to have a RPC timeout, send a ping.
|
||||
* (if rpcTimeout is not set to be 0, then RPC should timeout.
|
||||
* otherwise, throw the timeout exception.
|
||||
* the RPC is not timed out yet, send a ping.
|
||||
*/
|
||||
private void handleTimeout(SocketTimeoutException e) throws IOException {
|
||||
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
|
||||
private void handleTimeout(SocketTimeoutException e, int waiting)
|
||||
throws IOException {
|
||||
if (shouldCloseConnection.get() || !running.get() ||
|
||||
(0 < rpcTimeout && rpcTimeout <= waiting)) {
|
||||
throw e;
|
||||
} else {
|
||||
sendPing();
|
||||
@ -503,11 +507,13 @@ private void handleTimeout(SocketTimeoutException e) throws IOException {
|
||||
*/
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
int waiting = 0;
|
||||
do {
|
||||
try {
|
||||
return super.read();
|
||||
} catch (SocketTimeoutException e) {
|
||||
handleTimeout(e);
|
||||
waiting += soTimeout;
|
||||
handleTimeout(e, waiting);
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
@ -520,11 +526,13 @@ public int read() throws IOException {
|
||||
*/
|
||||
@Override
|
||||
public int read(byte[] buf, int off, int len) throws IOException {
|
||||
int waiting = 0;
|
||||
do {
|
||||
try {
|
||||
return super.read(buf, off, len);
|
||||
} catch (SocketTimeoutException e) {
|
||||
handleTimeout(e);
|
||||
waiting += soTimeout;
|
||||
handleTimeout(e, waiting);
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
@ -632,10 +640,7 @@ private synchronized void setupConnection() throws IOException {
|
||||
}
|
||||
|
||||
NetUtils.connect(this.socket, server, connectionTimeout);
|
||||
if (rpcTimeout > 0) {
|
||||
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
|
||||
}
|
||||
this.socket.setSoTimeout(pingInterval);
|
||||
this.socket.setSoTimeout(soTimeout);
|
||||
return;
|
||||
} catch (ConnectTimeoutException toe) {
|
||||
/* Check for an address change and update the local reference.
|
||||
|
@ -1054,7 +1054,7 @@
|
||||
<value>true</value>
|
||||
<description>Send a ping to the server when timeout on reading the response,
|
||||
if set to true. If no failure is detected, the client retries until at least
|
||||
a byte is read.
|
||||
a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
@ -1071,10 +1071,9 @@
|
||||
<name>ipc.client.rpc-timeout.ms</name>
|
||||
<value>0</value>
|
||||
<description>Timeout on waiting response from server, in milliseconds.
|
||||
Currently this timeout works only when ipc.client.ping is set to true
|
||||
because it uses the same facilities with IPC ping.
|
||||
The timeout overrides the ipc.ping.interval and client will throw exception
|
||||
instead of sending ping when the interval is passed.
|
||||
If ipc.client.ping is set to true and this rpc-timeout is greater than
|
||||
the value of ipc.ping.interval, the effective value of the rpc-timeout is
|
||||
rounded up to multiple of ipc.ping.interval.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -1043,6 +1043,74 @@ public void testClientRpcTimeout() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test RPC timeout when ipc.client.ping is false.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testClientRpcTimeoutWithoutPing() throws Exception {
|
||||
final Server server = new RPC.Builder(conf)
|
||||
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||
.setBindAddress(ADDRESS).setPort(0)
|
||||
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||
.build();
|
||||
server.start();
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
|
||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||
final TestProtocol proxy =
|
||||
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||
NetUtils.getConnectAddress(server), conf);
|
||||
|
||||
try {
|
||||
proxy.sleep(3000);
|
||||
fail("RPC should time out.");
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.info("got expected timeout.", e);
|
||||
} finally {
|
||||
server.stop();
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test RPC timeout greater than ipc.ping.interval.
|
||||
*/
|
||||
@Test(timeout=30000)
|
||||
public void testClientRpcTimeoutGreaterThanPingInterval() throws Exception {
|
||||
final Server server = new RPC.Builder(conf)
|
||||
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
|
||||
.setBindAddress(ADDRESS).setPort(0)
|
||||
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
|
||||
.build();
|
||||
server.start();
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
|
||||
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
|
||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||
final TestProtocol proxy =
|
||||
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
|
||||
NetUtils.getConnectAddress(server), conf);
|
||||
|
||||
// should not time out.
|
||||
proxy.sleep(300);
|
||||
|
||||
// should not time out because effective rpc-timeout is
|
||||
// multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
|
||||
proxy.sleep(1300);
|
||||
|
||||
try {
|
||||
proxy.sleep(2000);
|
||||
fail("RPC should time out.");
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.info("got expected timeout.", e);
|
||||
} finally {
|
||||
server.stop();
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user