From 4b9c956bc56cec2c5b669102340d0912e0db3517 Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Sat, 5 Jun 2010 00:34:36 +0000 Subject: [PATCH] HADOOP-6674. Makes use of the SASL authentication options in the SASL RPC. Contributed by Jitendra Pandey. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@951624 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 ++ src/java/core-default.xml | 11 +++++ src/java/org/apache/hadoop/ipc/RPC.java | 4 ++ src/java/org/apache/hadoop/ipc/Server.java | 40 ++++++++++++++----- .../hadoop/security/SaslInputStream.java | 20 ++++++++++ .../hadoop/security/SaslOutputStream.java | 37 +++++++++++++++-- .../apache/hadoop/security/SaslRpcServer.java | 40 ++++++++++++++++--- .../org/apache/hadoop/ipc/TestSaslRPC.java | 4 ++ 8 files changed, 138 insertions(+), 21 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index c6d06096c5..403fe81094 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -41,6 +41,9 @@ Trunk (unreleased changes) HADOOP-6661. User document for UserGroupInformation.doAs. (Jitendra Pandey via jghoman) + HADOOP-6674. Makes use of the SASL authentication options in the + SASL RPC. (Jitendra Pandey via ddas) + BUG FIXES HADOOP-6638. try to relogin in a case of failed RPC connection (expired tgt) only in case the subject is loginUser or proxyUgi.realUser. (boryas) diff --git a/src/java/core-default.xml b/src/java/core-default.xml index 454a59d898..89ec63a82f 100644 --- a/src/java/core-default.xml +++ b/src/java/core-default.xml @@ -76,6 +76,17 @@ + + hadoop.rpc.protection + authentication + This field sets the quality of protection for secured sasl + connections. Possible values are authentication, integrity and privacy. + authentication means authentication only and no integrity or privacy; + integrity implies authentication and integrity are enabled; and privacy + implies all of authentication, integrity and privacy are enabled. + + + diff --git a/src/java/org/apache/hadoop/ipc/RPC.java b/src/java/org/apache/hadoop/ipc/RPC.java index 08f11de3f7..7471e1ea69 100644 --- a/src/java/org/apache/hadoop/ipc/RPC.java +++ b/src/java/org/apache/hadoop/ipc/RPC.java @@ -35,6 +35,7 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; @@ -221,6 +222,9 @@ public static Object getProxy(Class protocol, long clientVersion, UserGroupInformation ticket, Configuration conf, SocketFactory factory) throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + SaslRpcServer.init(conf); + } return getProtocolEngine(protocol,conf) .getProxy(protocol, clientVersion, addr, ticket, conf, factory); } diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java index b058d0f766..194bb430ff 100644 --- a/src/java/org/apache/hadoop/ipc/Server.java +++ b/src/java/org/apache/hadoop/ipc/Server.java @@ -853,6 +853,8 @@ public class Connection { private final Call saslCall = new Call(SASL_CALLID, null, this); private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream(); + private boolean useWrap = false; + public Connection(SelectionKey key, SocketChannel channel, long lastContact) { this.channel = channel; @@ -1012,10 +1014,10 @@ public Object run() throws SaslException { null); } if (saslServer.isComplete()) { - if (LOG.isDebugEnabled()) { - LOG.debug("SASL server context established. Negotiated QoP is " - + saslServer.getNegotiatedProperty(Sasl.QOP)); - } + LOG.info("SASL server context established. Negotiated QoP is " + + saslServer.getNegotiatedProperty(Sasl.QOP)); + String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + useWrap = qop != null && !"auth".equalsIgnoreCase(qop); user = getAuthorizedUgi(saslServer.getAuthorizationID()); LOG.info("SASL server successfully authenticated client: " + user); rpcMetrics.authenticationSuccesses.inc(); @@ -1026,9 +1028,14 @@ public Object run() throws SaslException { if (LOG.isDebugEnabled()) LOG.debug("Have read input token of size " + saslToken.length + " for processing by saslServer.unwrap()"); - byte[] plaintextData = saslServer - .unwrap(saslToken, 0, saslToken.length); - processUnwrappedData(plaintextData); + + if (!useWrap) { + processOneRpc(saslToken); + } else { + byte[] plaintextData = saslServer.unwrap(saslToken, 0, + saslToken.length); + processUnwrappedData(plaintextData); + } } } @@ -1124,9 +1131,15 @@ public int readAndProcess() throws IOException, InterruptedException { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); - if (!useSasl && dataLength == Client.PING_CALL_ID) { + if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) { + // covers the !useSasl too dataLengthBuffer.clear(); - return 0; //ping message + return 0; // ping message + } + + if (dataLength < 0) { + LOG.warn("Unexpected data length " + dataLength + "!! from " + + getHostAddress()); } data = ByteBuffer.allocate(dataLength); } @@ -1454,9 +1467,12 @@ protected Server(String bindAddress, int port, Integer.toString(this.port)); this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false); - // Create the responder here responder = new Responder(); + + if (isSecurityEnabled) { + SaslRpcServer.init(conf); + } } private void closeConnection(Connection connection) { @@ -1496,7 +1512,9 @@ private void setupResponse(ByteArrayOutputStream response, WritableUtils.writeString(out, errorClass); WritableUtils.writeString(out, error); } - wrapWithSasl(response, call); + if (call.connection.useWrap) { + wrapWithSasl(response, call); + } call.setResponse(ByteBuffer.wrap(response.toByteArray())); } diff --git a/src/java/org/apache/hadoop/security/SaslInputStream.java b/src/java/org/apache/hadoop/security/SaslInputStream.java index 60ddefb9b0..5d2e21a2f1 100644 --- a/src/java/org/apache/hadoop/security/SaslInputStream.java +++ b/src/java/org/apache/hadoop/security/SaslInputStream.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.IOException; +import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; @@ -41,6 +42,9 @@ public class SaslInputStream extends InputStream { public static final Log LOG = LogFactory.getLog(SaslInputStream.class); private final DataInputStream inStream; + /** Should we wrap the communication channel? */ + private final boolean useWrap; + /* * data read from the underlying input stream before being processed by SASL */ @@ -141,6 +145,8 @@ public SaslInputStream(InputStream inStream, SaslServer saslServer) { this.inStream = new DataInputStream(inStream); this.saslServer = saslServer; this.saslClient = null; + String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop); } /** @@ -157,6 +163,8 @@ public SaslInputStream(InputStream inStream, SaslClient saslClient) { this.inStream = new DataInputStream(inStream); this.saslServer = null; this.saslClient = saslClient; + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop); } /** @@ -174,6 +182,9 @@ public SaslInputStream(InputStream inStream, SaslClient saslClient) { * if an I/O error occurs. */ public int read() throws IOException { + if (!useWrap) { + return inStream.read(); + } if (ostart >= ofinish) { // we loop for new data as we are blocking int i = 0; @@ -224,6 +235,9 @@ public int read(byte[] b) throws IOException { * if an I/O error occurs. */ public int read(byte[] b, int off, int len) throws IOException { + if (!useWrap) { + return inStream.read(b, off, len); + } if (ostart >= ofinish) { // we loop for new data as we are blocking int i = 0; @@ -265,6 +279,9 @@ public int read(byte[] b, int off, int len) throws IOException { * if an I/O error occurs. */ public long skip(long n) throws IOException { + if (!useWrap) { + return inStream.skip(n); + } int available = ofinish - ostart; if (n > available) { n = available; @@ -288,6 +305,9 @@ public long skip(long n) throws IOException { * if an I/O error occurs. */ public int available() throws IOException { + if (!useWrap) { + return inStream.available(); + } return (ofinish - ostart); } diff --git a/src/java/org/apache/hadoop/security/SaslOutputStream.java b/src/java/org/apache/hadoop/security/SaslOutputStream.java index 2f2807b2cd..58fe09db79 100644 --- a/src/java/org/apache/hadoop/security/SaslOutputStream.java +++ b/src/java/org/apache/hadoop/security/SaslOutputStream.java @@ -18,10 +18,15 @@ package org.apache.hadoop.security; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; +import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; @@ -34,7 +39,7 @@ */ public class SaslOutputStream extends OutputStream { - private final DataOutputStream outStream; + private final OutputStream outStream; // processed data ready to be written out private byte[] saslToken; @@ -42,6 +47,7 @@ public class SaslOutputStream extends OutputStream { private final SaslServer saslServer; // buffer holding one byte of incoming data private final byte[] ibuffer = new byte[1]; + private final boolean useWrap; /** * Constructs a SASLOutputStream from an OutputStream and a SaslServer
@@ -54,9 +60,15 @@ public class SaslOutputStream extends OutputStream { * an initialized SaslServer object */ public SaslOutputStream(OutputStream outStream, SaslServer saslServer) { - this.outStream = new DataOutputStream(outStream); this.saslServer = saslServer; this.saslClient = null; + String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); + this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop); + if (useWrap) { + this.outStream = new BufferedOutputStream(outStream, 64*1024); + } else { + this.outStream = outStream; + } } /** @@ -70,9 +82,15 @@ public SaslOutputStream(OutputStream outStream, SaslServer saslServer) { * an initialized SaslClient object */ public SaslOutputStream(OutputStream outStream, SaslClient saslClient) { - this.outStream = new DataOutputStream(outStream); this.saslServer = null; this.saslClient = saslClient; + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop); + if (useWrap) { + this.outStream = new BufferedOutputStream(outStream, 64*1024); + } else { + this.outStream = outStream; + } } /** @@ -100,6 +118,10 @@ private void disposeSasl() throws SaslException { * if an I/O error occurs. */ public void write(int b) throws IOException { + if (!useWrap) { + outStream.write(b); + return; + } ibuffer[0] = (byte) b; write(ibuffer, 0, 1); } @@ -137,6 +159,10 @@ public void write(byte[] b) throws IOException { * if an I/O error occurs. */ public void write(byte[] inBuf, int off, int len) throws IOException { + if (!useWrap) { + outStream.write(inBuf, off, len); + return; + } try { if (saslServer != null) { // using saslServer saslToken = saslServer.wrap(inBuf, off, len); @@ -151,7 +177,10 @@ public void write(byte[] inBuf, int off, int len) throws IOException { throw se; } if (saslToken != null) { - outStream.writeInt(saslToken.length); + ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(byteOut); + dout.writeInt(saslToken.length); + outStream.write(byteOut.toByteArray()); outStream.write(saslToken, 0, saslToken.length); saslToken = null; } diff --git a/src/java/org/apache/hadoop/security/SaslRpcServer.java b/src/java/org/apache/hadoop/security/SaslRpcServer.java index 62c704e2a0..5632c85c3c 100644 --- a/src/java/org/apache/hadoop/security/SaslRpcServer.java +++ b/src/java/org/apache/hadoop/security/SaslRpcServer.java @@ -38,6 +38,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -52,14 +53,41 @@ public class SaslRpcServer { public static final String SASL_DEFAULT_REALM = "default"; public static final Map SASL_PROPS = new TreeMap(); - static { - // Request authentication plus integrity protection - SASL_PROPS.put(Sasl.QOP, "auth-int"); - // Request mutual authentication - SASL_PROPS.put(Sasl.SERVER_AUTH, "true"); - } + public static final int SWITCH_TO_SIMPLE_AUTH = -88; + public static enum QualityOfProtection { + AUTHENTICATION("auth"), + INTEGRITY("auth-int"), + PRIVACY("auth-conf"); + + public final String saslQop; + + private QualityOfProtection(String saslQop) { + this.saslQop = saslQop; + } + + public String getSaslQop() { + return saslQop; + } + } + + public static void init(Configuration conf) { + QualityOfProtection saslQOP = QualityOfProtection.AUTHENTICATION; + String rpcProtection = conf.get("hadoop.rpc.protection", + QualityOfProtection.AUTHENTICATION.name().toLowerCase()); + if (QualityOfProtection.INTEGRITY.name().toLowerCase() + .equals(rpcProtection)) { + saslQOP = QualityOfProtection.INTEGRITY; + } else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals( + rpcProtection)) { + saslQOP = QualityOfProtection.PRIVACY; + } + + SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop()); + SASL_PROPS.put(Sasl.SERVER_AUTH, "true"); + } + static String encodeIdentifier(byte[] identifier) { return new String(Base64.encodeBase64(identifier)); } diff --git a/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java b/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java index 44a38b9d95..c57cfcecbb 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestSaslRPC.java @@ -28,6 +28,8 @@ import java.security.PrivilegedExceptionAction; import java.util.Collection; +import javax.security.sasl.Sasl; + import junit.framework.Assert; import org.apache.commons.logging.*; @@ -232,6 +234,8 @@ private void doDigestRpc(Server server, TestTokenSecretManager sm) try { proxy = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class, TestSaslProtocol.versionID, addr, conf); + //QOP must be auth + Assert.assertEquals(SaslRpcServer.SASL_PROPS.get(Sasl.QOP), "auth"); proxy.ping(); } finally { server.stop();