HADOOP-10070. RPC client doesn't use per-connection conf to determine server's expected Kerberos principal name. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1570776 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2014-02-22 01:09:54 +00:00
parent 87dce7b2d2
commit e69614d650
4 changed files with 43 additions and 21 deletions

View File

@ -406,6 +406,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10355. Fix TestLoadGenerator#testLoadGenerator. (Haohui Mai via jing9)
HADOOP-10070. RPC client doesn't use per-connection conf to determine
server's expected Kerberos principal name. (atm)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -542,8 +542,11 @@ private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
private synchronized AuthMethod setupSaslConnection(final InputStream in2,
final OutputStream out2) throws IOException, InterruptedException {
// Do not use Client.conf here! We must use ConnectionId.conf, since the
// Client object is cached and shared between all RPC clients, even those
// for separate services.
saslRpcClient = new SaslRpcClient(remoteId.getTicket(),
remoteId.getProtocol(), remoteId.getAddress(), conf);
remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf);
return saslRpcClient.saslConnect(in2, out2);
}
@ -1480,21 +1483,31 @@ public static class ConnectionId {
private final boolean doPing; //do we need to send ping message
private final int pingInterval; // how often sends ping to the server in msecs
private String saslQop; // here for testing
private final Configuration conf; // used to get the expected kerberos principal name
ConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket, int rpcTimeout, int maxIdleTime,
RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts,
boolean tcpNoDelay, boolean doPing, int pingInterval) {
UserGroupInformation ticket, int rpcTimeout,
RetryPolicy connectionRetryPolicy, Configuration conf) {
this.protocol = protocol;
this.address = address;
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
this.maxIdleTime = maxIdleTime;
this.connectionRetryPolicy = connectionRetryPolicy;
this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts;
this.tcpNoDelay = tcpNoDelay;
this.doPing = doPing;
this.pingInterval = pingInterval;
this.maxIdleTime = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
this.maxRetriesOnSocketTimeouts = conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
this.tcpNoDelay = conf.getBoolean(
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT);
this.doPing = conf.getBoolean(
CommonConfigurationKeys.IPC_CLIENT_PING_KEY,
CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT);
this.pingInterval = (doPing ? Client.getPingInterval(conf) : 0);
this.conf = conf;
}
InetSocketAddress getAddress() {
@ -1572,19 +1585,8 @@ static ConnectionId getConnectionId(InetSocketAddress addr,
max, retryInterval, TimeUnit.MILLISECONDS);
}
boolean doPing =
conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
return new ConnectionId(addr, protocol, ticket, rpcTimeout,
conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
connectionRetryPolicy,
conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT),
conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT),
doPing,
(doPing ? Client.getPingInterval(conf) : 0));
connectionRetryPolicy, conf);
}
static boolean isEqual(Object a, Object b) {

View File

@ -59,6 +59,9 @@ public synchronized Client getClient(Configuration conf,
} else {
client.incCount();
}
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("getting client out of cache: " + client);
}
return client;
}
@ -90,13 +93,23 @@ public synchronized Client getClient(Configuration conf, SocketFactory factory)
* A RPC client is closed only when its reference count becomes zero.
*/
public void stopClient(Client client) {
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("stopping client from cache: " + client);
}
synchronized (this) {
client.decCount();
if (client.isZeroReference()) {
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("removing client from cache: " + client);
}
clients.remove(client.getSocketFactory());
}
}
if (client.isZeroReference()) {
if (Client.LOG.isDebugEnabled()) {
Client.LOG.debug("stopping actual client because no more references remain: "
+ client);
}
client.stop();
}
}

View File

@ -309,6 +309,10 @@ String getServerPrincipal(SaslAuth authType) throws IOException {
// check that the server advertised principal matches our conf
String confPrincipal = SecurityUtil.getServerPrincipal(
conf.get(serverKey), serverAddr.getAddress());
if (LOG.isDebugEnabled()) {
LOG.debug("getting serverKey: " + serverKey + " conf value: " + conf.get(serverKey)
+ " principal: " + confPrincipal);
}
if (confPrincipal == null || confPrincipal.isEmpty()) {
throw new IllegalArgumentException(
"Failed to specify server's Kerberos principal name");