diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index c1ac20e906..45e5535aeb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,10 +18,11 @@
 package org.apache.hadoop.ipc;
 
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FilterInputStream;
@@ -382,6 +383,7 @@ public class Client {
     private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
     private boolean doPing; //do we need to send ping message
     private int pingInterval; // how often sends ping to the server in msecs
+    private ByteArrayOutputStream pingRequest; // ping message
 
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
@@ -407,6 +409,15 @@ public class Client {
       this.maxRetriesOnSocketTimeouts = remoteId.getMaxRetriesOnSocketTimeouts();
       this.tcpNoDelay = remoteId.getTcpNoDelay();
       this.doPing = remoteId.getDoPing();
+      if (doPing) {
+        // construct a RPC header with the callId as the ping callId
+        pingRequest = new ByteArrayOutputStream();
+        RpcRequestHeaderProto pingHeader = ProtoUtil
+            .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
+                OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,
+                RpcConstants.INVALID_RETRY_COUNT, clientId);
+        pingHeader.writeDelimitedTo(pingRequest);
+      }
       this.pingInterval = remoteId.getPingInterval();
       this.serviceClass = serviceClass;
       if (LOG.isDebugEnabled()) {
@@ -910,7 +921,8 @@ public class Client {
           if ( curTime - lastActivity.get() >= pingInterval) {
             lastActivity.set(curTime);
             synchronized (out) {
-              out.writeInt(RpcConstants.PING_CALL_ID);
+              out.writeInt(pingRequest.size());
+              pingRequest.writeTo(out);
               out.flush();
             }
           }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
index 14cc06e0d0..831862bb23 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java
@@ -27,13 +27,13 @@ public class RpcConstants {
     // Hidden Constructor
   }
 
-  public static final int PING_CALL_ID = -1;
+  public static final int AUTHORIZATION_FAILED_CALL_ID = -1;
+  public static final int INVALID_CALL_ID = -2;
+  public static final int CONNECTION_CONTEXT_CALL_ID = -3;
+  public static final int PING_CALL_ID = -4;
 
   public static final byte[] DUMMY_CLIENT_ID = new byte[0];
 
-  public static final int INVALID_CALL_ID = -2;
-
-  public static final int CONNECTION_CONTEXT_CALL_ID = -3;
-
   public static final int INVALID_RETRY_COUNT = -1;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 5303655c65..119eaa1cb1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -72,8 +72,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
-import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
+import static org.apache.hadoop.ipc.RpcConstants.*;
 import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -1177,9 +1176,7 @@
     public UserGroupInformation attemptingUser = null; // user name before auth
 
     // Fake 'call' for failed authorization response
-    private static final int AUTHORIZATION_FAILED_CALLID = -1;
-
-    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
+    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALL_ID,
         RpcConstants.INVALID_RETRY_COUNT, null, this);
     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
@@ -1523,11 +1520,6 @@ public abstract class Server {
         if (data == null) {
           dataLengthBuffer.flip();
           dataLength = dataLengthBuffer.getInt();
-          if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
-            // covers the !useSasl too
-            dataLengthBuffer.clear();
-            return 0; // ping message
-          }
           checkDataLength(dataLength);
           data = ByteBuffer.allocate(dataLength);
         }
@@ -1738,13 +1730,6 @@ public abstract class Server {
         if (unwrappedData == null) {
           unwrappedDataLengthBuffer.flip();
           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
-          if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
-            if (LOG.isDebugEnabled())
-              LOG.debug("Received ping message");
-            unwrappedDataLengthBuffer.clear();
-            continue; // ping message
-          }
           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
         }
 
@@ -1913,6 +1898,8 @@
               "SASL protocol not requested by client");
         }
         saslReadAndProcess(dis);
+      } else if (callId == PING_CALL_ID) {
+        LOG.debug("Received ping message");
       } else {
         throw new WrappedRpcServerException(
             RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
@@ -1926,7 +1913,7 @@
      */
     private void authorizeConnection() throws WrappedRpcServerException {
       try {
-        // If auth method is DIGEST, the token was obtained by the
+        // If auth method is TOKEN, the token was obtained by the
         // real user for the effective user, therefore not required to
         // authorize real user. doAs is allowed only for simple or kerberos
         // authentication
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index e657b04570..f6917d2159 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -100,6 +100,7 @@ public class TestRPC {
     void ping() throws IOException;
     void slowPing(boolean shouldSlow) throws IOException;
+    void sleep(long delay) throws IOException, InterruptedException;
     String echo(String value) throws IOException;
     String[] echo(String[] value) throws IOException;
     Writable echo(Writable value) throws IOException;
@@ -145,6 +146,11 @@ public class TestRPC {
       }
     }
 
+    @Override
+    public void sleep(long delay) throws InterruptedException {
+      Thread.sleep(delay);
+    }
+
     @Override
     public String echo(String value) throws IOException { return value; }
@@ -932,6 +938,28 @@ public class TestRPC {
     }
   }
 
+  @Test
+  public void testConnectionPing() throws Exception {
+    Configuration conf = new Configuration();
+    int pingInterval = 50;
+    conf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
+    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .build();
+    server.start();
+
+    final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
+        TestProtocol.versionID, server.getListenerAddress(), conf);
+    try {
+      // this call will throw exception if server couldn't decode the ping
+      proxy.sleep(pingInterval*4);
+    } finally {
+      if (proxy != null) RPC.stopProxy(proxy);
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);
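
For reference, below is a minimal standalone sketch (not part of the patch) of the ping frame the patched Client now emits: a 4-byte length prefix followed by the varint-delimited RpcRequestHeaderProto whose callId is PING_CALL_ID (-4). The class name PingFrameSketch is made up for illustration, and RpcConstants.DUMMY_CLIENT_ID stands in for the connection's real client id; everything else mirrors the calls shown in the Client.java hunks above.

// Sketch only: builds pingRequest the way the patched Connection constructor does
// and frames it the way sendPing() does, writing to an in-memory buffer instead of a socket.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.apache.hadoop.util.ProtoUtil;

public class PingFrameSketch {
  public static void main(String[] args) throws IOException {
    // Build the ping header with the ping callId, as the patched Client does;
    // the dummy client id is used here purely for illustration.
    RpcRequestHeaderProto pingHeader = ProtoUtil.makeRpcRequestHeader(
        RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET,
        RpcConstants.PING_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
        RpcConstants.DUMMY_CLIENT_ID);

    // Serialize it varint-delimited, as Connection caches it in pingRequest.
    ByteArrayOutputStream pingRequest = new ByteArrayOutputStream();
    pingHeader.writeDelimitedTo(pingRequest);

    // Frame it as sendPing() does: 4-byte length prefix, then the header bytes.
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(wire);
    out.writeInt(pingRequest.size());
    pingRequest.writeTo(out);
    out.flush();

    System.out.println("ping frame: " + wire.size() + " bytes, callId = "
        + pingHeader.getCallId());
  }
}

With this framing a ping is read by the server like any other length-prefixed request and dispatched on its callId, which is why the dataLength == -1 special cases removed from Server.java above are no longer needed.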