Revert "HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)"

This reverts commit 682adc6ba9.
This commit is contained in:
Steve Loughran 2016-03-11 17:00:17 +00:00
parent 247a790609
commit 754299695b
3 changed files with 19 additions and 91 deletions

View File

@ -386,7 +386,7 @@ private class Connection extends Thread {
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
private final int rpcTimeout;
private int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private final RetryPolicy connectionRetryPolicy;
@ -394,9 +394,8 @@ private class Connection extends Thread {
private int maxRetriesOnSocketTimeouts;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
private final boolean tcpLowLatency; // if T then use low-delay QoS
private final boolean doPing; //do we need to send ping message
private final int pingInterval; // how often sends ping to the server
private final int soTimeout; // used by ipc ping and rpc timeout
private boolean doPing; //do we need to send ping message
private int pingInterval; // how often sends ping to the server in msecs
private ByteArrayOutputStream pingRequest; // ping message
// currently active calls
@ -435,9 +434,6 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
pingHeader.writeDelimitedTo(pingRequest);
}
this.pingInterval = remoteId.getPingInterval();
this.soTimeout =
(rpcTimeout == 0 || (doPing && pingInterval < rpcTimeout))?
this.pingInterval : this.rpcTimeout;
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
@ -488,12 +484,12 @@ protected PingInputStream(InputStream in) {
/* Process timeout exception
* if the connection is not going to be closed or
* the RPC is not timed out yet, send a ping.
* is not configured to have a RPC timeout, send a ping.
* (if rpcTimeout is not set to be 0, then RPC should timeout.
* otherwise, throw the timeout exception.
*/
private void handleTimeout(SocketTimeoutException e, int waiting)
throws IOException {
if (shouldCloseConnection.get() || !running.get() ||
(0 < rpcTimeout && rpcTimeout <= waiting)) {
private void handleTimeout(SocketTimeoutException e) throws IOException {
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
throw e;
} else {
sendPing();
@ -507,13 +503,11 @@ private void handleTimeout(SocketTimeoutException e, int waiting)
*/
@Override
public int read() throws IOException {
int waiting = 0;
do {
try {
return super.read();
} catch (SocketTimeoutException e) {
waiting += soTimeout;
handleTimeout(e, waiting);
handleTimeout(e);
}
} while (true);
}
@ -526,13 +520,11 @@ public int read() throws IOException {
*/
@Override
public int read(byte[] buf, int off, int len) throws IOException {
int waiting = 0;
do {
try {
return super.read(buf, off, len);
} catch (SocketTimeoutException e) {
waiting += soTimeout;
handleTimeout(e, waiting);
handleTimeout(e);
}
} while (true);
}
@ -640,7 +632,10 @@ private synchronized void setupConnection() throws IOException {
}
NetUtils.connect(this.socket, server, connectionTimeout);
this.socket.setSoTimeout(soTimeout);
if (rpcTimeout > 0) {
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
}
this.socket.setSoTimeout(pingInterval);
return;
} catch (ConnectTimeoutException toe) {
/* Check for an address change and update the local reference.

View File

@ -1054,7 +1054,7 @@
<value>true</value>
<description>Send a ping to the server when timeout on reading the response,
if set to true. If no failure is detected, the client retries until at least
a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
a byte is read.
</description>
</property>
@ -1071,9 +1071,10 @@
<name>ipc.client.rpc-timeout.ms</name>
<value>0</value>
<description>Timeout on waiting response from server, in milliseconds.
If ipc.client.ping is set to true and this rpc-timeout is greater than
the value of ipc.ping.interval, the effective value of the rpc-timeout is
rounded up to multiple of ipc.ping.interval.
Currently this timeout works only when ipc.client.ping is set to true
because it uses the same facilities with IPC ping.
The timeout overrides the ipc.ping.interval and client will throw exception
instead of sending ping when the interval is passed.
</description>
</property>

View File

@ -1043,74 +1043,6 @@ public void testClientRpcTimeout() throws Exception {
}
}
/**
* Test RPC timeout when ipc.client.ping is false.
*/
@Test(timeout=30000)
public void testClientRpcTimeoutWithoutPing() throws Exception {
final Server server = new RPC.Builder(conf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0)
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
.build();
server.start();
final Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
final TestProtocol proxy =
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
NetUtils.getConnectAddress(server), conf);
try {
proxy.sleep(3000);
fail("RPC should time out.");
} catch (SocketTimeoutException e) {
LOG.info("got expected timeout.", e);
} finally {
server.stop();
RPC.stopProxy(proxy);
}
}
/**
* Test RPC timeout greater than ipc.ping.interval.
*/
@Test(timeout=30000)
public void testClientRpcTimeoutGreaterThanPingInterval() throws Exception {
final Server server = new RPC.Builder(conf)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0)
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
.build();
server.start();
final Configuration conf = new Configuration();
conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
final TestProtocol proxy =
RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
NetUtils.getConnectAddress(server), conf);
// should not time out.
proxy.sleep(300);
// should not time out because effective rpc-timeout is
// multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
proxy.sleep(1300);
try {
proxy.sleep(2000);
fail("RPC should time out.");
} catch (SocketTimeoutException e) {
LOG.info("got expected timeout.", e);
} finally {
server.stop();
RPC.stopProxy(proxy);
}
}
public static void main(String[] args) throws Exception {
new TestRPC().testCallsInternal(conf);
}