HADOOP-12672. RPC timeout should not override IPC ping interval (iwasakims)
This commit is contained in:
parent
0005816743
commit
85ec5573eb
@ -239,14 +239,33 @@ static final int getPingInterval(Configuration conf) {
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return the timeout period in milliseconds. -1 if no timeout value is set
|
||||
* @deprecated use {@link #getRpcTimeout(Configuration)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
final public static int getTimeout(Configuration conf) {
|
||||
int timeout = getRpcTimeout(conf);
|
||||
if (timeout > 0) {
|
||||
return timeout;
|
||||
}
|
||||
if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
|
||||
CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT)) {
|
||||
return getPingInterval(conf);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* The time after which a RPC will timeout.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return the timeout period in milliseconds.
|
||||
*/
|
||||
public static final int getRpcTimeout(Configuration conf) {
|
||||
int timeout =
|
||||
conf.getInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY,
|
||||
CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_DEFAULT);
|
||||
return (timeout < 0) ? 0 : timeout;
|
||||
}
|
||||
/**
|
||||
* set the connection timeout value in configuration
|
||||
*
|
||||
@ -386,7 +405,7 @@ private class Connection extends Thread {
|
||||
private Socket socket = null; // connected socket
|
||||
private DataInputStream in;
|
||||
private DataOutputStream out;
|
||||
private int rpcTimeout;
|
||||
private final int rpcTimeout;
|
||||
private int maxIdleTime; //connections will be culled if it was idle for
|
||||
//maxIdleTime msecs
|
||||
private final RetryPolicy connectionRetryPolicy;
|
||||
@ -394,8 +413,9 @@ private class Connection extends Thread {
|
||||
private int maxRetriesOnSocketTimeouts;
|
||||
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||
private final boolean tcpLowLatency; // if T then use low-delay QoS
|
||||
private boolean doPing; //do we need to send ping message
|
||||
private int pingInterval; // how often sends ping to the server in msecs
|
||||
private final boolean doPing; //do we need to send ping message
|
||||
private final int pingInterval; // how often sends ping to the server
|
||||
private final int soTimeout; // used by ipc ping and rpc timeout
|
||||
private ByteArrayOutputStream pingRequest; // ping message
|
||||
|
||||
// currently active calls
|
||||
@ -434,6 +454,14 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
||||
pingHeader.writeDelimitedTo(pingRequest);
|
||||
}
|
||||
this.pingInterval = remoteId.getPingInterval();
|
||||
if (rpcTimeout > 0) {
|
||||
// effective rpc timeout is rounded up to multiple of pingInterval
|
||||
// if pingInterval < rpcTimeout.
|
||||
this.soTimeout = (doPing && pingInterval < rpcTimeout) ?
|
||||
pingInterval : rpcTimeout;
|
||||
} else {
|
||||
this.soTimeout = pingInterval;
|
||||
}
|
||||
this.serviceClass = serviceClass;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
|
||||
@ -484,12 +512,12 @@ protected PingInputStream(InputStream in) {
|
||||
|
||||
/* Process timeout exception
|
||||
* if the connection is not going to be closed or
|
||||
* is not configured to have a RPC timeout, send a ping.
|
||||
* (if rpcTimeout is not set to be 0, then RPC should timeout.
|
||||
* otherwise, throw the timeout exception.
|
||||
* the RPC is not timed out yet, send a ping.
|
||||
*/
|
||||
private void handleTimeout(SocketTimeoutException e) throws IOException {
|
||||
if (shouldCloseConnection.get() || !running.get() || rpcTimeout > 0) {
|
||||
private void handleTimeout(SocketTimeoutException e, int waiting)
|
||||
throws IOException {
|
||||
if (shouldCloseConnection.get() || !running.get() ||
|
||||
(0 < rpcTimeout && rpcTimeout <= waiting)) {
|
||||
throw e;
|
||||
} else {
|
||||
sendPing();
|
||||
@ -503,11 +531,13 @@ private void handleTimeout(SocketTimeoutException e) throws IOException {
|
||||
*/
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
int waiting = 0;
|
||||
do {
|
||||
try {
|
||||
return super.read();
|
||||
} catch (SocketTimeoutException e) {
|
||||
handleTimeout(e);
|
||||
waiting += soTimeout;
|
||||
handleTimeout(e, waiting);
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
@ -520,11 +550,13 @@ public int read() throws IOException {
|
||||
*/
|
||||
@Override
|
||||
public int read(byte[] buf, int off, int len) throws IOException {
|
||||
int waiting = 0;
|
||||
do {
|
||||
try {
|
||||
return super.read(buf, off, len);
|
||||
} catch (SocketTimeoutException e) {
|
||||
handleTimeout(e);
|
||||
waiting += soTimeout;
|
||||
handleTimeout(e, waiting);
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
@ -632,10 +664,7 @@ private synchronized void setupConnection() throws IOException {
|
||||
}
|
||||
|
||||
NetUtils.connect(this.socket, server, connectionTimeout);
|
||||
if (rpcTimeout > 0) {
|
||||
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
|
||||
}
|
||||
this.socket.setSoTimeout(pingInterval);
|
||||
this.socket.setSoTimeout(soTimeout);
|
||||
return;
|
||||
} catch (ConnectTimeoutException toe) {
|
||||
/* Check for an address change and update the local reference.
|
||||
|
@ -1054,7 +1054,7 @@
|
||||
<value>true</value>
|
||||
<description>Send a ping to the server when timeout on reading the response,
|
||||
if set to true. If no failure is detected, the client retries until at least
|
||||
a byte is read.
|
||||
a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
@ -1071,10 +1071,9 @@
|
||||
<name>ipc.client.rpc-timeout.ms</name>
|
||||
<value>0</value>
|
||||
<description>Timeout on waiting response from server, in milliseconds.
|
||||
Currently this timeout works only when ipc.client.ping is set to true
|
||||
because it uses the same facilities with IPC ping.
|
||||
The timeout overrides the ipc.ping.interval and client will throw exception
|
||||
instead of sending ping when the interval is passed.
|
||||
If ipc.client.ping is set to true and this rpc-timeout is greater than
|
||||
the value of ipc.ping.interval, the effective value of the rpc-timeout is
|
||||
rounded up to multiple of ipc.ping.interval.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
@ -1118,14 +1118,67 @@ public void testClientRpcTimeout() throws Exception {
|
||||
.setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true);
|
||||
server = setupTestServer(builder);
|
||||
|
||||
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||
try {
|
||||
proxy = getClient(addr, conf);
|
||||
proxy.sleep(null, newSleepRequest(3000));
|
||||
fail("RPC should time out.");
|
||||
} catch (ServiceException e) {
|
||||
assertTrue(e.getCause() instanceof SocketTimeoutException);
|
||||
LOG.info("got expected timeout.", e);
|
||||
// Test RPC timeout with default ipc.client.ping.
|
||||
try {
|
||||
Configuration c = new Configuration(conf);
|
||||
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||
proxy = getClient(addr, c);
|
||||
proxy.sleep(null, newSleepRequest(3000));
|
||||
fail("RPC should time out.");
|
||||
} catch (ServiceException e) {
|
||||
assertTrue(e.getCause() instanceof SocketTimeoutException);
|
||||
LOG.info("got expected timeout.", e);
|
||||
}
|
||||
|
||||
// Test RPC timeout when ipc.client.ping is false.
|
||||
try {
|
||||
Configuration c = new Configuration(conf);
|
||||
c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
|
||||
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||
proxy = getClient(addr, c);
|
||||
proxy.sleep(null, newSleepRequest(3000));
|
||||
fail("RPC should time out.");
|
||||
} catch (ServiceException e) {
|
||||
assertTrue(e.getCause() instanceof SocketTimeoutException);
|
||||
LOG.info("got expected timeout.", e);
|
||||
}
|
||||
|
||||
// Test negative timeout value.
|
||||
try {
|
||||
Configuration c = new Configuration(conf);
|
||||
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, -1);
|
||||
proxy = getClient(addr, c);
|
||||
proxy.sleep(null, newSleepRequest(2000));
|
||||
} catch (ServiceException e) {
|
||||
LOG.info("got unexpected exception.", e);
|
||||
fail("RPC should not time out.");
|
||||
}
|
||||
|
||||
// Test RPC timeout greater than ipc.ping.interval.
|
||||
try {
|
||||
Configuration c = new Configuration(conf);
|
||||
c.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
|
||||
c.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 800);
|
||||
c.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 1000);
|
||||
proxy = getClient(addr, c);
|
||||
|
||||
try {
|
||||
// should not time out because effective rpc-timeout is
|
||||
// multiple of ping interval: 1600 (= 800 * (1000 / 800 + 1))
|
||||
proxy.sleep(null, newSleepRequest(1300));
|
||||
} catch (ServiceException e) {
|
||||
LOG.info("got unexpected exception.", e);
|
||||
fail("RPC should not time out.");
|
||||
}
|
||||
|
||||
proxy.sleep(null, newSleepRequest(2000));
|
||||
fail("RPC should time out.");
|
||||
} catch (ServiceException e) {
|
||||
assertTrue(e.getCause() instanceof SocketTimeoutException);
|
||||
LOG.info("got expected timeout.", e);
|
||||
}
|
||||
|
||||
} finally {
|
||||
stop(server, proxy);
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ public class DfsClientConf {
|
||||
|
||||
public DfsClientConf(Configuration conf) {
|
||||
// The hdfsTimeout is currently the same as the ipc timeout
|
||||
hdfsTimeout = Client.getTimeout(conf);
|
||||
hdfsTimeout = Client.getRpcTimeout(conf);
|
||||
|
||||
maxRetryAttempts = conf.getInt(
|
||||
Retry.MAX_ATTEMPTS_KEY,
|
||||
|
Loading…
Reference in New Issue
Block a user