From a4819e70dbf88b0905a6669078afa1ff0924ad4f Mon Sep 17 00:00:00 2001 From: Sanjay Radia Date: Thu, 12 Dec 2013 18:56:45 +0000 Subject: [PATCH] HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550486 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../org/apache/hadoop/ipc/RpcConstants.java | 14 ++ .../java/org/apache/hadoop/ipc/Server.java | 127 +++++++++++++++--- 3 files changed, 121 insertions(+), 22 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 1ccc28e3ba..b37d7dee81 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -280,6 +280,8 @@ Trunk (Unreleased) HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view all pools (Andrew Wang via Colin Patrick McCabe) + HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java index 831862bb23..c457500e90 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -37,10 +37,24 @@ public class RpcConstants { public static final int INVALID_RETRY_COUNT = -1; + /** + * The Rpc-connection header is as follows + * +----------------------------------+ + * | "hrpc" 4 bytes | + * +----------------------------------+ + * | Version (1 byte) | + * +----------------------------------+ + * | Service Class (1 byte) | + * +----------------------------------+ + * | AuthProtocol (1 byte) | + * +----------------------------------+ + */ + /** * The first four bytes of Hadoop RPC connections */ public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + public static final int HEADER_LEN_AFTER_HRPC_PART = 3; // 3 bytes that follow // 1 : Introduce ping and server does not throw away RPCs // 3 : Introduce the protocol into the RPC connection header diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 7fb395cdb0..7f56940890 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -1105,6 +1105,9 @@ public abstract class Server { this.channel = channel; this.lastContact = lastContact; this.data = null; + + // the buffer is initialized to read the "hrpc" and after that to read + // the length of the Rpc-packet (i.e 4 bytes) this.dataLengthBuffer = ByteBuffer.allocate(4); this.unwrappedData = null; this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4); @@ -1200,7 +1203,16 @@ public abstract class Server { } } - private Throwable getCauseForInvalidToken(IOException e) { + /** + * Some exceptions ({@link RetriableException} and {@link StandbyException}) + * that are wrapped as a cause of parameter e are unwrapped so that they can + * be sent as the true cause to the client side. In case of + * {@link InvalidToken} we go one level deeper to get the true cause. + * + * @param e the exception that may have a cause we want to unwrap. + * @return the true cause for some exceptions. + */ + private Throwable getTrueCause(IOException e) { Throwable cause = e; while (cause != null) { if (cause instanceof RetriableException) { @@ -1223,6 +1235,18 @@ public abstract class Server { return e; } + /** + * Process saslMessage and send saslResponse back + * @param saslMessage received SASL message + * @throws WrappedRpcServerException setup failed due to SASL negotiation + * failure, premature or invalid connection context, or other state + * errors. This exception needs to be sent to the client. This + * exception will wrap {@link RetriableException}, + * {@link InvalidToken}, {@link StandbyException} or + * {@link SaslException}. + * @throws IOException if sending reply fails + * @throws InterruptedException + */ private void saslProcess(RpcSaslProto saslMessage) throws WrappedRpcServerException, IOException, InterruptedException { if (saslContextEstablished) { @@ -1239,7 +1263,7 @@ public abstract class Server { // attempting user could be null AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":" + attemptingUser + " (" + e.getLocalizedMessage() + ")"); - throw (IOException) getCauseForInvalidToken(e); + throw (IOException) getTrueCause(e); } if (saslServer != null && saslServer.isComplete()) { @@ -1274,13 +1298,26 @@ public abstract class Server { } } + /** + * Process a saslMessge. + * @param saslMessage received SASL message + * @return the sasl response to send back to client + * @throws SaslException if authentication or generating response fails, + * or SASL protocol mixup + * @throws IOException if a SaslServer cannot be created + * @throws AccessControlException if the requested authentication type + * is not supported or trying to re-attempt negotiation. + * @throws InterruptedException + */ private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage) - throws IOException, InterruptedException { + throws SaslException, IOException, AccessControlException, + InterruptedException { RpcSaslProto saslResponse = null; final SaslState state = saslMessage.getState(); // required switch (state) { case NEGOTIATE: { if (sentNegotiate) { + // FIXME shouldn't this be SaslException? throw new AccessControlException( "Client already attempted negotiation"); } @@ -1402,12 +1439,30 @@ public abstract class Server { } } + /** + * This method reads in a non-blocking fashion from the channel: + * this method is called repeatedly when data is present in the channel; + * when it has enough data to process one rpc it processes that rpc. + * + * On the first pass, it processes the connectionHeader, + * connectionContext (an outOfBand RPC) and at most one RPC request that + * follows that. On future passes it will process at most one RPC request. + * + * Quirky things: dataLengthBuffer (4 bytes) is used to read "hrpc" OR + * rpc request length. + * + * @return -1 in case of error, else num bytes read so far + * @throws WrappedRpcServerException - an exception that has already been + * sent back to the client that does not require verbose logging + * by the Listener thread + * @throws IOException - internal error that should not be returned to + * client, typically failure to respond to client + * @throws InterruptedException + */ public int readAndProcess() throws WrappedRpcServerException, IOException, InterruptedException { while (true) { - /* Read at most one RPC. If the header is not read completely yet - * then iterate until we read first RPC or until there is no data left. - */ + // dataLengthBuffer is used to read "hrpc" or the rpc-packet length int count = -1; if (dataLengthBuffer.remaining() > 0) { count = channelRead(channel, dataLengthBuffer); @@ -1416,9 +1471,11 @@ public abstract class Server { } if (!connectionHeaderRead) { - //Every connection is expected to send the header. + // Every connection is expected to send the header; + // so far we read "hrpc" of the connection header. if (connectionHeaderBuf == null) { - connectionHeaderBuf = ByteBuffer.allocate(3); + // for the bytes that follow "hrpc", in the connection header + connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART); } count = channelRead(channel, connectionHeaderBuf); if (count < 0 || connectionHeaderBuf.remaining() > 0) { @@ -1451,27 +1508,30 @@ public abstract class Server { // this may switch us into SIMPLE authProtocol = initializeAuthContext(connectionHeaderBuf.get(2)); - dataLengthBuffer.clear(); + dataLengthBuffer.clear(); // clear to next read rpc packet len connectionHeaderBuf = null; connectionHeaderRead = true; - continue; + continue; // connection header read, now read 4 bytes rpc packet len } - if (data == null) { + if (data == null) { // just read 4 bytes - length of RPC packet dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); checkDataLength(dataLength); + // Set buffer for reading EXACTLY the RPC-packet length and no more. data = ByteBuffer.allocate(dataLength); } - + // Now read the RPC packet count = channelRead(channel, data); if (data.remaining() == 0) { - dataLengthBuffer.clear(); + dataLengthBuffer.clear(); // to read length of future rpc packets data.flip(); boolean isHeaderRead = connectionContextRead; processOneRpc(data.array()); data = null; + // the last rpc-request we processed could have simply been the + // connectionContext; if so continue to read the first RPC. if (!isHeaderRead) { continue; } @@ -1508,8 +1568,16 @@ public abstract class Server { return authProtocol; } + /** + * Process the Sasl's Negotiate request, including the optimization of + * accelerating token negotiation. + * @return the response to Negotiate request - the list of enabled + * authMethods and challenge if the TOKENS are supported. + * @throws SaslException - if attempt to generate challenge fails. + * @throws IOException - if it fails to create the SASL server for Tokens + */ private RpcSaslProto buildSaslNegotiateResponse() - throws IOException, InterruptedException { + throws InterruptedException, SaslException, IOException { RpcSaslProto negotiateMessage = negotiateResponse; // accelerate token negotiation by sending initial challenge // in the negotiation response @@ -1635,8 +1703,11 @@ public abstract class Server { /** * Process a wrapped RPC Request - unwrap the SASL packet and process * each embedded RPC request - * @param buf - SASL wrapped request of one or more RPCs + * @param inBuf - SASL wrapped request of one or more RPCs * @throws IOException - SASL packet cannot be unwrapped + * @throws WrappedRpcServerException - an exception that has already been + * sent back to the client that does not require verbose logging + * by the Listener thread * @throws InterruptedException */ private void unwrapPacketAndProcessRpcs(byte[] inBuf) @@ -1677,13 +1748,21 @@ public abstract class Server { } /** - * Process an RPC Request - handle connection setup and decoding of - * request into a Call + * Process one RPC Request from buffer read from socket stream + * - decode rpc in a rpc-Call + * - handle out-of-band RPC requests such as the initial connectionContext + * - A successfully decoded RpcCall will be deposited in RPC-Q and + * its response will be sent later when the request is processed. + * + * Prior to this call the connectionHeader ("hrpc...") has been handled and + * if SASL then SASL has been established and the buf we are passed + * has been unwrapped from SASL. + * * @param buf - contains the RPC request header and the rpc request * @throws IOException - internal error that should not be returned to * client, typically failure to respond to client - * @throws WrappedRpcServerException - an exception to be sent back to - * the client that does not require verbose logging by the + * @throws WrappedRpcServerException - an exception that is sent back to the + * client in this method and does not require verbose logging by the * Listener thread * @throws InterruptedException */ @@ -1753,8 +1832,11 @@ public abstract class Server { } /** - * Process an RPC Request - the connection headers and context must - * have been already read + * Process an RPC Request + * - the connection headers and context must have been already read. + * - Based on the rpcKind, decode the rpcRequest. + * - A successfully decoded RpcCall will be deposited in RPC-Q and + * its response will be sent later when the request is processed. * @param header - RPC request header * @param dis - stream to request payload * @throws WrappedRpcServerException - due to fatal rpc layer issues such @@ -1803,7 +1885,8 @@ public abstract class Server { * @param dis - stream to request payload * @throws WrappedRpcServerException - setup failed due to SASL * negotiation failure, premature or invalid connection context, - * or other state errors + * or other state errors. This exception needs to be sent to the + * client. * @throws IOException - failed to send a response back to the client * @throws InterruptedException */