HADOOP-7932. Make client connection retries on socket time outs configurable. Contributed by Uma Maheswara Rao G.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1220957 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4bb0456c66
commit
009dfed8d7
@ -21,3 +21,6 @@ HADOOP-7928. HA: Client failover policy is incorrectly trying to fail over all
|
|||||||
|
|
||||||
HADOOP-7925. Add interface and update CLI to query current state to
|
HADOOP-7925. Add interface and update CLI to query current state to
|
||||||
HAServiceProtocol (eli via todd)
|
HAServiceProtocol (eli via todd)
|
||||||
|
|
||||||
|
HADOOP-7932. Make client connection retries on socket time outs configurable.
|
||||||
|
(Uma Maheswara Rao G via todd)
|
||||||
|
@ -172,6 +172,11 @@ public class CommonConfigurationKeysPublic {
|
|||||||
/** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_KEY */
|
/** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_KEY */
|
||||||
public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
|
public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
|
||||||
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
|
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
|
||||||
|
public static final String IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY =
|
||||||
|
"ipc.client.connect.max.retries.on.timeouts";
|
||||||
|
/** Default value for IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY */
|
||||||
|
public static final int IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 45;
|
||||||
|
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
|
||||||
public static final String IPC_CLIENT_TCPNODELAY_KEY =
|
public static final String IPC_CLIENT_TCPNODELAY_KEY =
|
||||||
"ipc.client.tcpnodelay";
|
"ipc.client.tcpnodelay";
|
||||||
/** Defalt value for IPC_CLIENT_TCPNODELAY_KEY */
|
/** Defalt value for IPC_CLIENT_TCPNODELAY_KEY */
|
||||||
|
@ -48,6 +48,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
|
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
@ -224,6 +225,8 @@ private class Connection extends Thread {
|
|||||||
private int maxIdleTime; //connections will be culled if it was idle for
|
private int maxIdleTime; //connections will be culled if it was idle for
|
||||||
//maxIdleTime msecs
|
//maxIdleTime msecs
|
||||||
private int maxRetries; //the max. no. of retries for socket connections
|
private int maxRetries; //the max. no. of retries for socket connections
|
||||||
|
// the max. no. of retries for socket connections on time out exceptions
|
||||||
|
private int maxRetriesOnSocketTimeouts;
|
||||||
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||||
private boolean doPing; //do we need to send ping message
|
private boolean doPing; //do we need to send ping message
|
||||||
private int pingInterval; // how often sends ping to the server in msecs
|
private int pingInterval; // how often sends ping to the server in msecs
|
||||||
@ -247,6 +250,7 @@ public Connection(ConnectionId remoteId) throws IOException {
|
|||||||
this.rpcTimeout = remoteId.getRpcTimeout();
|
this.rpcTimeout = remoteId.getRpcTimeout();
|
||||||
this.maxIdleTime = remoteId.getMaxIdleTime();
|
this.maxIdleTime = remoteId.getMaxIdleTime();
|
||||||
this.maxRetries = remoteId.getMaxRetries();
|
this.maxRetries = remoteId.getMaxRetries();
|
||||||
|
this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
|
||||||
this.tcpNoDelay = remoteId.getTcpNoDelay();
|
this.tcpNoDelay = remoteId.getTcpNoDelay();
|
||||||
this.doPing = remoteId.getDoPing();
|
this.doPing = remoteId.getDoPing();
|
||||||
this.pingInterval = remoteId.getPingInterval();
|
this.pingInterval = remoteId.getPingInterval();
|
||||||
@ -475,11 +479,8 @@ private synchronized void setupConnection() throws IOException {
|
|||||||
if (updateAddress()) {
|
if (updateAddress()) {
|
||||||
timeoutFailures = ioFailures = 0;
|
timeoutFailures = ioFailures = 0;
|
||||||
}
|
}
|
||||||
/*
|
handleConnectionFailure(timeoutFailures++,
|
||||||
* The max number of retries is 45, which amounts to 20s*45 = 15
|
maxRetriesOnSocketTimeouts, toe);
|
||||||
* minutes retries.
|
|
||||||
*/
|
|
||||||
handleConnectionFailure(timeoutFailures++, 45, toe);
|
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
if (updateAddress()) {
|
if (updateAddress()) {
|
||||||
timeoutFailures = ioFailures = 0;
|
timeoutFailures = ioFailures = 0;
|
||||||
@ -1263,6 +1264,8 @@ public static class ConnectionId {
|
|||||||
private int maxIdleTime; //connections will be culled if it was idle for
|
private int maxIdleTime; //connections will be culled if it was idle for
|
||||||
//maxIdleTime msecs
|
//maxIdleTime msecs
|
||||||
private int maxRetries; //the max. no. of retries for socket connections
|
private int maxRetries; //the max. no. of retries for socket connections
|
||||||
|
// the max. no. of retries for socket connections on time out exceptions
|
||||||
|
private int maxRetriesOnSocketTimeouts;
|
||||||
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||||
private boolean doPing; //do we need to send ping message
|
private boolean doPing; //do we need to send ping message
|
||||||
private int pingInterval; // how often sends ping to the server in msecs
|
private int pingInterval; // how often sends ping to the server in msecs
|
||||||
@ -1270,8 +1273,8 @@ public static class ConnectionId {
|
|||||||
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
ConnectionId(InetSocketAddress address, Class<?> protocol,
|
||||||
UserGroupInformation ticket, int rpcTimeout,
|
UserGroupInformation ticket, int rpcTimeout,
|
||||||
String serverPrincipal, int maxIdleTime,
|
String serverPrincipal, int maxIdleTime,
|
||||||
int maxRetries, boolean tcpNoDelay,
|
int maxRetries, int maxRetriesOnSocketTimeouts,
|
||||||
boolean doPing, int pingInterval) {
|
boolean tcpNoDelay, boolean doPing, int pingInterval) {
|
||||||
this.protocol = protocol;
|
this.protocol = protocol;
|
||||||
this.address = address;
|
this.address = address;
|
||||||
this.ticket = ticket;
|
this.ticket = ticket;
|
||||||
@ -1279,6 +1282,7 @@ public static class ConnectionId {
|
|||||||
this.serverPrincipal = serverPrincipal;
|
this.serverPrincipal = serverPrincipal;
|
||||||
this.maxIdleTime = maxIdleTime;
|
this.maxIdleTime = maxIdleTime;
|
||||||
this.maxRetries = maxRetries;
|
this.maxRetries = maxRetries;
|
||||||
|
this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
|
||||||
this.tcpNoDelay = tcpNoDelay;
|
this.tcpNoDelay = tcpNoDelay;
|
||||||
this.doPing = doPing;
|
this.doPing = doPing;
|
||||||
this.pingInterval = pingInterval;
|
this.pingInterval = pingInterval;
|
||||||
@ -1312,6 +1316,11 @@ int getMaxRetries() {
|
|||||||
return maxRetries;
|
return maxRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** max connection retries on socket time outs */
|
||||||
|
public int getMaxRetriesOnSocketTimeouts() {
|
||||||
|
return maxRetriesOnSocketTimeouts;
|
||||||
|
}
|
||||||
|
|
||||||
boolean getTcpNoDelay() {
|
boolean getTcpNoDelay() {
|
||||||
return tcpNoDelay;
|
return tcpNoDelay;
|
||||||
}
|
}
|
||||||
@ -1343,6 +1352,9 @@ public static ConnectionId getConnectionId(InetSocketAddress addr,
|
|||||||
rpcTimeout, remotePrincipal,
|
rpcTimeout, remotePrincipal,
|
||||||
conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
|
conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
|
||||||
conf.getInt("ipc.client.connect.max.retries", 10),
|
conf.getInt("ipc.client.connect.max.retries", 10),
|
||||||
|
conf.getInt(
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT),
|
||||||
conf.getBoolean("ipc.client.tcpnodelay", false),
|
conf.getBoolean("ipc.client.tcpnodelay", false),
|
||||||
doPing,
|
doPing,
|
||||||
(doPing ? Client.getPingInterval(conf) : 0));
|
(doPing ? Client.getPingInterval(conf) : 0));
|
||||||
|
@ -20,7 +20,9 @@
|
|||||||
|
|
||||||
import org.apache.commons.logging.*;
|
import org.apache.commons.logging.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||||
@ -590,6 +592,38 @@ public void testHttpGetResponse() throws Exception {
|
|||||||
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
|
Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectionRetriesOnSocketTimeoutExceptions() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
// set max retries to 0
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||||
|
0);
|
||||||
|
assertRetriesOnSocketTimeouts(conf, 1);
|
||||||
|
|
||||||
|
// set max retries to 3
|
||||||
|
conf.setInt(
|
||||||
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
||||||
|
3);
|
||||||
|
assertRetriesOnSocketTimeouts(conf, 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertRetriesOnSocketTimeouts(Configuration conf,
|
||||||
|
int maxTimeoutRetries) throws IOException, InterruptedException {
|
||||||
|
SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
|
||||||
|
doThrow(new SocketTimeoutException()).when(mockFactory).createSocket();
|
||||||
|
Client client = new Client(IntWritable.class, conf, mockFactory);
|
||||||
|
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9090);
|
||||||
|
try {
|
||||||
|
client.call(new IntWritable(RANDOM.nextInt()), address, null, null, 0,
|
||||||
|
conf);
|
||||||
|
fail("Not throwing the SocketTimeoutException");
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
Mockito.verify(mockFactory, Mockito.times(maxTimeoutRetries))
|
||||||
|
.createSocket();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void doIpcVersionTest(
|
private void doIpcVersionTest(
|
||||||
byte[] requestData,
|
byte[] requestData,
|
||||||
byte[] expectedResponse) throws Exception {
|
byte[] expectedResponse) throws Exception {
|
||||||
|
Loading…
Reference in New Issue
Block a user