HADOOP-17975. Fallback to simple auth does not work for a secondary DistributedFileSystem instance. (#3579)
(cherry picked from commit ae3ba45db5
)
This commit is contained in:
parent
17b8ad3d48
commit
48e95d8109
@ -807,17 +807,18 @@ public Object run() throws IOException, InterruptedException {
|
||||
*/
|
||||
private synchronized void setupIOstreams(
|
||||
AtomicBoolean fallbackToSimpleAuth) {
|
||||
if (socket != null || shouldCloseConnection.get()) {
|
||||
return;
|
||||
}
|
||||
UserGroupInformation ticket = remoteId.getTicket();
|
||||
if (ticket != null) {
|
||||
final UserGroupInformation realUser = ticket.getRealUser();
|
||||
if (realUser != null) {
|
||||
ticket = realUser;
|
||||
}
|
||||
}
|
||||
try {
|
||||
if (socket != null || shouldCloseConnection.get()) {
|
||||
setFallBackToSimpleAuth(fallbackToSimpleAuth);
|
||||
return;
|
||||
}
|
||||
UserGroupInformation ticket = remoteId.getTicket();
|
||||
if (ticket != null) {
|
||||
final UserGroupInformation realUser = ticket.getRealUser();
|
||||
if (realUser != null) {
|
||||
ticket = realUser;
|
||||
}
|
||||
}
|
||||
connectingThread.set(Thread.currentThread());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Connecting to "+server);
|
||||
@ -863,20 +864,8 @@ public AuthMethod run()
|
||||
remoteId.saslQop =
|
||||
(String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
|
||||
LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
|
||||
if (fallbackToSimpleAuth != null) {
|
||||
fallbackToSimpleAuth.set(false);
|
||||
}
|
||||
} else if (UserGroupInformation.isSecurityEnabled()) {
|
||||
if (!fallbackAllowed) {
|
||||
throw new AccessControlException(
|
||||
"Server asks us to fall back to SIMPLE " +
|
||||
"auth, but this client is configured to only allow secure " +
|
||||
"connections.");
|
||||
}
|
||||
if (fallbackToSimpleAuth != null) {
|
||||
fallbackToSimpleAuth.set(true);
|
||||
}
|
||||
}
|
||||
setFallBackToSimpleAuth(fallbackToSimpleAuth);
|
||||
}
|
||||
|
||||
if (doPing) {
|
||||
@ -909,7 +898,41 @@ public AuthMethod run()
|
||||
connectingThread.set(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void setFallBackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth)
|
||||
throws AccessControlException {
|
||||
if (authMethod == null || authProtocol != AuthProtocol.SASL) {
|
||||
if (authProtocol == AuthProtocol.SASL) {
|
||||
LOG.trace("Auth method is not set, yield from setting auth fallback.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (fallbackToSimpleAuth == null) {
|
||||
// this should happen only during testing.
|
||||
LOG.trace("Connection {} will skip to set fallbackToSimpleAuth as it is null.", remoteId);
|
||||
} else {
|
||||
if (fallbackToSimpleAuth.get()) {
|
||||
// we already set the value to true, we do not need to examine again.
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (authMethod != AuthMethod.SIMPLE) {
|
||||
if (fallbackToSimpleAuth != null) {
|
||||
LOG.trace("Disabling fallbackToSimpleAuth, target does not use SIMPLE authentication.");
|
||||
fallbackToSimpleAuth.set(false);
|
||||
}
|
||||
} else if (UserGroupInformation.isSecurityEnabled()) {
|
||||
if (!fallbackAllowed) {
|
||||
throw new AccessControlException("Server asks us to fall back to SIMPLE auth, but this "
|
||||
+ "client is configured to only allow secure connections.");
|
||||
}
|
||||
if (fallbackToSimpleAuth != null) {
|
||||
LOG.trace("Enabling fallbackToSimpleAuth for target, as we are allowed to fall back.");
|
||||
fallbackToSimpleAuth.set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void closeConnection() {
|
||||
if (socket == null) {
|
||||
return;
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.thirdparty.protobuf.RpcController;
|
||||
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -124,18 +125,19 @@ protected static RPC.Server setupTestServer(
|
||||
return server;
|
||||
}
|
||||
|
||||
protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||
Configuration clientConf)
|
||||
protected static TestRpcService getClient(InetSocketAddress serverAddr, Configuration clientConf)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return getClient(serverAddr, clientConf, null);
|
||||
}
|
||||
|
||||
protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||
Configuration clientConf, final RetryPolicy connectionRetryPolicy)
|
||||
Configuration clientConf, RetryPolicy connectionRetryPolicy) throws ServiceException {
|
||||
return getClient(serverAddr, clientConf, connectionRetryPolicy, null);
|
||||
}
|
||||
|
||||
protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||
Configuration clientConf, final RetryPolicy connectionRetryPolicy,
|
||||
AtomicBoolean fallbackToSimpleAuth)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return RPC.getProtocolProxy(
|
||||
@ -146,7 +148,7 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||
clientConf,
|
||||
NetUtils.getDefaultSocketFactory(clientConf),
|
||||
RPC.getRpcTimeout(clientConf),
|
||||
connectionRetryPolicy, null).getProxy();
|
||||
connectionRetryPolicy, fallbackToSimpleAuth).getProxy();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
|
@ -72,6 +72,7 @@
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@ -569,6 +570,72 @@ public void testSimpleServer() throws Exception {
|
||||
assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, UseToken.OTHER));
|
||||
}
|
||||
|
||||
/**
|
||||
* In DfsClient there is a fallback mechanism to simple auth, which passes in an atomic boolean
|
||||
* to the ipc Client, which then sets it during setupIOStreams.
|
||||
* SetupIOStreams were running only once per connection, so if two separate DfsClient was
|
||||
* instantiated, then due to the connection caching inside the ipc client, the second DfsClient
|
||||
* did not have the passed in atomic boolean set properly if the first client was not yet closed,
|
||||
* as setupIOStreams was yielding to set up new streams as it has reused the already existing
|
||||
* connection.
|
||||
* This test mimics this behaviour, and asserts the fallback whether it is set correctly.
|
||||
* @see <a href="https://issues.apache.org/jira/browse/HADOOP-17975">HADOOP-17975</a>
|
||||
*/
|
||||
@Test
|
||||
public void testClientFallbackToSimpleAuthForASecondClient() throws Exception {
|
||||
Configuration serverConf = createConfForAuth(SIMPLE);
|
||||
Server server = startServer(serverConf,
|
||||
setupServerUgi(SIMPLE, serverConf),
|
||||
createServerSecretManager(SIMPLE, new TestTokenSecretManager()));
|
||||
final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
|
||||
|
||||
clientFallBackToSimpleAllowed = true;
|
||||
Configuration clientConf = createConfForAuth(KERBEROS);
|
||||
UserGroupInformation clientUgi = setupClientUgi(KERBEROS, clientConf);
|
||||
|
||||
AtomicBoolean fallbackToSimpleAuth1 = new AtomicBoolean();
|
||||
AtomicBoolean fallbackToSimpleAuth2 = new AtomicBoolean();
|
||||
try {
|
||||
LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
|
||||
clientUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
|
||||
TestRpcService proxy1 = null;
|
||||
TestRpcService proxy2 = null;
|
||||
try {
|
||||
proxy1 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth1);
|
||||
proxy1.ping(null, newEmptyRequest());
|
||||
// make sure the other side thinks we are who we said we are!!!
|
||||
assertEquals(clientUgi.getUserName(),
|
||||
proxy1.getAuthUser(null, newEmptyRequest()).getUser());
|
||||
AuthMethod authMethod =
|
||||
convert(proxy1.getAuthMethod(null, newEmptyRequest()));
|
||||
assertAuthEquals(SIMPLE, authMethod.toString());
|
||||
|
||||
proxy2 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth2);
|
||||
proxy2.ping(null, newEmptyRequest());
|
||||
// make sure the other side thinks we are who we said we are!!!
|
||||
assertEquals(clientUgi.getUserName(),
|
||||
proxy2.getAuthUser(null, newEmptyRequest()).getUser());
|
||||
AuthMethod authMethod2 =
|
||||
convert(proxy2.getAuthMethod(null, newEmptyRequest()));
|
||||
assertAuthEquals(SIMPLE, authMethod2.toString());
|
||||
} finally {
|
||||
if (proxy1 != null) {
|
||||
RPC.stopProxy(proxy1);
|
||||
}
|
||||
if (proxy2 != null) {
|
||||
RPC.stopProxy(proxy2);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
} finally {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
assertTrue("First client does not set to fall back properly.", fallbackToSimpleAuth1.get());
|
||||
assertTrue("Second client does not set to fall back properly.", fallbackToSimpleAuth2.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoClientFallbackToSimple()
|
||||
throws Exception {
|
||||
@ -815,22 +882,44 @@ private String getAuthMethod(
|
||||
return e.toString();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String internalGetAuthMethod(
|
||||
final AuthMethod clientAuth,
|
||||
final AuthMethod serverAuth,
|
||||
final UseToken tokenType) throws Exception {
|
||||
|
||||
final Configuration serverConf = new Configuration(conf);
|
||||
serverConf.set(HADOOP_SECURITY_AUTHENTICATION, serverAuth.toString());
|
||||
UserGroupInformation.setConfiguration(serverConf);
|
||||
|
||||
final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
|
||||
? UserGroupInformation.createRemoteUser("server/localhost@NONE")
|
||||
: UserGroupInformation.createRemoteUser("server");
|
||||
serverUgi.setAuthenticationMethod(serverAuth);
|
||||
|
||||
final TestTokenSecretManager sm = new TestTokenSecretManager();
|
||||
|
||||
Configuration serverConf = createConfForAuth(serverAuth);
|
||||
Server server = startServer(
|
||||
serverConf,
|
||||
setupServerUgi(serverAuth, serverConf),
|
||||
createServerSecretManager(serverAuth, sm));
|
||||
final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
|
||||
|
||||
final Configuration clientConf = createConfForAuth(clientAuth);
|
||||
final UserGroupInformation clientUgi = setupClientUgi(clientAuth, clientConf);
|
||||
|
||||
setupTokenIfNeeded(tokenType, sm, clientUgi, serverAddress);
|
||||
|
||||
try {
|
||||
return createClientAndQueryAuthMethod(serverAddress, clientConf, clientUgi, null);
|
||||
} finally {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration createConfForAuth(AuthMethod clientAuth) {
|
||||
final Configuration clientConf = new Configuration(conf);
|
||||
clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
|
||||
clientConf.setBoolean(
|
||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
||||
clientFallBackToSimpleAllowed);
|
||||
return clientConf;
|
||||
}
|
||||
|
||||
private SecretManager<?> createServerSecretManager(
|
||||
AuthMethod serverAuth, TestTokenSecretManager sm) {
|
||||
boolean useSecretManager = (serverAuth != SIMPLE);
|
||||
if (enableSecretManager != null) {
|
||||
useSecretManager &= enableSecretManager;
|
||||
@ -839,26 +928,43 @@ private String internalGetAuthMethod(
|
||||
useSecretManager |= forceSecretManager;
|
||||
}
|
||||
final SecretManager<?> serverSm = useSecretManager ? sm : null;
|
||||
return serverSm;
|
||||
}
|
||||
|
||||
private Server startServer(Configuration serverConf, UserGroupInformation serverUgi,
|
||||
SecretManager<?> serverSm) throws IOException, InterruptedException {
|
||||
Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
|
||||
@Override
|
||||
public Server run() throws IOException {
|
||||
return setupTestServer(serverConf, 5, serverSm);
|
||||
}
|
||||
});
|
||||
return server;
|
||||
}
|
||||
|
||||
final Configuration clientConf = new Configuration(conf);
|
||||
clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
|
||||
clientConf.setBoolean(
|
||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
||||
clientFallBackToSimpleAllowed);
|
||||
private UserGroupInformation setupServerUgi(AuthMethod serverAuth,
|
||||
Configuration serverConf) {
|
||||
UserGroupInformation.setConfiguration(serverConf);
|
||||
|
||||
final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
|
||||
? UserGroupInformation.createRemoteUser("server/localhost@NONE")
|
||||
: UserGroupInformation.createRemoteUser("server");
|
||||
serverUgi.setAuthenticationMethod(serverAuth);
|
||||
return serverUgi;
|
||||
}
|
||||
|
||||
private UserGroupInformation setupClientUgi(AuthMethod clientAuth,
|
||||
Configuration clientConf) {
|
||||
UserGroupInformation.setConfiguration(clientConf);
|
||||
|
||||
|
||||
final UserGroupInformation clientUgi =
|
||||
UserGroupInformation.createRemoteUser("client");
|
||||
clientUgi.setAuthenticationMethod(clientAuth);
|
||||
clientUgi.setAuthenticationMethod(clientAuth);
|
||||
return clientUgi;
|
||||
}
|
||||
|
||||
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
private void setupTokenIfNeeded(UseToken tokenType, TestTokenSecretManager sm,
|
||||
UserGroupInformation clientUgi, InetSocketAddress addr) {
|
||||
if (tokenType != UseToken.NONE) {
|
||||
TestTokenIdentifier tokenId = new TestTokenIdentifier(
|
||||
new Text(clientUgi.getUserName()));
|
||||
@ -881,44 +987,44 @@ public Server run() throws IOException {
|
||||
}
|
||||
clientUgi.addToken(token);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
|
||||
return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
|
||||
@Override
|
||||
public String run() throws IOException {
|
||||
TestRpcService proxy = null;
|
||||
try {
|
||||
proxy = getClient(addr, clientConf);
|
||||
private String createClientAndQueryAuthMethod(InetSocketAddress serverAddress,
|
||||
Configuration clientConf, UserGroupInformation clientUgi, AtomicBoolean fallbackToSimpleAuth)
|
||||
throws IOException, InterruptedException {
|
||||
LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
|
||||
return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
|
||||
@Override
|
||||
public String run() throws IOException {
|
||||
TestRpcService proxy = null;
|
||||
try {
|
||||
proxy = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth);
|
||||
|
||||
proxy.ping(null, newEmptyRequest());
|
||||
// make sure the other side thinks we are who we said we are!!!
|
||||
assertEquals(clientUgi.getUserName(),
|
||||
proxy.getAuthUser(null, newEmptyRequest()).getUser());
|
||||
AuthMethod authMethod =
|
||||
convert(proxy.getAuthMethod(null, newEmptyRequest()));
|
||||
// verify sasl completed with correct QOP
|
||||
assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
|
||||
RPC.getConnectionIdForProxy(proxy).getSaslQop());
|
||||
return authMethod != null ? authMethod.toString() : null;
|
||||
} catch (ServiceException se) {
|
||||
if (se.getCause() instanceof RemoteException) {
|
||||
throw (RemoteException) se.getCause();
|
||||
} else if (se.getCause() instanceof IOException) {
|
||||
throw (IOException) se.getCause();
|
||||
} else {
|
||||
throw new RuntimeException(se.getCause());
|
||||
}
|
||||
} finally {
|
||||
if (proxy != null) {
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
proxy.ping(null, newEmptyRequest());
|
||||
// make sure the other side thinks we are who we said we are!!!
|
||||
assertEquals(clientUgi.getUserName(),
|
||||
proxy.getAuthUser(null, newEmptyRequest()).getUser());
|
||||
AuthMethod authMethod =
|
||||
convert(proxy.getAuthMethod(null, newEmptyRequest()));
|
||||
// verify sasl completed with correct QOP
|
||||
assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
|
||||
RPC.getConnectionIdForProxy(proxy).getSaslQop());
|
||||
return authMethod != null ? authMethod.toString() : null;
|
||||
} catch (ServiceException se) {
|
||||
if (se.getCause() instanceof RemoteException) {
|
||||
throw (RemoteException) se.getCause();
|
||||
} else if (se.getCause() instanceof IOException) {
|
||||
throw (IOException) se.getCause();
|
||||
} else {
|
||||
throw new RuntimeException(se.getCause());
|
||||
}
|
||||
} finally {
|
||||
if (proxy != null) {
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void assertAuthEquals(AuthMethod expect,
|
||||
|
Loading…
Reference in New Issue
Block a user