From c03c8fe199429a43c6aa944016566738abd9b193 Mon Sep 17 00:00:00 2001 From: Jitendra Nath Pandey Date: Thu, 8 Aug 2013 23:02:20 +0000 Subject: [PATCH] HADOOP-9820. RPCv9 wire protocol is insufficient to support multiplexing. Contributed by Daryn Sharp. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1512091 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../java/org/apache/hadoop/ipc/Client.java | 14 +- .../java/org/apache/hadoop/ipc/Server.java | 80 ++++++---- .../apache/hadoop/security/SaslRpcClient.java | 141 +++++++++++++++--- .../src/main/proto/RpcHeader.proto | 1 + 5 files changed, 188 insertions(+), 50 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 31c13069c2..08d7ad98b7 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -383,6 +383,8 @@ Release 2.1.0-beta - 2013-08-06 HADOOP-9832. [RPC v9] Add RPC header to client ping (daryn) + HADOOP-9820. [RPC v9] Wire protocol is insufficient to support multiplexing. (daryn via jitendra) + NEW FEATURES HADOOP-9283. Add support for running the Hadoop client on AIX. (atm) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 7f94bb4148..ae30dd3eb3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -737,12 +737,16 @@ public AuthMethod run() } if (doPing) { - this.in = new DataInputStream(new BufferedInputStream( - new PingInputStream(inStream))); - } else { - this.in = new DataInputStream(new BufferedInputStream(inStream)); + inStream = new PingInputStream(inStream); } - this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + this.in = new DataInputStream(new BufferedInputStream(inStream)); + + // SASL may have already buffered the stream + if (!(outStream instanceof BufferedOutputStream)) { + outStream = new BufferedOutputStream(outStream); + } + this.out = new DataOutputStream(outStream); + writeConnectionContext(remoteId, authMethod); // update last activity time diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 119eaa1cb1..1533b3d00c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -73,6 +73,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import static org.apache.hadoop.ipc.RpcConstants.*; + +import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.VersionMismatch; @@ -1274,7 +1276,27 @@ private UserGroupInformation getAuthorizedUgi(String authorizedId) } private void saslReadAndProcess(DataInputStream dis) throws - WrappedRpcServerException, IOException, InterruptedException { + WrappedRpcServerException, IOException, InterruptedException { + final RpcSaslProto saslMessage = + decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis); + switch (saslMessage.getState()) { + case WRAP: { + if (!saslContextEstablished || !useWrap) { + throw new WrappedRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + new SaslException("Server is not wrapping data")); + } + // loops over decoded data and calls processOneRpc + unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray()); + break; + } + default: + saslProcess(saslMessage); + } + } + + private void saslProcess(RpcSaslProto saslMessage) + throws WrappedRpcServerException, IOException, InterruptedException { if (saslContextEstablished) { throw new WrappedRpcServerException( RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, @@ -1283,7 +1305,7 @@ private void saslReadAndProcess(DataInputStream dis) throws RpcSaslProto saslResponse = null; try { try { - saslResponse = processSaslMessage(dis); + saslResponse = processSaslMessage(saslMessage); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1328,14 +1350,14 @@ private void saslReadAndProcess(DataInputStream dis) throws // do NOT enable wrapping until the last auth response is sent if (saslContextEstablished) { String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + // SASL wrapping is only used if the connection has a QOP, and + // the value is not auth. ex. auth-int & auth-priv useWrap = (qop != null && !"auth".equalsIgnoreCase(qop)); } } - private RpcSaslProto processSaslMessage(DataInputStream dis) + private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage) throws IOException, InterruptedException { - final RpcSaslProto saslMessage = - decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis); RpcSaslProto saslResponse = null; final SaslState state = saslMessage.getState(); // required switch (state) { @@ -1530,7 +1552,7 @@ public int readAndProcess() dataLengthBuffer.clear(); data.flip(); boolean isHeaderRead = connectionContextRead; - processRpcRequestPacket(data.array()); + processOneRpc(data.array()); data = null; if (!isHeaderRead) { continue; @@ -1693,29 +1715,19 @@ private void processConnectionContext(DataInputStream dis) } /** - * Process a RPC Request - if SASL wrapping is enabled, unwrap the - * requests and process each one, else directly process the request - * @param buf - single request or SASL wrapped requests - * @throws IOException - connection failed to authenticate or authorize, - * or the request could not be decoded into a Call + * Process a wrapped RPC Request - unwrap the SASL packet and process + * each embedded RPC request + * @param buf - SASL wrapped request of one or more RPCs + * @throws IOException - SASL packet cannot be unwrapped * @throws InterruptedException */ - private void processRpcRequestPacket(byte[] buf) - throws WrappedRpcServerException, IOException, InterruptedException { - if (saslContextEstablished && useWrap) { - if (LOG.isDebugEnabled()) - LOG.debug("Have read input token of size " + buf.length - + " for processing by saslServer.unwrap()"); - final byte[] plaintextData = saslServer.unwrap(buf, 0, buf.length); - // loops over decoded data and calls processOneRpc - unwrapPacketAndProcessRpcs(plaintextData); - } else { - processOneRpc(buf); - } - } - private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws WrappedRpcServerException, IOException, InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug("Have read input token of size " + inBuf.length + + " for processing by saslServer.unwrap()"); + } + inBuf = saslServer.unwrap(inBuf, 0, inBuf.length); ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream( inBuf)); // Read all RPCs contained in the inBuf, even partial ones @@ -2378,9 +2390,21 @@ private void wrapWithSasl(ByteArrayOutputStream response, Call call) LOG.debug("Adding saslServer wrapped token of size " + token.length + " as call response."); response.reset(); - DataOutputStream saslOut = new DataOutputStream(response); - saslOut.writeInt(token.length); - saslOut.write(token, 0, token.length); + // rebuild with sasl header and payload + RpcResponseHeaderProto saslHeader = RpcResponseHeaderProto.newBuilder() + .setCallId(AuthProtocol.SASL.callId) + .setStatus(RpcStatusProto.SUCCESS) + .build(); + RpcSaslProto saslMessage = RpcSaslProto.newBuilder() + .setState(SaslState.WRAP) + .setToken(ByteString.copyFrom(token, 0, token.length)) + .build(); + RpcResponseMessageWrapper saslResponse = + new RpcResponseMessageWrapper(saslHeader, saslMessage); + + DataOutputStream out = new DataOutputStream(response); + out.writeInt(saslResponse.getLength()); + saslResponse.write(out); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index da8d474b5b..b5e8c32f9d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -20,12 +20,16 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -485,38 +489,141 @@ private RpcSaslProto.Builder createSaslReply(SaslState state, return response; } + private boolean useWrap() { + // getNegotiatedProperty throws if client isn't complete + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + // SASL wrapping is only used if the connection has a QOP, and + // the value is not auth. ex. auth-int & auth-priv + return qop != null && !"auth".equalsIgnoreCase(qop); + } + /** - * Get a SASL wrapped InputStream. Can be called only after saslConnect() has - * been called. + * Get SASL wrapped InputStream if SASL QoP requires unwrapping, + * otherwise return original stream. Can be called only after + * saslConnect() has been called. * - * @param in - * the InputStream to wrap - * @return a SASL wrapped InputStream + * @param in - InputStream used to make the connection + * @return InputStream that may be using SASL unwrap * @throws IOException */ public InputStream getInputStream(InputStream in) throws IOException { - if (!saslClient.isComplete()) { - throw new IOException("Sasl authentication exchange hasn't completed yet"); + if (useWrap()) { + in = new WrappedInputStream(in); } - return new SaslInputStream(in, saslClient); + return in; } /** - * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has - * been called. + * Get SASL wrapped OutputStream if SASL QoP requires wrapping, + * otherwise return original stream. Can be called only after + * saslConnect() has been called. * - * @param out - * the OutputStream to wrap - * @return a SASL wrapped OutputStream + * @param in - InputStream used to make the connection + * @return InputStream that may be using SASL unwrap * @throws IOException */ public OutputStream getOutputStream(OutputStream out) throws IOException { - if (!saslClient.isComplete()) { - throw new IOException("Sasl authentication exchange hasn't completed yet"); + if (useWrap()) { + // the client and server negotiate a maximum buffer size that can be + // wrapped + String maxBuf = (String)saslClient.getNegotiatedProperty(Sasl.RAW_SEND_SIZE); + out = new BufferedOutputStream(new WrappedOutputStream(out), + Integer.parseInt(maxBuf)); } - return new SaslOutputStream(out, saslClient); + return out; } + // ideally this should be folded into the RPC decoding loop but it's + // currently split across Client and SaslRpcClient... + class WrappedInputStream extends FilterInputStream { + private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0); + public WrappedInputStream(InputStream in) throws IOException { + super(in); + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int n = read(b, 0, 1); + return (n != -1) ? b[0] : -1; + } + + @Override + public int read(byte b[]) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + synchronized(unwrappedRpcBuffer) { + // fill the buffer with the next RPC message + if (unwrappedRpcBuffer.remaining() == 0) { + readNextRpcPacket(); + } + // satisfy as much of the request as possible + int readLen = Math.min(len, unwrappedRpcBuffer.remaining()); + unwrappedRpcBuffer.get(buf, off, readLen); + return readLen; + } + } + + // all messages must be RPC SASL wrapped, else an exception is thrown + private void readNextRpcPacket() throws IOException { + LOG.debug("reading next wrapped RPC packet"); + DataInputStream dis = new DataInputStream(in); + int rpcLen = dis.readInt(); + byte[] rpcBuf = new byte[rpcLen]; + dis.readFully(rpcBuf); + + // decode the RPC header + ByteArrayInputStream bis = new ByteArrayInputStream(rpcBuf); + RpcResponseHeaderProto.Builder headerBuilder = + RpcResponseHeaderProto.newBuilder(); + headerBuilder.mergeDelimitedFrom(bis); + + boolean isWrapped = false; + // Must be SASL wrapped, verify and decode. + if (headerBuilder.getCallId() == AuthProtocol.SASL.callId) { + RpcSaslProto.Builder saslMessage = RpcSaslProto.newBuilder(); + saslMessage.mergeDelimitedFrom(bis); + if (saslMessage.getState() == SaslState.WRAP) { + isWrapped = true; + byte[] token = saslMessage.getToken().toByteArray(); + if (LOG.isDebugEnabled()) { + LOG.debug("unwrapping token of length:" + token.length); + } + token = saslClient.unwrap(token, 0, token.length); + unwrappedRpcBuffer = ByteBuffer.wrap(token); + } + } + if (!isWrapped) { + throw new SaslException("Server sent non-wrapped response"); + } + } + } + + class WrappedOutputStream extends FilterOutputStream { + public WrappedOutputStream(OutputStream out) throws IOException { + super(out); + } + @Override + public void write(byte[] buf, int off, int len) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("wrapping token of length:" + len); + } + buf = saslClient.wrap(buf, off, len); + RpcSaslProto saslMessage = RpcSaslProto.newBuilder() + .setState(SaslState.WRAP) + .setToken(ByteString.copyFrom(buf, 0, buf.length)) + .build(); + RpcRequestMessageWrapper request = + new RpcRequestMessageWrapper(saslHeader, saslMessage); + DataOutputStream dob = new DataOutputStream(out); + dob.writeInt(request.getLength()); + request.write(dob); + } + } + /** Release resources used by wrapped saslClient */ public void dispose() throws SaslException { if (saslClient != null) { @@ -572,4 +679,4 @@ public void handle(Callback[] callbacks) } } } -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 2f61d98682..673883b23a 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -141,6 +141,7 @@ message RpcSaslProto { INITIATE = 2; CHALLENGE = 3; RESPONSE = 4; + WRAP = 5; } message SaslAuth {