HADOOP-9015. Standardize creation of SaslRpcServers (daryn via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1406851 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fb5b96dfc3
commit
1594dd6965
@ -347,6 +347,8 @@ Release 2.0.3-alpha - Unreleased
|
||||
|
||||
HADOOP-9014. Standardize creation of SaslRpcClients (daryn via bobby)
|
||||
|
||||
HADOOP-9015. Standardize creation of SaslRpcServers (daryn via bobby)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang
|
||||
|
@ -57,6 +57,7 @@
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
import javax.security.sasl.Sasl;
|
||||
import javax.security.sasl.SaslException;
|
||||
import javax.security.sasl.SaslServer;
|
||||
@ -87,6 +88,7 @@
|
||||
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
|
||||
import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.util.KerberosName;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
@ -1078,7 +1080,6 @@ public class Connection {
|
||||
|
||||
IpcConnectionContextProto connectionContext;
|
||||
String protocolName;
|
||||
boolean useSasl;
|
||||
SaslServer saslServer;
|
||||
private AuthMethod authMethod;
|
||||
private boolean saslContextEstablished;
|
||||
@ -1194,49 +1195,6 @@ private void saslReadAndProcess(byte[] saslToken) throws IOException,
|
||||
if (!saslContextEstablished) {
|
||||
byte[] replyToken = null;
|
||||
try {
|
||||
if (saslServer == null) {
|
||||
switch (authMethod) {
|
||||
case DIGEST:
|
||||
if (secretManager == null) {
|
||||
throw new AccessControlException(
|
||||
"Server is not configured to do DIGEST authentication.");
|
||||
}
|
||||
secretManager.checkAvailableForRead();
|
||||
saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
|
||||
.getMechanismName(), null, SaslRpcServer.SASL_DEFAULT_REALM,
|
||||
SaslRpcServer.SASL_PROPS, new SaslDigestCallbackHandler(
|
||||
secretManager, this));
|
||||
break;
|
||||
default:
|
||||
UserGroupInformation current = UserGroupInformation
|
||||
.getCurrentUser();
|
||||
String fullName = current.getUserName();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Kerberos principal name is " + fullName);
|
||||
final String names[] = SaslRpcServer.splitKerberosName(fullName);
|
||||
if (names.length != 3) {
|
||||
throw new AccessControlException(
|
||||
"Kerberos principal name does NOT have the expected "
|
||||
+ "hostname part: " + fullName);
|
||||
}
|
||||
current.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws SaslException {
|
||||
saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
|
||||
.getMechanismName(), names[0], names[1],
|
||||
SaslRpcServer.SASL_PROPS, new SaslGssCallbackHandler());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
if (saslServer == null)
|
||||
throw new AccessControlException(
|
||||
"Unable to find SASL server implementation for "
|
||||
+ authMethod.getMechanismName());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Created SASL server with mechanism = "
|
||||
+ authMethod.getMechanismName());
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Have read input token of size " + saslToken.length
|
||||
+ " for processing by saslServer.evaluateResponse()");
|
||||
@ -1375,38 +1333,27 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
dataLengthBuffer.clear();
|
||||
if (authMethod == null) {
|
||||
throw new IOException("Unable to read authentication method");
|
||||
}
|
||||
}
|
||||
boolean useSaslServer = isSecurityEnabled;
|
||||
final boolean clientUsingSasl;
|
||||
switch (authMethod) {
|
||||
case SIMPLE: { // no sasl for simple
|
||||
if (isSecurityEnabled) {
|
||||
AccessControlException ae = new AccessControlException("Authorization ("
|
||||
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
|
||||
+ ") is enabled but authentication ("
|
||||
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
|
||||
+ ") is configured as simple. Please configure another method "
|
||||
+ "like kerberos or digest.");
|
||||
setupResponse(authFailedResponse, authFailedCall, RpcStatusProto.FATAL,
|
||||
null, ae.getClass().getName(), ae.getMessage());
|
||||
responder.doRespond(authFailedCall);
|
||||
throw ae;
|
||||
}
|
||||
clientUsingSasl = false;
|
||||
useSasl = false;
|
||||
break;
|
||||
}
|
||||
case DIGEST: {
|
||||
case DIGEST: { // always allow tokens if there's a secret manager
|
||||
useSaslServer |= (secretManager != null);
|
||||
clientUsingSasl = true;
|
||||
useSasl = (secretManager != null);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
clientUsingSasl = true;
|
||||
useSasl = isSecurityEnabled;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (clientUsingSasl && !useSasl) {
|
||||
}
|
||||
if (useSaslServer) {
|
||||
saslServer = createSaslServer(authMethod);
|
||||
} else if (clientUsingSasl) { // security is off
|
||||
doSaslReply(SaslStatus.SUCCESS, new IntWritable(
|
||||
SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
|
||||
authMethod = AuthMethod.SIMPLE;
|
||||
@ -1448,7 +1395,7 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
continue;
|
||||
}
|
||||
boolean isHeaderRead = connectionContextRead;
|
||||
if (useSasl) {
|
||||
if (saslServer != null) {
|
||||
saslReadAndProcess(data.array());
|
||||
} else {
|
||||
processOneRpc(data.array());
|
||||
@ -1462,6 +1409,84 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
}
|
||||
}
|
||||
|
||||
private SaslServer createSaslServer(AuthMethod authMethod)
|
||||
throws IOException {
|
||||
try {
|
||||
return createSaslServerInternal(authMethod);
|
||||
} catch (IOException ioe) {
|
||||
final String ioeClass = ioe.getClass().getName();
|
||||
final String ioeMessage = ioe.getLocalizedMessage();
|
||||
if (authMethod == AuthMethod.SIMPLE) {
|
||||
setupResponse(authFailedResponse, authFailedCall,
|
||||
RpcStatusProto.FATAL, null, ioeClass, ioeMessage);
|
||||
responder.doRespond(authFailedCall);
|
||||
} else {
|
||||
doSaslReply(SaslStatus.ERROR, null, ioeClass, ioeMessage);
|
||||
}
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
private SaslServer createSaslServerInternal(AuthMethod authMethod)
|
||||
throws IOException {
|
||||
SaslServer saslServer = null;
|
||||
String hostname = null;
|
||||
String saslProtocol = null;
|
||||
CallbackHandler saslCallback = null;
|
||||
|
||||
switch (authMethod) {
|
||||
case SIMPLE: {
|
||||
throw new AccessControlException("Authorization ("
|
||||
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION
|
||||
+ ") is enabled but authentication ("
|
||||
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION
|
||||
+ ") is configured as simple. Please configure another method "
|
||||
+ "like kerberos or digest.");
|
||||
}
|
||||
case DIGEST: {
|
||||
if (secretManager == null) {
|
||||
throw new AccessControlException(
|
||||
"Server is not configured to do DIGEST authentication.");
|
||||
}
|
||||
secretManager.checkAvailableForRead();
|
||||
hostname = SaslRpcServer.SASL_DEFAULT_REALM;
|
||||
saslCallback = new SaslDigestCallbackHandler(secretManager, this);
|
||||
break;
|
||||
}
|
||||
case KERBEROS: {
|
||||
String fullName = UserGroupInformation.getCurrentUser().getUserName();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Kerberos principal name is " + fullName);
|
||||
KerberosName krbName = new KerberosName(fullName);
|
||||
hostname = krbName.getHostName();
|
||||
if (hostname == null) {
|
||||
throw new AccessControlException(
|
||||
"Kerberos principal name does NOT have the expected "
|
||||
+ "hostname part: " + fullName);
|
||||
}
|
||||
saslProtocol = krbName.getServiceName();
|
||||
saslCallback = new SaslGssCallbackHandler();
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new AccessControlException(
|
||||
"Server does not support SASL " + authMethod);
|
||||
}
|
||||
|
||||
String mechanism = authMethod.getMechanismName();
|
||||
saslServer = Sasl.createSaslServer(
|
||||
mechanism, saslProtocol, hostname,
|
||||
SaslRpcServer.SASL_PROPS, saslCallback);
|
||||
if (saslServer == null) {
|
||||
throw new AccessControlException(
|
||||
"Unable to find SASL server implementation for " + mechanism);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Created SASL server with mechanism = " + mechanism);
|
||||
}
|
||||
return saslServer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to set up the response to indicate that the client version
|
||||
* is incompatible with the server. This can contain special-case
|
||||
@ -1523,7 +1548,7 @@ private void processConnectionContext(byte[] buf) throws IOException {
|
||||
.getProtocol() : null;
|
||||
|
||||
UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
|
||||
if (!useSasl) {
|
||||
if (saslServer == null) {
|
||||
user = protocolUser;
|
||||
if (user != null) {
|
||||
user.setAuthenticationMethod(AuthMethod.SIMPLE);
|
||||
@ -1999,7 +2024,7 @@ private void setupResponseOldVersionFatal(ByteArrayOutputStream response,
|
||||
|
||||
private void wrapWithSasl(ByteArrayOutputStream response, Call call)
|
||||
throws IOException {
|
||||
if (call.connection.useSasl) {
|
||||
if (call.connection.saslServer != null) {
|
||||
byte[] token = response.toByteArray();
|
||||
// synchronization may be needed since there can be multiple Handler
|
||||
// threads using saslServer to wrap responses.
|
||||
|
@ -53,6 +53,7 @@
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.tools.ant.types.Assertions.EnabledAssertion;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
@ -69,8 +70,8 @@ public class TestSaslRPC {
|
||||
static final String SERVER_KEYTAB_KEY = "test.ipc.server.keytab";
|
||||
static final String SERVER_PRINCIPAL_1 = "p1/foo@BAR";
|
||||
static final String SERVER_PRINCIPAL_2 = "p2/foo@BAR";
|
||||
|
||||
private static Configuration conf;
|
||||
static Boolean forceSecretManager = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupKerb() {
|
||||
@ -83,6 +84,7 @@ public void setup() {
|
||||
conf = new Configuration();
|
||||
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
forceSecretManager = null;
|
||||
}
|
||||
|
||||
static {
|
||||
@ -266,16 +268,6 @@ public void testDigestRpcWithoutAnnotation() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecureToInsecureRpc() throws Exception {
|
||||
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.SIMPLE, conf);
|
||||
Server server = new RPC.Builder(conf).setProtocol(TestSaslProtocol.class)
|
||||
.setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||
.setNumHandlers(5).setVerbose(true).build();
|
||||
TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||
doDigestRpc(server, sm);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorMessage() throws Exception {
|
||||
BadTokenSecretManager sm = new BadTokenSecretManager();
|
||||
@ -463,6 +455,8 @@ static void testKerberosRpc(String principal, String keytab) throws Exception {
|
||||
"Failed to specify server's Kerberos principal name.*");
|
||||
private static Pattern Denied =
|
||||
Pattern.compile(".*Authorization .* is enabled .*");
|
||||
private static Pattern NoDigest =
|
||||
Pattern.compile(".*Server is not configured to do DIGEST auth.*");
|
||||
|
||||
/*
|
||||
* simple server
|
||||
@ -479,6 +473,9 @@ public void testSimpleServerWithTokens() throws Exception {
|
||||
// Tokens are ignored because client is reverted to simple
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(SIMPLE, SIMPLE, true));
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, true));
|
||||
forceSecretManager = true;
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(SIMPLE, SIMPLE, true));
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, true));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -486,6 +483,9 @@ public void testSimpleServerWithInvalidTokens() throws Exception {
|
||||
// Tokens are ignored because client is reverted to simple
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(SIMPLE, SIMPLE, false));
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, false));
|
||||
forceSecretManager = true;
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(SIMPLE, SIMPLE, false));
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, false));
|
||||
}
|
||||
|
||||
/*
|
||||
@ -502,12 +502,19 @@ public void testKerberosServerWithTokens() throws Exception {
|
||||
// can use tokens regardless of auth
|
||||
assertAuthEquals(TOKEN, getAuthMethod(SIMPLE, KERBEROS, true));
|
||||
assertAuthEquals(TOKEN, getAuthMethod(KERBEROS, KERBEROS, true));
|
||||
// can't fallback to simple when using kerberos w/o tokens
|
||||
forceSecretManager = false;
|
||||
assertAuthEquals(NoDigest, getAuthMethod(SIMPLE, KERBEROS, true));
|
||||
assertAuthEquals(NoDigest, getAuthMethod(KERBEROS, KERBEROS, true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKerberosServerWithInvalidTokens() throws Exception {
|
||||
assertAuthEquals(BadToken, getAuthMethod(SIMPLE, KERBEROS, false));
|
||||
assertAuthEquals(BadToken, getAuthMethod(KERBEROS, KERBEROS, false));
|
||||
forceSecretManager = false;
|
||||
assertAuthEquals(NoDigest, getAuthMethod(SIMPLE, KERBEROS, true));
|
||||
assertAuthEquals(NoDigest, getAuthMethod(KERBEROS, KERBEROS, true));
|
||||
}
|
||||
|
||||
|
||||
@ -551,6 +558,12 @@ private String internalGetAuthMethod(
|
||||
serverUgi.setAuthenticationMethod(serverAuth);
|
||||
|
||||
final TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||
boolean useSecretManager = (serverAuth != SIMPLE);
|
||||
if (forceSecretManager != null) {
|
||||
useSecretManager &= forceSecretManager.booleanValue();
|
||||
}
|
||||
final SecretManager<?> serverSm = useSecretManager ? sm : null;
|
||||
|
||||
Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
|
||||
@Override
|
||||
public Server run() throws IOException {
|
||||
@ -558,7 +571,7 @@ public Server run() throws IOException {
|
||||
.setProtocol(TestSaslProtocol.class)
|
||||
.setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
|
||||
.setNumHandlers(5).setVerbose(true)
|
||||
.setSecretManager((serverAuth != SIMPLE) ? sm : null)
|
||||
.setSecretManager(serverSm)
|
||||
.build();
|
||||
server.start();
|
||||
return server;
|
||||
@ -587,7 +600,6 @@ public Server run() throws IOException {
|
||||
clientUgi.addToken(token);
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
|
||||
@Override
|
||||
@ -597,6 +609,12 @@ public String run() throws IOException {
|
||||
proxy = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
|
||||
TestSaslProtocol.versionID, addr, clientConf);
|
||||
|
||||
proxy.ping();
|
||||
// verify sasl completed
|
||||
if (serverAuth != SIMPLE) {
|
||||
assertEquals(SaslRpcServer.SASL_PROPS.get(Sasl.QOP), "auth");
|
||||
}
|
||||
|
||||
// make sure the other side thinks we are who we said we are!!!
|
||||
assertEquals(clientUgi.getUserName(), proxy.getAuthUser());
|
||||
return proxy.getAuthMethod().toString();
|
||||
|
Loading…
Reference in New Issue
Block a user