HADOOP-6674. Makes use of the SASL authentication options in the SASL RPC. Contributed by Jitendra Pandey.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@951624 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
56b15e9e8f
commit
4b9c956bc5
@ -41,6 +41,9 @@ Trunk (unreleased changes)
|
||||
HADOOP-6661. User document for UserGroupInformation.doAs.
|
||||
(Jitendra Pandey via jghoman)
|
||||
|
||||
HADOOP-6674. Makes use of the SASL authentication options in the
|
||||
SASL RPC. (Jitendra Pandey via ddas)
|
||||
|
||||
BUG FIXES
|
||||
HADOOP-6638. try to relogin in a case of failed RPC connection (expired tgt)
|
||||
only in case the subject is loginUser or proxyUgi.realUser. (boryas)
|
||||
|
@ -76,6 +76,17 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hadoop.rpc.protection</name>
|
||||
<value>authentication</value>
|
||||
<description>This field sets the quality of protection for secured sasl
|
||||
connections. Possible values are authentication, integrity and privacy.
|
||||
authentication means authentication only and no integrity or privacy;
|
||||
integrity implies authentication and integrity are enabled; and privacy
|
||||
implies all of authentication, integrity and privacy are enabled.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<!--- logging properties -->
|
||||
|
||||
<property>
|
||||
|
@ -35,6 +35,7 @@ import org.apache.commons.logging.*;
|
||||
|
||||
import org.apache.hadoop.io.*;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SaslRpcServer;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
|
||||
@ -221,6 +222,9 @@ public class RPC {
|
||||
UserGroupInformation ticket,
|
||||
Configuration conf,
|
||||
SocketFactory factory) throws IOException {
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
SaslRpcServer.init(conf);
|
||||
}
|
||||
return getProtocolEngine(protocol,conf)
|
||||
.getProxy(protocol, clientVersion, addr, ticket, conf, factory);
|
||||
}
|
||||
|
@ -853,6 +853,8 @@ public abstract class Server {
|
||||
private final Call saslCall = new Call(SASL_CALLID, null, this);
|
||||
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
|
||||
|
||||
private boolean useWrap = false;
|
||||
|
||||
public Connection(SelectionKey key, SocketChannel channel,
|
||||
long lastContact) {
|
||||
this.channel = channel;
|
||||
@ -1012,10 +1014,10 @@ public abstract class Server {
|
||||
null);
|
||||
}
|
||||
if (saslServer.isComplete()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SASL server context established. Negotiated QoP is "
|
||||
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
||||
}
|
||||
LOG.info("SASL server context established. Negotiated QoP is "
|
||||
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||
user = getAuthorizedUgi(saslServer.getAuthorizationID());
|
||||
LOG.info("SASL server successfully authenticated client: " + user);
|
||||
rpcMetrics.authenticationSuccesses.inc();
|
||||
@ -1026,9 +1028,14 @@ public abstract class Server {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Have read input token of size " + saslToken.length
|
||||
+ " for processing by saslServer.unwrap()");
|
||||
byte[] plaintextData = saslServer
|
||||
.unwrap(saslToken, 0, saslToken.length);
|
||||
processUnwrappedData(plaintextData);
|
||||
|
||||
if (!useWrap) {
|
||||
processOneRpc(saslToken);
|
||||
} else {
|
||||
byte[] plaintextData = saslServer.unwrap(saslToken, 0,
|
||||
saslToken.length);
|
||||
processUnwrappedData(plaintextData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1124,9 +1131,15 @@ public abstract class Server {
|
||||
dataLengthBuffer.flip();
|
||||
dataLength = dataLengthBuffer.getInt();
|
||||
|
||||
if (!useSasl && dataLength == Client.PING_CALL_ID) {
|
||||
if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
|
||||
// covers the !useSasl too
|
||||
dataLengthBuffer.clear();
|
||||
return 0; //ping message
|
||||
return 0; // ping message
|
||||
}
|
||||
|
||||
if (dataLength < 0) {
|
||||
LOG.warn("Unexpected data length " + dataLength + "!! from " +
|
||||
getHostAddress());
|
||||
}
|
||||
data = ByteBuffer.allocate(dataLength);
|
||||
}
|
||||
@ -1454,9 +1467,12 @@ public abstract class Server {
|
||||
Integer.toString(this.port));
|
||||
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
|
||||
|
||||
|
||||
// Create the responder here
|
||||
responder = new Responder();
|
||||
|
||||
if (isSecurityEnabled) {
|
||||
SaslRpcServer.init(conf);
|
||||
}
|
||||
}
|
||||
|
||||
private void closeConnection(Connection connection) {
|
||||
@ -1496,7 +1512,9 @@ public abstract class Server {
|
||||
WritableUtils.writeString(out, errorClass);
|
||||
WritableUtils.writeString(out, error);
|
||||
}
|
||||
wrapWithSasl(response, call);
|
||||
if (call.connection.useWrap) {
|
||||
wrapWithSasl(response, call);
|
||||
}
|
||||
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,7 @@ import java.io.EOFException;
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslClient;
|
||||
import javax.security.sasl.SaslException;
|
||||
import javax.security.sasl.SaslServer;
|
||||
@ -41,6 +42,9 @@ public class SaslInputStream extends InputStream {
|
||||
public static final Log LOG = LogFactory.getLog(SaslInputStream.class);
|
||||
|
||||
private final DataInputStream inStream;
|
||||
/** Should we wrap the communication channel? */
|
||||
private final boolean useWrap;
|
||||
|
||||
/*
|
||||
* data read from the underlying input stream before being processed by SASL
|
||||
*/
|
||||
@ -141,6 +145,8 @@ public class SaslInputStream extends InputStream {
|
||||
this.inStream = new DataInputStream(inStream);
|
||||
this.saslServer = saslServer;
|
||||
this.saslClient = null;
|
||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||
this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -157,6 +163,8 @@ public class SaslInputStream extends InputStream {
|
||||
this.inStream = new DataInputStream(inStream);
|
||||
this.saslServer = null;
|
||||
this.saslClient = saslClient;
|
||||
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
|
||||
this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -174,6 +182,9 @@ public class SaslInputStream extends InputStream {
|
||||
* if an I/O error occurs.
|
||||
*/
|
||||
public int read() throws IOException {
|
||||
if (!useWrap) {
|
||||
return inStream.read();
|
||||
}
|
||||
if (ostart >= ofinish) {
|
||||
// we loop for new data as we are blocking
|
||||
int i = 0;
|
||||
@ -224,6 +235,9 @@ public class SaslInputStream extends InputStream {
|
||||
* if an I/O error occurs.
|
||||
*/
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
if (!useWrap) {
|
||||
return inStream.read(b, off, len);
|
||||
}
|
||||
if (ostart >= ofinish) {
|
||||
// we loop for new data as we are blocking
|
||||
int i = 0;
|
||||
@ -265,6 +279,9 @@ public class SaslInputStream extends InputStream {
|
||||
* if an I/O error occurs.
|
||||
*/
|
||||
public long skip(long n) throws IOException {
|
||||
if (!useWrap) {
|
||||
return inStream.skip(n);
|
||||
}
|
||||
int available = ofinish - ostart;
|
||||
if (n > available) {
|
||||
n = available;
|
||||
@ -288,6 +305,9 @@ public class SaslInputStream extends InputStream {
|
||||
* if an I/O error occurs.
|
||||
*/
|
||||
public int available() throws IOException {
|
||||
if (!useWrap) {
|
||||
return inStream.available();
|
||||
}
|
||||
return (ofinish - ostart);
|
||||
}
|
||||
|
||||
|
@ -18,10 +18,15 @@
|
||||
|
||||
package org.apache.hadoop.security;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslClient;
|
||||
import javax.security.sasl.SaslException;
|
||||
import javax.security.sasl.SaslServer;
|
||||
@ -34,7 +39,7 @@ import javax.security.sasl.SaslServer;
|
||||
*/
|
||||
public class SaslOutputStream extends OutputStream {
|
||||
|
||||
private final DataOutputStream outStream;
|
||||
private final OutputStream outStream;
|
||||
// processed data ready to be written out
|
||||
private byte[] saslToken;
|
||||
|
||||
@ -42,6 +47,7 @@ public class SaslOutputStream extends OutputStream {
|
||||
private final SaslServer saslServer;
|
||||
// buffer holding one byte of incoming data
|
||||
private final byte[] ibuffer = new byte[1];
|
||||
private final boolean useWrap;
|
||||
|
||||
/**
|
||||
* Constructs a SASLOutputStream from an OutputStream and a SaslServer <br>
|
||||
@ -54,9 +60,15 @@ public class SaslOutputStream extends OutputStream {
|
||||
* an initialized SaslServer object
|
||||
*/
|
||||
public SaslOutputStream(OutputStream outStream, SaslServer saslServer) {
|
||||
this.outStream = new DataOutputStream(outStream);
|
||||
this.saslServer = saslServer;
|
||||
this.saslClient = null;
|
||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||
this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||
if (useWrap) {
|
||||
this.outStream = new BufferedOutputStream(outStream, 64*1024);
|
||||
} else {
|
||||
this.outStream = outStream;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -70,9 +82,15 @@ public class SaslOutputStream extends OutputStream {
|
||||
* an initialized SaslClient object
|
||||
*/
|
||||
public SaslOutputStream(OutputStream outStream, SaslClient saslClient) {
|
||||
this.outStream = new DataOutputStream(outStream);
|
||||
this.saslServer = null;
|
||||
this.saslClient = saslClient;
|
||||
String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
|
||||
this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||
if (useWrap) {
|
||||
this.outStream = new BufferedOutputStream(outStream, 64*1024);
|
||||
} else {
|
||||
this.outStream = outStream;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -100,6 +118,10 @@ public class SaslOutputStream extends OutputStream {
|
||||
* if an I/O error occurs.
|
||||
*/
|
||||
public void write(int b) throws IOException {
|
||||
if (!useWrap) {
|
||||
outStream.write(b);
|
||||
return;
|
||||
}
|
||||
ibuffer[0] = (byte) b;
|
||||
write(ibuffer, 0, 1);
|
||||
}
|
||||
@ -137,6 +159,10 @@ public class SaslOutputStream extends OutputStream {
|
||||
* if an I/O error occurs.
|
||||
*/
|
||||
public void write(byte[] inBuf, int off, int len) throws IOException {
|
||||
if (!useWrap) {
|
||||
outStream.write(inBuf, off, len);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (saslServer != null) { // using saslServer
|
||||
saslToken = saslServer.wrap(inBuf, off, len);
|
||||
@ -151,7 +177,10 @@ public class SaslOutputStream extends OutputStream {
|
||||
throw se;
|
||||
}
|
||||
if (saslToken != null) {
|
||||
outStream.writeInt(saslToken.length);
|
||||
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
|
||||
DataOutputStream dout = new DataOutputStream(byteOut);
|
||||
dout.writeInt(saslToken.length);
|
||||
outStream.write(byteOut.toByteArray());
|
||||
outStream.write(saslToken, 0, saslToken.length);
|
||||
saslToken = null;
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ import javax.security.sasl.Sasl;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
@ -52,14 +53,41 @@ public class SaslRpcServer {
|
||||
public static final String SASL_DEFAULT_REALM = "default";
|
||||
public static final Map<String, String> SASL_PROPS =
|
||||
new TreeMap<String, String>();
|
||||
static {
|
||||
// Request authentication plus integrity protection
|
||||
SASL_PROPS.put(Sasl.QOP, "auth-int");
|
||||
// Request mutual authentication
|
||||
SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
|
||||
}
|
||||
|
||||
public static final int SWITCH_TO_SIMPLE_AUTH = -88;
|
||||
|
||||
public static enum QualityOfProtection {
|
||||
AUTHENTICATION("auth"),
|
||||
INTEGRITY("auth-int"),
|
||||
PRIVACY("auth-conf");
|
||||
|
||||
public final String saslQop;
|
||||
|
||||
private QualityOfProtection(String saslQop) {
|
||||
this.saslQop = saslQop;
|
||||
}
|
||||
|
||||
public String getSaslQop() {
|
||||
return saslQop;
|
||||
}
|
||||
}
|
||||
|
||||
public static void init(Configuration conf) {
|
||||
QualityOfProtection saslQOP = QualityOfProtection.AUTHENTICATION;
|
||||
String rpcProtection = conf.get("hadoop.rpc.protection",
|
||||
QualityOfProtection.AUTHENTICATION.name().toLowerCase());
|
||||
if (QualityOfProtection.INTEGRITY.name().toLowerCase()
|
||||
.equals(rpcProtection)) {
|
||||
saslQOP = QualityOfProtection.INTEGRITY;
|
||||
} else if (QualityOfProtection.PRIVACY.name().toLowerCase().equals(
|
||||
rpcProtection)) {
|
||||
saslQOP = QualityOfProtection.PRIVACY;
|
||||
}
|
||||
|
||||
SASL_PROPS.put(Sasl.QOP, saslQOP.getSaslQop());
|
||||
SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
|
||||
}
|
||||
|
||||
static String encodeIdentifier(byte[] identifier) {
|
||||
return new String(Base64.encodeBase64(identifier));
|
||||
}
|
||||
|
@ -28,6 +28,8 @@ import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.security.sasl.Sasl;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.*;
|
||||
@ -232,6 +234,8 @@ public class TestSaslRPC {
|
||||
try {
|
||||
proxy = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
|
||||
TestSaslProtocol.versionID, addr, conf);
|
||||
//QOP must be auth
|
||||
Assert.assertEquals(SaslRpcServer.SASL_PROPS.get(Sasl.QOP), "auth");
|
||||
proxy.ping();
|
||||
} finally {
|
||||
server.stop();
|
||||
|
Loading…
x
Reference in New Issue
Block a user