HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1550486 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ee0530ee0e
commit
a4819e70db
@ -280,6 +280,8 @@ Trunk (Unreleased)
|
|||||||
HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view
|
HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view
|
||||||
all pools (Andrew Wang via Colin Patrick McCabe)
|
all pools (Andrew Wang via Colin Patrick McCabe)
|
||||||
|
|
||||||
|
HADOOP-10044 Improve the javadoc of rpc code (sanjay Radia)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-7761. Improve the performance of raw comparisons. (todd)
|
HADOOP-7761. Improve the performance of raw comparisons. (todd)
|
||||||
|
@ -37,10 +37,24 @@ public class RpcConstants {
|
|||||||
|
|
||||||
public static final int INVALID_RETRY_COUNT = -1;
|
public static final int INVALID_RETRY_COUNT = -1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Rpc-connection header is as follows
|
||||||
|
* +----------------------------------+
|
||||||
|
* | "hrpc" 4 bytes |
|
||||||
|
* +----------------------------------+
|
||||||
|
* | Version (1 byte) |
|
||||||
|
* +----------------------------------+
|
||||||
|
* | Service Class (1 byte) |
|
||||||
|
* +----------------------------------+
|
||||||
|
* | AuthProtocol (1 byte) |
|
||||||
|
* +----------------------------------+
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The first four bytes of Hadoop RPC connections
|
* The first four bytes of Hadoop RPC connections
|
||||||
*/
|
*/
|
||||||
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
||||||
|
public static final int HEADER_LEN_AFTER_HRPC_PART = 3; // 3 bytes that follow
|
||||||
|
|
||||||
// 1 : Introduce ping and server does not throw away RPCs
|
// 1 : Introduce ping and server does not throw away RPCs
|
||||||
// 3 : Introduce the protocol into the RPC connection header
|
// 3 : Introduce the protocol into the RPC connection header
|
||||||
|
@ -1105,6 +1105,9 @@ public abstract class Server {
|
|||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
this.lastContact = lastContact;
|
this.lastContact = lastContact;
|
||||||
this.data = null;
|
this.data = null;
|
||||||
|
|
||||||
|
// the buffer is initialized to read the "hrpc" and after that to read
|
||||||
|
// the length of the Rpc-packet (i.e 4 bytes)
|
||||||
this.dataLengthBuffer = ByteBuffer.allocate(4);
|
this.dataLengthBuffer = ByteBuffer.allocate(4);
|
||||||
this.unwrappedData = null;
|
this.unwrappedData = null;
|
||||||
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
|
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
|
||||||
@ -1200,7 +1203,16 @@ public abstract class Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Throwable getCauseForInvalidToken(IOException e) {
|
/**
|
||||||
|
* Some exceptions ({@link RetriableException} and {@link StandbyException})
|
||||||
|
* that are wrapped as a cause of parameter e are unwrapped so that they can
|
||||||
|
* be sent as the true cause to the client side. In case of
|
||||||
|
* {@link InvalidToken} we go one level deeper to get the true cause.
|
||||||
|
*
|
||||||
|
* @param e the exception that may have a cause we want to unwrap.
|
||||||
|
* @return the true cause for some exceptions.
|
||||||
|
*/
|
||||||
|
private Throwable getTrueCause(IOException e) {
|
||||||
Throwable cause = e;
|
Throwable cause = e;
|
||||||
while (cause != null) {
|
while (cause != null) {
|
||||||
if (cause instanceof RetriableException) {
|
if (cause instanceof RetriableException) {
|
||||||
@ -1223,6 +1235,18 @@ public abstract class Server {
|
|||||||
return e;
|
return e;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process saslMessage and send saslResponse back
|
||||||
|
* @param saslMessage received SASL message
|
||||||
|
* @throws WrappedRpcServerException setup failed due to SASL negotiation
|
||||||
|
* failure, premature or invalid connection context, or other state
|
||||||
|
* errors. This exception needs to be sent to the client. This
|
||||||
|
* exception will wrap {@link RetriableException},
|
||||||
|
* {@link InvalidToken}, {@link StandbyException} or
|
||||||
|
* {@link SaslException}.
|
||||||
|
* @throws IOException if sending reply fails
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
private void saslProcess(RpcSaslProto saslMessage)
|
private void saslProcess(RpcSaslProto saslMessage)
|
||||||
throws WrappedRpcServerException, IOException, InterruptedException {
|
throws WrappedRpcServerException, IOException, InterruptedException {
|
||||||
if (saslContextEstablished) {
|
if (saslContextEstablished) {
|
||||||
@ -1239,7 +1263,7 @@ public abstract class Server {
|
|||||||
// attempting user could be null
|
// attempting user could be null
|
||||||
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
|
AUDITLOG.warn(AUTH_FAILED_FOR + this.toString() + ":"
|
||||||
+ attemptingUser + " (" + e.getLocalizedMessage() + ")");
|
+ attemptingUser + " (" + e.getLocalizedMessage() + ")");
|
||||||
throw (IOException) getCauseForInvalidToken(e);
|
throw (IOException) getTrueCause(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (saslServer != null && saslServer.isComplete()) {
|
if (saslServer != null && saslServer.isComplete()) {
|
||||||
@ -1274,13 +1298,26 @@ public abstract class Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a saslMessge.
|
||||||
|
* @param saslMessage received SASL message
|
||||||
|
* @return the sasl response to send back to client
|
||||||
|
* @throws SaslException if authentication or generating response fails,
|
||||||
|
* or SASL protocol mixup
|
||||||
|
* @throws IOException if a SaslServer cannot be created
|
||||||
|
* @throws AccessControlException if the requested authentication type
|
||||||
|
* is not supported or trying to re-attempt negotiation.
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
|
private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
|
||||||
throws IOException, InterruptedException {
|
throws SaslException, IOException, AccessControlException,
|
||||||
|
InterruptedException {
|
||||||
RpcSaslProto saslResponse = null;
|
RpcSaslProto saslResponse = null;
|
||||||
final SaslState state = saslMessage.getState(); // required
|
final SaslState state = saslMessage.getState(); // required
|
||||||
switch (state) {
|
switch (state) {
|
||||||
case NEGOTIATE: {
|
case NEGOTIATE: {
|
||||||
if (sentNegotiate) {
|
if (sentNegotiate) {
|
||||||
|
// FIXME shouldn't this be SaslException?
|
||||||
throw new AccessControlException(
|
throw new AccessControlException(
|
||||||
"Client already attempted negotiation");
|
"Client already attempted negotiation");
|
||||||
}
|
}
|
||||||
@ -1402,12 +1439,30 @@ public abstract class Server {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method reads in a non-blocking fashion from the channel:
|
||||||
|
* this method is called repeatedly when data is present in the channel;
|
||||||
|
* when it has enough data to process one rpc it processes that rpc.
|
||||||
|
*
|
||||||
|
* On the first pass, it processes the connectionHeader,
|
||||||
|
* connectionContext (an outOfBand RPC) and at most one RPC request that
|
||||||
|
* follows that. On future passes it will process at most one RPC request.
|
||||||
|
*
|
||||||
|
* Quirky things: dataLengthBuffer (4 bytes) is used to read "hrpc" OR
|
||||||
|
* rpc request length.
|
||||||
|
*
|
||||||
|
* @return -1 in case of error, else num bytes read so far
|
||||||
|
* @throws WrappedRpcServerException - an exception that has already been
|
||||||
|
* sent back to the client that does not require verbose logging
|
||||||
|
* by the Listener thread
|
||||||
|
* @throws IOException - internal error that should not be returned to
|
||||||
|
* client, typically failure to respond to client
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
public int readAndProcess()
|
public int readAndProcess()
|
||||||
throws WrappedRpcServerException, IOException, InterruptedException {
|
throws WrappedRpcServerException, IOException, InterruptedException {
|
||||||
while (true) {
|
while (true) {
|
||||||
/* Read at most one RPC. If the header is not read completely yet
|
// dataLengthBuffer is used to read "hrpc" or the rpc-packet length
|
||||||
* then iterate until we read first RPC or until there is no data left.
|
|
||||||
*/
|
|
||||||
int count = -1;
|
int count = -1;
|
||||||
if (dataLengthBuffer.remaining() > 0) {
|
if (dataLengthBuffer.remaining() > 0) {
|
||||||
count = channelRead(channel, dataLengthBuffer);
|
count = channelRead(channel, dataLengthBuffer);
|
||||||
@ -1416,9 +1471,11 @@ public abstract class Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!connectionHeaderRead) {
|
if (!connectionHeaderRead) {
|
||||||
//Every connection is expected to send the header.
|
// Every connection is expected to send the header;
|
||||||
|
// so far we read "hrpc" of the connection header.
|
||||||
if (connectionHeaderBuf == null) {
|
if (connectionHeaderBuf == null) {
|
||||||
connectionHeaderBuf = ByteBuffer.allocate(3);
|
// for the bytes that follow "hrpc", in the connection header
|
||||||
|
connectionHeaderBuf = ByteBuffer.allocate(HEADER_LEN_AFTER_HRPC_PART);
|
||||||
}
|
}
|
||||||
count = channelRead(channel, connectionHeaderBuf);
|
count = channelRead(channel, connectionHeaderBuf);
|
||||||
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
|
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
|
||||||
@ -1451,27 +1508,30 @@ public abstract class Server {
|
|||||||
// this may switch us into SIMPLE
|
// this may switch us into SIMPLE
|
||||||
authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
|
authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
|
||||||
|
|
||||||
dataLengthBuffer.clear();
|
dataLengthBuffer.clear(); // clear to next read rpc packet len
|
||||||
connectionHeaderBuf = null;
|
connectionHeaderBuf = null;
|
||||||
connectionHeaderRead = true;
|
connectionHeaderRead = true;
|
||||||
continue;
|
continue; // connection header read, now read 4 bytes rpc packet len
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data == null) {
|
if (data == null) { // just read 4 bytes - length of RPC packet
|
||||||
dataLengthBuffer.flip();
|
dataLengthBuffer.flip();
|
||||||
dataLength = dataLengthBuffer.getInt();
|
dataLength = dataLengthBuffer.getInt();
|
||||||
checkDataLength(dataLength);
|
checkDataLength(dataLength);
|
||||||
|
// Set buffer for reading EXACTLY the RPC-packet length and no more.
|
||||||
data = ByteBuffer.allocate(dataLength);
|
data = ByteBuffer.allocate(dataLength);
|
||||||
}
|
}
|
||||||
|
// Now read the RPC packet
|
||||||
count = channelRead(channel, data);
|
count = channelRead(channel, data);
|
||||||
|
|
||||||
if (data.remaining() == 0) {
|
if (data.remaining() == 0) {
|
||||||
dataLengthBuffer.clear();
|
dataLengthBuffer.clear(); // to read length of future rpc packets
|
||||||
data.flip();
|
data.flip();
|
||||||
boolean isHeaderRead = connectionContextRead;
|
boolean isHeaderRead = connectionContextRead;
|
||||||
processOneRpc(data.array());
|
processOneRpc(data.array());
|
||||||
data = null;
|
data = null;
|
||||||
|
// the last rpc-request we processed could have simply been the
|
||||||
|
// connectionContext; if so continue to read the first RPC.
|
||||||
if (!isHeaderRead) {
|
if (!isHeaderRead) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -1508,8 +1568,16 @@ public abstract class Server {
|
|||||||
return authProtocol;
|
return authProtocol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the Sasl's Negotiate request, including the optimization of
|
||||||
|
* accelerating token negotiation.
|
||||||
|
* @return the response to Negotiate request - the list of enabled
|
||||||
|
* authMethods and challenge if the TOKENS are supported.
|
||||||
|
* @throws SaslException - if attempt to generate challenge fails.
|
||||||
|
* @throws IOException - if it fails to create the SASL server for Tokens
|
||||||
|
*/
|
||||||
private RpcSaslProto buildSaslNegotiateResponse()
|
private RpcSaslProto buildSaslNegotiateResponse()
|
||||||
throws IOException, InterruptedException {
|
throws InterruptedException, SaslException, IOException {
|
||||||
RpcSaslProto negotiateMessage = negotiateResponse;
|
RpcSaslProto negotiateMessage = negotiateResponse;
|
||||||
// accelerate token negotiation by sending initial challenge
|
// accelerate token negotiation by sending initial challenge
|
||||||
// in the negotiation response
|
// in the negotiation response
|
||||||
@ -1635,8 +1703,11 @@ public abstract class Server {
|
|||||||
/**
|
/**
|
||||||
* Process a wrapped RPC Request - unwrap the SASL packet and process
|
* Process a wrapped RPC Request - unwrap the SASL packet and process
|
||||||
* each embedded RPC request
|
* each embedded RPC request
|
||||||
* @param buf - SASL wrapped request of one or more RPCs
|
* @param inBuf - SASL wrapped request of one or more RPCs
|
||||||
* @throws IOException - SASL packet cannot be unwrapped
|
* @throws IOException - SASL packet cannot be unwrapped
|
||||||
|
* @throws WrappedRpcServerException - an exception that has already been
|
||||||
|
* sent back to the client that does not require verbose logging
|
||||||
|
* by the Listener thread
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
|
private void unwrapPacketAndProcessRpcs(byte[] inBuf)
|
||||||
@ -1677,13 +1748,21 @@ public abstract class Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process an RPC Request - handle connection setup and decoding of
|
* Process one RPC Request from buffer read from socket stream
|
||||||
* request into a Call
|
* - decode rpc in a rpc-Call
|
||||||
|
* - handle out-of-band RPC requests such as the initial connectionContext
|
||||||
|
* - A successfully decoded RpcCall will be deposited in RPC-Q and
|
||||||
|
* its response will be sent later when the request is processed.
|
||||||
|
*
|
||||||
|
* Prior to this call the connectionHeader ("hrpc...") has been handled and
|
||||||
|
* if SASL then SASL has been established and the buf we are passed
|
||||||
|
* has been unwrapped from SASL.
|
||||||
|
*
|
||||||
* @param buf - contains the RPC request header and the rpc request
|
* @param buf - contains the RPC request header and the rpc request
|
||||||
* @throws IOException - internal error that should not be returned to
|
* @throws IOException - internal error that should not be returned to
|
||||||
* client, typically failure to respond to client
|
* client, typically failure to respond to client
|
||||||
* @throws WrappedRpcServerException - an exception to be sent back to
|
* @throws WrappedRpcServerException - an exception that is sent back to the
|
||||||
* the client that does not require verbose logging by the
|
* client in this method and does not require verbose logging by the
|
||||||
* Listener thread
|
* Listener thread
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
@ -1753,8 +1832,11 @@ public abstract class Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process an RPC Request - the connection headers and context must
|
* Process an RPC Request
|
||||||
* have been already read
|
* - the connection headers and context must have been already read.
|
||||||
|
* - Based on the rpcKind, decode the rpcRequest.
|
||||||
|
* - A successfully decoded RpcCall will be deposited in RPC-Q and
|
||||||
|
* its response will be sent later when the request is processed.
|
||||||
* @param header - RPC request header
|
* @param header - RPC request header
|
||||||
* @param dis - stream to request payload
|
* @param dis - stream to request payload
|
||||||
* @throws WrappedRpcServerException - due to fatal rpc layer issues such
|
* @throws WrappedRpcServerException - due to fatal rpc layer issues such
|
||||||
@ -1803,7 +1885,8 @@ public abstract class Server {
|
|||||||
* @param dis - stream to request payload
|
* @param dis - stream to request payload
|
||||||
* @throws WrappedRpcServerException - setup failed due to SASL
|
* @throws WrappedRpcServerException - setup failed due to SASL
|
||||||
* negotiation failure, premature or invalid connection context,
|
* negotiation failure, premature or invalid connection context,
|
||||||
* or other state errors
|
* or other state errors. This exception needs to be sent to the
|
||||||
|
* client.
|
||||||
* @throws IOException - failed to send a response back to the client
|
* @throws IOException - failed to send a response back to the client
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
|
Loading…
x
Reference in New Issue
Block a user