YARN-6602. Impersonation does not work if standby RM is contacted first (rkanter)
This commit is contained in:
parent
66bba8c024
commit
9855225a79
@ -48,7 +48,6 @@
|
|||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class ClientRMProxy<T> extends RMProxy<T> {
|
public class ClientRMProxy<T> extends RMProxy<T> {
|
||||||
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
|
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
|
||||||
private static final ClientRMProxy INSTANCE = new ClientRMProxy();
|
|
||||||
|
|
||||||
private interface ClientRMProtocols extends ApplicationClientProtocol,
|
private interface ClientRMProtocols extends ApplicationClientProtocol,
|
||||||
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
|
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
|
||||||
@ -69,7 +68,8 @@ private ClientRMProxy(){
|
|||||||
*/
|
*/
|
||||||
public static <T> T createRMProxy(final Configuration configuration,
|
public static <T> T createRMProxy(final Configuration configuration,
|
||||||
final Class<T> protocol) throws IOException {
|
final Class<T> protocol) throws IOException {
|
||||||
return createRMProxy(configuration, protocol, INSTANCE);
|
ClientRMProxy<T> clientRMProxy = new ClientRMProxy<>();
|
||||||
|
return createRMProxy(configuration, protocol, clientRMProxy);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setAMRMTokenService(final Configuration conf)
|
private static void setAMRMTokenService(final Configuration conf)
|
||||||
|
@ -74,7 +74,7 @@ public void init(Configuration configuration, RMProxy<T> rmProxy,
|
|||||||
protected T getProxyInternal() {
|
protected T getProxyInternal() {
|
||||||
try {
|
try {
|
||||||
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
||||||
return RMProxy.getProxy(conf, protocol, rmAddress);
|
return rmProxy.getProxy(conf, protocol, rmAddress);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Unable to create proxy to the ResourceManager " +
|
LOG.error("Unable to create proxy to the ResourceManager " +
|
||||||
rmServiceIds[currentProxyIndex], ioe);
|
rmServiceIds[currentProxyIndex], ioe);
|
||||||
|
@ -57,8 +57,15 @@
|
|||||||
public class RMProxy<T> {
|
public class RMProxy<T> {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(RMProxy.class);
|
private static final Log LOG = LogFactory.getLog(RMProxy.class);
|
||||||
|
private UserGroupInformation user;
|
||||||
|
|
||||||
protected RMProxy() {}
|
protected RMProxy() {
|
||||||
|
try {
|
||||||
|
this.user = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new YarnRuntimeException("Unable to determine user", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify the passed protocol is supported.
|
* Verify the passed protocol is supported.
|
||||||
@ -86,7 +93,7 @@ protected InetSocketAddress getRMAddress(
|
|||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
protected static <T> T createRMProxy(final Configuration configuration,
|
protected static <T> T createRMProxy(final Configuration configuration,
|
||||||
final Class<T> protocol, RMProxy instance) throws IOException {
|
final Class<T> protocol, RMProxy<T> instance) throws IOException {
|
||||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||||
? (YarnConfiguration) configuration
|
? (YarnConfiguration) configuration
|
||||||
: new YarnConfiguration(configuration);
|
: new YarnConfiguration(configuration);
|
||||||
@ -103,7 +110,7 @@ protected static <T> T createRMProxy(final Configuration configuration,
|
|||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
protected static <T> T createRMProxy(final Configuration configuration,
|
protected static <T> T createRMProxy(final Configuration configuration,
|
||||||
final Class<T> protocol, RMProxy instance, final long retryTime,
|
final Class<T> protocol, RMProxy<T> instance, final long retryTime,
|
||||||
final long retryInterval) throws IOException {
|
final long retryInterval) throws IOException {
|
||||||
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
YarnConfiguration conf = (configuration instanceof YarnConfiguration)
|
||||||
? (YarnConfiguration) configuration
|
? (YarnConfiguration) configuration
|
||||||
@ -114,7 +121,7 @@ protected static <T> T createRMProxy(final Configuration configuration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static <T> T newProxyInstance(final YarnConfiguration conf,
|
private static <T> T newProxyInstance(final YarnConfiguration conf,
|
||||||
final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
|
final Class<T> protocol, RMProxy<T> instance, RetryPolicy retryPolicy)
|
||||||
throws IOException{
|
throws IOException{
|
||||||
if (HAUtil.isHAEnabled(conf)) {
|
if (HAUtil.isHAEnabled(conf)) {
|
||||||
RMFailoverProxyProvider<T> provider =
|
RMFailoverProxyProvider<T> provider =
|
||||||
@ -123,44 +130,20 @@ private static <T> T newProxyInstance(final YarnConfiguration conf,
|
|||||||
} else {
|
} else {
|
||||||
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
|
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
||||||
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
|
T proxy = instance.getProxy(conf, protocol, rmAddress);
|
||||||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @deprecated
|
|
||||||
* This method is deprecated and is not used by YARN internally any more.
|
|
||||||
* To create a proxy to the RM, use ClientRMProxy#createRMProxy or
|
|
||||||
* ServerRMProxy#createRMProxy.
|
|
||||||
*
|
|
||||||
* Create a proxy to the ResourceManager at the specified address.
|
|
||||||
*
|
|
||||||
* @param conf Configuration to generate retry policy
|
|
||||||
* @param protocol Protocol for the proxy
|
|
||||||
* @param rmAddress Address of the ResourceManager
|
|
||||||
* @param <T> Type information of the proxy
|
|
||||||
* @return Proxy to the RM
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static <T> T createRMProxy(final Configuration conf,
|
|
||||||
final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
|
|
||||||
RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
|
|
||||||
T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
|
|
||||||
LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
||||||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a proxy to the RM at the specified address. To be used to create a
|
* Get a proxy to the RM at the specified address. To be used to create a
|
||||||
* RetryProxy.
|
* RetryProxy.
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
static <T> T getProxy(final Configuration conf,
|
<T> T getProxy(final Configuration conf,
|
||||||
final Class<T> protocol, final InetSocketAddress rmAddress)
|
final Class<T> protocol, final InetSocketAddress rmAddress)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return UserGroupInformation.getCurrentUser().doAs(
|
return user.doAs(
|
||||||
new PrivilegedAction<T>() {
|
new PrivilegedAction<T>() {
|
||||||
@Override
|
@Override
|
||||||
public T run() {
|
public T run() {
|
||||||
|
@ -95,7 +95,7 @@ protected T createRetriableProxy() {
|
|||||||
// Create proxy that can retry exceptions properly.
|
// Create proxy that can retry exceptions properly.
|
||||||
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
|
RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false);
|
||||||
InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
||||||
T proxy = RMProxy.<T> getProxy(conf, protocol, rmAddress);
|
T proxy = rmProxy.getProxy(conf, protocol, rmAddress);
|
||||||
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
return (T) RetryProxy.create(protocol, proxy, retryPolicy);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Unable to create proxy to the ResourceManager "
|
LOG.error("Unable to create proxy to the ResourceManager "
|
||||||
|
@ -18,12 +18,26 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.client;
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestClientRMProxy {
|
public class TestClientRMProxy {
|
||||||
@ -86,4 +100,99 @@ public void testGetAMRMTokenService() {
|
|||||||
service.contains(defaultRMAddress));
|
service.contains(defaultRMAddress));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that the RPC layer is always created using the correct UGI from the
|
||||||
|
* RMProxy. It should always use the UGI from creation in subsequent uses,
|
||||||
|
* even outside of a doAs.
|
||||||
|
*
|
||||||
|
* @throws Exception an Exception occurred
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testProxyUserCorrectUGI() throws Exception {
|
||||||
|
final YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
|
||||||
|
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"),
|
||||||
|
"0.0.0.0");
|
||||||
|
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"),
|
||||||
|
"0.0.0.0");
|
||||||
|
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 2);
|
||||||
|
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 2);
|
||||||
|
conf.setLong(
|
||||||
|
YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 2);
|
||||||
|
|
||||||
|
// Replace the RPC implementation with one that will capture the current UGI
|
||||||
|
conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
|
||||||
|
UGICapturingHadoopYarnProtoRPC.class, YarnRPC.class);
|
||||||
|
|
||||||
|
UserGroupInformation realUser = UserGroupInformation.getCurrentUser();
|
||||||
|
UserGroupInformation proxyUser =
|
||||||
|
UserGroupInformation.createProxyUserForTesting("proxy", realUser,
|
||||||
|
new String[] {"group1"});
|
||||||
|
|
||||||
|
// Create the RMProxy using the proxyUser
|
||||||
|
ApplicationClientProtocol rmProxy = proxyUser.doAs(
|
||||||
|
new PrivilegedExceptionAction<ApplicationClientProtocol>() {
|
||||||
|
@Override
|
||||||
|
public ApplicationClientProtocol run() throws Exception {
|
||||||
|
return ClientRMProxy.createRMProxy(conf,
|
||||||
|
ApplicationClientProtocol.class);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// It was in a doAs, so the UGI should be correct
|
||||||
|
assertUGI();
|
||||||
|
|
||||||
|
// Try to use the RMProxy, which should trigger the RPC again
|
||||||
|
GetNewApplicationRequest request =
|
||||||
|
Records.newRecord(GetNewApplicationRequest.class);
|
||||||
|
UGICapturingHadoopYarnProtoRPC.lastCurrentUser = null;
|
||||||
|
try {
|
||||||
|
rmProxy.getNewApplication(request);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// ignore - RMs are not running so this is expected to fail
|
||||||
|
}
|
||||||
|
|
||||||
|
// This time it was outside a doAs, but make sure the UGI was still correct
|
||||||
|
assertUGI();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertUGI() throws IOException {
|
||||||
|
UserGroupInformation lastCurrentUser =
|
||||||
|
UGICapturingHadoopYarnProtoRPC.lastCurrentUser;
|
||||||
|
assertNotNull(lastCurrentUser);
|
||||||
|
assertEquals("proxy", lastCurrentUser.getShortUserName());
|
||||||
|
Assert.assertEquals(UserGroupInformation.AuthenticationMethod.PROXY,
|
||||||
|
lastCurrentUser.getAuthenticationMethod());
|
||||||
|
assertEquals(UserGroupInformation.getCurrentUser(),
|
||||||
|
lastCurrentUser.getRealUser());
|
||||||
|
// Reset UGICapturingHadoopYarnProtoRPC
|
||||||
|
UGICapturingHadoopYarnProtoRPC.lastCurrentUser = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subclass of {@link HadoopYarnProtoRPC} which captures the current UGI in
|
||||||
|
* a static variable. Used by {@link #testProxyUserCorrectUGI()}.
|
||||||
|
*/
|
||||||
|
public static class UGICapturingHadoopYarnProtoRPC
|
||||||
|
extends HadoopYarnProtoRPC {
|
||||||
|
|
||||||
|
static UserGroupInformation lastCurrentUser = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getProxy(Class protocol, InetSocketAddress addr,
|
||||||
|
Configuration conf) {
|
||||||
|
UserGroupInformation currentUser = null;
|
||||||
|
try {
|
||||||
|
currentUser = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
Assert.fail("Unable to get current user\n"
|
||||||
|
+ StringUtils.stringifyException(ioe));
|
||||||
|
}
|
||||||
|
lastCurrentUser = currentUser;
|
||||||
|
|
||||||
|
return super.getProxy(protocol, addr, conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,6 @@
|
|||||||
|
|
||||||
public class ServerRMProxy<T> extends RMProxy<T> {
|
public class ServerRMProxy<T> extends RMProxy<T> {
|
||||||
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
|
private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
|
||||||
private static final ServerRMProxy INSTANCE = new ServerRMProxy();
|
|
||||||
|
|
||||||
private ServerRMProxy() {
|
private ServerRMProxy() {
|
||||||
super();
|
super();
|
||||||
@ -65,7 +64,8 @@ public static <T> T createRMProxy(final Configuration configuration,
|
|||||||
configuration.getLong(
|
configuration.getLong(
|
||||||
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||||
rmRetryInterval);
|
rmRetryInterval);
|
||||||
return createRMProxy(configuration, protocol, INSTANCE,
|
ServerRMProxy<T> serverRMProxy = new ServerRMProxy<>();
|
||||||
|
return createRMProxy(configuration, protocol, serverRMProxy,
|
||||||
nmRmConnectWait, nmRmRetryInterval);
|
nmRmConnectWait, nmRmRetryInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user