HADOOP-6543. Allows secure clients to talk to unsecure clusters. Contributed by Kan Zhang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@915097 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ffdde40b9f
commit
c5622e5d4d
@ -158,6 +158,9 @@ Trunk (unreleased changes)
|
||||
|
||||
HADOOP-6583. Captures authentication and authorization metrics. (ddas)
|
||||
|
||||
HADOOP-6543. Allows secure clients to talk to unsecure clusters.
|
||||
(Kan Zhang via ddas)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -209,8 +209,8 @@ private class Connection extends Thread {
|
||||
private String serverPrincipal; // server's krb5 principal name
|
||||
private ConnectionHeader header; // connection header
|
||||
private final ConnectionId remoteId; // connection id
|
||||
private final AuthMethod authMethod; // authentication method
|
||||
private final boolean useSasl;
|
||||
private AuthMethod authMethod; // authentication method
|
||||
private boolean useSasl;
|
||||
private Token<? extends TokenIdentifier> token;
|
||||
private SaslRpcClient saslRpcClient;
|
||||
|
||||
@ -364,13 +364,13 @@ private synchronized void disposeSasl() {
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void setupSaslConnection(final InputStream in2,
|
||||
private synchronized boolean setupSaslConnection(final InputStream in2,
|
||||
final OutputStream out2)
|
||||
throws javax.security.sasl.SaslException,IOException,InterruptedException {
|
||||
throws IOException {
|
||||
try {
|
||||
saslRpcClient = new SaslRpcClient(authMethod, token,
|
||||
serverPrincipal);
|
||||
saslRpcClient.saslConnect(in2, out2);
|
||||
return saslRpcClient.saslConnect(in2, out2);
|
||||
} catch (javax.security.sasl.SaslException je) {
|
||||
if (authMethod == AuthMethod.KERBEROS &&
|
||||
UserGroupInformation.isLoginKeytabBased()) {
|
||||
@ -378,9 +378,10 @@ private synchronized void setupSaslConnection(final InputStream in2,
|
||||
UserGroupInformation.getCurrentUser().reloginFromKeytab();
|
||||
//try setting up the connection again
|
||||
try {
|
||||
disposeSasl();
|
||||
saslRpcClient = new SaslRpcClient(authMethod, token,
|
||||
serverPrincipal);
|
||||
saslRpcClient.saslConnect(in2, out2);
|
||||
return saslRpcClient.saslConnect(in2, out2);
|
||||
} catch (javax.security.sasl.SaslException jee) {
|
||||
UserGroupInformation.
|
||||
setLastUnsuccessfulAuthenticationAttemptTime
|
||||
@ -437,15 +438,22 @@ private synchronized void setupIOstreams() throws InterruptedException {
|
||||
ticket = ticket.getRealUser();
|
||||
}
|
||||
}
|
||||
ticket.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
if (ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
|
||||
@Override
|
||||
public Object run() throws IOException, InterruptedException {
|
||||
setupSaslConnection(in2, out2);
|
||||
return null;
|
||||
public Boolean run() throws IOException {
|
||||
return setupSaslConnection(in2, out2);
|
||||
}
|
||||
});
|
||||
inStream = saslRpcClient.getInputStream(inStream);
|
||||
outStream = saslRpcClient.getOutputStream(outStream);
|
||||
})) {
|
||||
// Sasl connect is successful. Let's set up Sasl i/o streams.
|
||||
inStream = saslRpcClient.getInputStream(inStream);
|
||||
outStream = saslRpcClient.getOutputStream(outStream);
|
||||
} else {
|
||||
// fall back to simple auth because server told us so.
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
header = new ConnectionHeader(header.getProtocol(),
|
||||
header.getUgi(), authMethod);
|
||||
useSasl = false;
|
||||
}
|
||||
}
|
||||
if (doPing) {
|
||||
this.in = new DataInputStream(new BufferedInputStream
|
||||
|
@ -20,7 +20,6 @@
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -28,8 +27,6 @@
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
||||
/**
|
||||
* The IPC connection header sent by the client to the server
|
||||
@ -86,16 +83,14 @@ public void readFields(DataInput in) throws IOException {
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, (protocol == null) ? "" : protocol);
|
||||
if (ugi != null) {
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
//Send effective user for Kerberos auth
|
||||
out.writeBoolean(true);
|
||||
out.writeUTF(ugi.getUserName());
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
//Don't send user for token auth
|
||||
out.writeBoolean(false);
|
||||
}
|
||||
if (authMethod == AuthMethod.KERBEROS) {
|
||||
// Send effective user for Kerberos auth
|
||||
out.writeBoolean(true);
|
||||
out.writeUTF(ugi.getUserName());
|
||||
out.writeBoolean(false);
|
||||
} else if (authMethod == AuthMethod.DIGEST) {
|
||||
// Don't send user for token auth
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
//Send both effective user and real user for simple auth
|
||||
out.writeBoolean(true);
|
||||
|
@ -85,6 +85,7 @@
|
||||
*/
|
||||
public abstract class Server {
|
||||
private final boolean authorize;
|
||||
private boolean isSecurityEnabled;
|
||||
|
||||
/**
|
||||
* The first four bytes of Hadoop RPC connections
|
||||
@ -746,6 +747,7 @@ private class Connection {
|
||||
SaslServer saslServer;
|
||||
private AuthMethod authMethod;
|
||||
private boolean saslContextEstablished;
|
||||
private boolean skipInitialSaslHandshake;
|
||||
private ByteBuffer rpcHeaderBuffer;
|
||||
private ByteBuffer unwrappedData;
|
||||
private ByteBuffer unwrappedDataLengthBuffer;
|
||||
@ -929,6 +931,15 @@ private void disposeSasl() {
|
||||
}
|
||||
}
|
||||
|
||||
private void askClientToUseSimpleAuth() throws IOException {
|
||||
saslCall.connection = this;
|
||||
saslResponse.reset();
|
||||
DataOutputStream out = new DataOutputStream(saslResponse);
|
||||
out.writeInt(SaslRpcServer.SWITCH_TO_SIMPLE_AUTH);
|
||||
saslCall.setResponse(ByteBuffer.wrap(saslResponse.toByteArray()));
|
||||
responder.doRespond(saslCall);
|
||||
}
|
||||
|
||||
public int readAndProcess() throws IOException, InterruptedException {
|
||||
while (true) {
|
||||
/* Read at most one RPC. If the header is not read completely yet
|
||||
@ -957,13 +968,16 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
if (authMethod == null) {
|
||||
throw new IOException("Unable to read authentication method");
|
||||
}
|
||||
if (UserGroupInformation.isSecurityEnabled()
|
||||
&& authMethod == AuthMethod.SIMPLE) {
|
||||
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
|
||||
throw new IOException("Authentication is required");
|
||||
}
|
||||
if (!UserGroupInformation.isSecurityEnabled()
|
||||
&& authMethod != AuthMethod.SIMPLE) {
|
||||
throw new IOException("Authentication is not supported");
|
||||
}
|
||||
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
|
||||
askClientToUseSimpleAuth();
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
// client has already sent the initial Sasl message and we
|
||||
// should ignore it. Both client and server should fall back
|
||||
// to simple auth from now on.
|
||||
skipInitialSaslHandshake = true;
|
||||
}
|
||||
if (authMethod != AuthMethod.SIMPLE) {
|
||||
useSasl = true;
|
||||
@ -1000,6 +1014,11 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
if (data.remaining() == 0) {
|
||||
dataLengthBuffer.clear();
|
||||
data.flip();
|
||||
if (skipInitialSaslHandshake) {
|
||||
data = null;
|
||||
skipInitialSaslHandshake = false;
|
||||
continue;
|
||||
}
|
||||
boolean isHeaderRead = headerRead;
|
||||
if (useSasl) {
|
||||
saslReadAndProcess(data.array());
|
||||
@ -1278,6 +1297,7 @@ protected Server(String bindAddress, int port,
|
||||
this.authorize =
|
||||
conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG,
|
||||
false);
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
|
||||
// Start the listener here and let it bind to the port
|
||||
listener = new Listener();
|
||||
@ -1355,6 +1375,11 @@ Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/** for unit testing only, should be called before server is started */
|
||||
void disableSecurity() {
|
||||
this.isSecurityEnabled = false;
|
||||
}
|
||||
|
||||
/** Sets the socket buffer size used for responding to RPCs */
|
||||
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
|
||||
|
||||
|
@ -107,9 +107,11 @@ public SaslRpcClient(AuthMethod method,
|
||||
* InputStream to use
|
||||
* @param outS
|
||||
* OutputStream to use
|
||||
* @return true if connection is set up, or false if needs to switch
|
||||
* to simple Auth.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void saslConnect(InputStream inS, OutputStream outS)
|
||||
public boolean saslConnect(InputStream inS, OutputStream outS)
|
||||
throws IOException {
|
||||
DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
|
||||
DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
|
||||
@ -128,7 +130,14 @@ public void saslConnect(InputStream inS, OutputStream outS)
|
||||
+ " from initSASLContext.");
|
||||
}
|
||||
if (!saslClient.isComplete()) {
|
||||
saslToken = new byte[inStream.readInt()];
|
||||
int len = inStream.readInt();
|
||||
if (len == SaslRpcServer.SWITCH_TO_SIMPLE_AUTH) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Server asks us to fall back to simple auth.");
|
||||
saslClient.dispose();
|
||||
return false;
|
||||
}
|
||||
saslToken = new byte[len];
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Will read input token of size " + saslToken.length
|
||||
+ " for processing by initSASLContext");
|
||||
@ -157,8 +166,13 @@ public void saslConnect(InputStream inS, OutputStream outS)
|
||||
LOG.debug("SASL client context established. Negotiated QoP: "
|
||||
+ saslClient.getNegotiatedProperty(Sasl.QOP));
|
||||
}
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
saslClient.dispose();
|
||||
try {
|
||||
saslClient.dispose();
|
||||
} catch (SaslException ignored) {
|
||||
// ignore further exceptions during cleanup
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ public class SaslRpcServer {
|
||||
// Request mutual authentication
|
||||
SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
|
||||
}
|
||||
public static final int SWITCH_TO_SIMPLE_AUTH = -88;
|
||||
|
||||
static String encodeIdentifier(byte[] identifier) {
|
||||
return new String(Base64.encodeBase64(identifier));
|
||||
|
@ -75,20 +75,14 @@ public static class TestTokenIdentifier extends TokenIdentifier {
|
||||
final static Text KIND_NAME = new Text("test.token");
|
||||
|
||||
public TestTokenIdentifier() {
|
||||
this.tokenid = new Text();
|
||||
this.realUser = new Text();
|
||||
this(new Text(), new Text());
|
||||
}
|
||||
public TestTokenIdentifier(Text tokenid) {
|
||||
this.tokenid = tokenid;
|
||||
this.realUser = new Text();
|
||||
this(tokenid, new Text());
|
||||
}
|
||||
public TestTokenIdentifier(Text tokenid, Text realUser) {
|
||||
this.tokenid = tokenid;
|
||||
if (realUser == null) {
|
||||
this.realUser = new Text();
|
||||
} else {
|
||||
this.realUser = realUser;
|
||||
}
|
||||
this.tokenid = tokenid == null ? new Text() : tokenid;
|
||||
this.realUser = realUser == null ? new Text() : realUser;
|
||||
}
|
||||
@Override
|
||||
public Text getKind() {
|
||||
@ -96,7 +90,7 @@ public Text getKind() {
|
||||
}
|
||||
@Override
|
||||
public UserGroupInformation getUser() {
|
||||
if ((realUser == null) || ("".equals(realUser.toString()))) {
|
||||
if ("".equals(realUser.toString())) {
|
||||
return UserGroupInformation.createRemoteUser(tokenid.toString());
|
||||
} else {
|
||||
UserGroupInformation realUgi = UserGroupInformation
|
||||
@ -114,9 +108,7 @@ public void readFields(DataInput in) throws IOException {
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
tokenid.write(out);
|
||||
if (realUser != null) {
|
||||
realUser.write(out);
|
||||
}
|
||||
realUser.write(out);
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,6 +162,20 @@ public void testDigestRpc() throws Exception {
|
||||
final Server server = RPC.getServer(TestSaslProtocol.class,
|
||||
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, sm);
|
||||
|
||||
doDigestRpc(server, sm);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecureToInsecureRpc() throws Exception {
|
||||
Server server = RPC.getServer(TestSaslProtocol.class,
|
||||
new TestSaslImpl(), ADDRESS, 0, 5, true, conf, null);
|
||||
server.disableSecurity();
|
||||
TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||
doDigestRpc(server, sm);
|
||||
}
|
||||
|
||||
private void doDigestRpc(Server server, TestTokenSecretManager sm)
|
||||
throws Exception {
|
||||
server.start();
|
||||
|
||||
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||
|
@ -44,6 +44,14 @@
|
||||
<Field name="out" />
|
||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||
</Match>
|
||||
<!--
|
||||
Further SaslException should be ignored during cleanup and
|
||||
original exception should be re-thrown.
|
||||
-->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.security.SaslRpcClient" />
|
||||
<Bug pattern="DE_MIGHT_IGNORE" />
|
||||
</Match>
|
||||
<!--
|
||||
Ignore Cross Scripting Vulnerabilities
|
||||
-->
|
||||
|
Loading…
Reference in New Issue
Block a user