HADOOP-6099. The RPC module can be configured to not send period pings.
The default behaviour remains unchanged. (dhruba) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@792812 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
34d67b1cc2
commit
c38c5a43ff
@ -471,6 +471,9 @@ Trunk (unreleased changes)
|
|||||||
HADOOP-2366. Support trimmed strings in Configuration. (Michele Catasta
|
HADOOP-2366. Support trimmed strings in Configuration. (Michele Catasta
|
||||||
via szetszwo)
|
via szetszwo)
|
||||||
|
|
||||||
|
HADOOP-6099. The RPC module can be configured to not send period pings.
|
||||||
|
The default behaviour of sending periodic pings remain unchanged. (dhruba)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-5595. NameNode does not need to run a replicator to choose a
|
HADOOP-5595. NameNode does not need to run a replicator to choose a
|
||||||
|
@ -73,6 +73,7 @@ public class Client {
|
|||||||
final private int maxRetries; //the max. no. of retries for socket connections
|
final private int maxRetries; //the max. no. of retries for socket connections
|
||||||
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
||||||
private int pingInterval; // how often sends ping to the server in msecs
|
private int pingInterval; // how often sends ping to the server in msecs
|
||||||
|
final private boolean doPing; //do we need to send ping message
|
||||||
|
|
||||||
private SocketFactory socketFactory; // how to create sockets
|
private SocketFactory socketFactory; // how to create sockets
|
||||||
private int refCount = 1;
|
private int refCount = 1;
|
||||||
@ -101,6 +102,22 @@ final public static void setPingInterval(Configuration conf, int pingInterval) {
|
|||||||
final static int getPingInterval(Configuration conf) {
|
final static int getPingInterval(Configuration conf) {
|
||||||
return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
|
return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The time after which a RPC will timeout.
|
||||||
|
* If ping is not enabled (via ipc.client.ping), then the timeout value is the
|
||||||
|
* same as the pingInterval.
|
||||||
|
* If ping is enabled, then there is no timeout value.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return the timeout period in milliseconds. -1 if no timeout value is set
|
||||||
|
*/
|
||||||
|
final public static int getTimeout(Configuration conf) {
|
||||||
|
if (!conf.getBoolean("ipc.client.ping", true)) {
|
||||||
|
return getPingInterval(conf);
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increment this client's reference count
|
* Increment this client's reference count
|
||||||
@ -317,8 +334,13 @@ private synchronized void setupIOstreams() {
|
|||||||
handleConnectionFailure(ioFailures++, maxRetries, ie);
|
handleConnectionFailure(ioFailures++, maxRetries, ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.in = new DataInputStream(new BufferedInputStream
|
if (doPing) {
|
||||||
|
this.in = new DataInputStream(new BufferedInputStream
|
||||||
(new PingInputStream(NetUtils.getInputStream(socket))));
|
(new PingInputStream(NetUtils.getInputStream(socket))));
|
||||||
|
} else {
|
||||||
|
this.in = new DataInputStream(new BufferedInputStream
|
||||||
|
(NetUtils.getInputStream(socket)));
|
||||||
|
}
|
||||||
this.out = new DataOutputStream
|
this.out = new DataOutputStream
|
||||||
(new BufferedOutputStream(NetUtils.getOutputStream(socket)));
|
(new BufferedOutputStream(NetUtils.getOutputStream(socket)));
|
||||||
writeHeader();
|
writeHeader();
|
||||||
@ -634,6 +656,7 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
|
|||||||
conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
|
conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
|
||||||
this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
|
this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
|
||||||
this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
|
this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
|
||||||
|
this.doPing = conf.getBoolean("ipc.client.ping", true);
|
||||||
this.pingInterval = getPingInterval(conf);
|
this.pingInterval = getPingInterval(conf);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("The ping interval is" + this.pingInterval + "ms.");
|
LOG.debug("The ping interval is" + this.pingInterval + "ms.");
|
||||||
|
@ -231,7 +231,7 @@ public void testSlowRpc() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testCalls() throws Exception {
|
public void testCalls(Configuration conf) throws Exception {
|
||||||
Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
|
Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
|
||||||
TestProtocol proxy = null;
|
TestProtocol proxy = null;
|
||||||
try {
|
try {
|
||||||
@ -382,10 +382,20 @@ public void testAuthorization() throws Exception {
|
|||||||
conf.set(ACL_CONFIG, "invalid invalid");
|
conf.set(ACL_CONFIG, "invalid invalid");
|
||||||
doRPCs(conf, true);
|
doRPCs(conf, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Switch off setting socketTimeout values on RPC sockets.
|
||||||
|
* Verify that RPC calls still work ok.
|
||||||
|
*/
|
||||||
|
public void testNoPings() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean("ipc.client.ping", false);
|
||||||
|
new TestRPC("testnoPings").testCalls(conf);
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
new TestRPC("test").testCalls();
|
new TestRPC("test").testCalls(conf);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user