diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index b29263edcd..0232debb35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -48,7 +48,6 @@ @InterfaceStability.Stable public class ClientRMProxy extends RMProxy { private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); - private static final ClientRMProxy INSTANCE = new ClientRMProxy(); private interface ClientRMProtocols extends ApplicationClientProtocol, ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { @@ -69,7 +68,8 @@ private ClientRMProxy(){ */ public static T createRMProxy(final Configuration configuration, final Class protocol) throws IOException { - return createRMProxy(configuration, protocol, INSTANCE); + ClientRMProxy clientRMProxy = new ClientRMProxy<>(); + return createRMProxy(configuration, protocol, clientRMProxy); } private static void setAMRMTokenService(final Configuration conf) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java index 8676db244a..d6b6cce996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java @@ -74,7 +74,7 @@ public void init(Configuration configuration, RMProxy rmProxy, protected T getProxyInternal() { try { final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); - return RMProxy.getProxy(conf, protocol, rmAddress); + return rmProxy.getProxy(conf, protocol, rmAddress); } catch (IOException ioe) { LOG.error("Unable to create proxy to the ResourceManager " + rmServiceIds[currentProxyIndex], ioe); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 3ab06bd11e..8aa4107a98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -57,8 +57,15 @@ public class RMProxy { private static final Log LOG = LogFactory.getLog(RMProxy.class); + private UserGroupInformation user; - protected RMProxy() {} + protected RMProxy() { + try { + this.user = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + throw new YarnRuntimeException("Unable to determine user", ioe); + } + } /** * Verify the passed protocol is supported. @@ -86,7 +93,7 @@ protected InetSocketAddress getRMAddress( */ @Private protected static T createRMProxy(final Configuration configuration, - final Class protocol, RMProxy instance) throws IOException { + final Class protocol, RMProxy instance) throws IOException { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration : new YarnConfiguration(configuration); @@ -103,7 +110,7 @@ protected static T createRMProxy(final Configuration configuration, */ @Private protected static T createRMProxy(final Configuration configuration, - final Class protocol, RMProxy instance, final long retryTime, + final Class protocol, RMProxy instance, final long retryTime, final long retryInterval) throws IOException { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? (YarnConfiguration) configuration @@ -114,7 +121,7 @@ protected static T createRMProxy(final Configuration configuration, } private static T newProxyInstance(final YarnConfiguration conf, - final Class protocol, RMProxy instance, RetryPolicy retryPolicy) + final Class protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException{ if (HAUtil.isHAEnabled(conf)) { RMFailoverProxyProvider provider = @@ -123,44 +130,20 @@ private static T newProxyInstance(final YarnConfiguration conf, } else { InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol); LOG.info("Connecting to ResourceManager at " + rmAddress); - T proxy = RMProxy.getProxy(conf, protocol, rmAddress); + T proxy = instance.getProxy(conf, protocol, rmAddress); return (T) RetryProxy.create(protocol, proxy, retryPolicy); } } - /** - * @deprecated - * This method is deprecated and is not used by YARN internally any more. - * To create a proxy to the RM, use ClientRMProxy#createRMProxy or - * ServerRMProxy#createRMProxy. - * - * Create a proxy to the ResourceManager at the specified address. - * - * @param conf Configuration to generate retry policy - * @param protocol Protocol for the proxy - * @param rmAddress Address of the ResourceManager - * @param Type information of the proxy - * @return Proxy to the RM - * @throws IOException - */ - @Deprecated - public static T createRMProxy(final Configuration conf, - final Class protocol, InetSocketAddress rmAddress) throws IOException { - RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); - T proxy = RMProxy.getProxy(conf, protocol, rmAddress); - LOG.info("Connecting to ResourceManager at " + rmAddress); - return (T) RetryProxy.create(protocol, proxy, retryPolicy); - } - /** * Get a proxy to the RM at the specified address. To be used to create a * RetryProxy. */ @Private - static T getProxy(final Configuration conf, + T getProxy(final Configuration conf, final Class protocol, final InetSocketAddress rmAddress) throws IOException { - return UserGroupInformation.getCurrentUser().doAs( + return user.doAs( new PrivilegedAction() { @Override public T run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java index 13c02af4dd..4c1622523d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java @@ -95,7 +95,7 @@ protected T createRetriableProxy() { // Create proxy that can retry exceptions properly. RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf, false); InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); - T proxy = RMProxy. getProxy(conf, protocol, rmAddress); + T proxy = rmProxy.getProxy(conf, protocol, rmAddress); return (T) RetryProxy.create(protocol, proxy, retryPolicy); } catch (IOException ioe) { LOG.error("Unable to create proxy to the ResourceManager " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java index 700a37ff31..6c31fea7d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java @@ -18,12 +18,26 @@ package org.apache.hadoop.yarn.client; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class TestClientRMProxy { @@ -86,4 +100,99 @@ public void testGetAMRMTokenService() { service.contains(defaultRMAddress)); } } + + /** + * Verify that the RPC layer is always created using the correct UGI from the + * RMProxy. It should always use the UGI from creation in subsequent uses, + * even outside of a doAs. + * + * @throws Exception an Exception occurred + */ + @Test + public void testProxyUserCorrectUGI() throws Exception { + final YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), + "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), + "0.0.0.0"); + conf.setLong(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 2); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 2); + conf.setLong( + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 2); + + // Replace the RPC implementation with one that will capture the current UGI + conf.setClass(YarnConfiguration.IPC_RPC_IMPL, + UGICapturingHadoopYarnProtoRPC.class, YarnRPC.class); + + UserGroupInformation realUser = UserGroupInformation.getCurrentUser(); + UserGroupInformation proxyUser = + UserGroupInformation.createProxyUserForTesting("proxy", realUser, + new String[] {"group1"}); + + // Create the RMProxy using the proxyUser + ApplicationClientProtocol rmProxy = proxyUser.doAs( + new PrivilegedExceptionAction() { + @Override + public ApplicationClientProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationClientProtocol.class); + } + }); + + // It was in a doAs, so the UGI should be correct + assertUGI(); + + // Try to use the RMProxy, which should trigger the RPC again + GetNewApplicationRequest request = + Records.newRecord(GetNewApplicationRequest.class); + UGICapturingHadoopYarnProtoRPC.lastCurrentUser = null; + try { + rmProxy.getNewApplication(request); + } catch (IOException ioe) { + // ignore - RMs are not running so this is expected to fail + } + + // This time it was outside a doAs, but make sure the UGI was still correct + assertUGI(); + } + + private void assertUGI() throws IOException { + UserGroupInformation lastCurrentUser = + UGICapturingHadoopYarnProtoRPC.lastCurrentUser; + assertNotNull(lastCurrentUser); + assertEquals("proxy", lastCurrentUser.getShortUserName()); + Assert.assertEquals(UserGroupInformation.AuthenticationMethod.PROXY, + lastCurrentUser.getAuthenticationMethod()); + assertEquals(UserGroupInformation.getCurrentUser(), + lastCurrentUser.getRealUser()); + // Reset UGICapturingHadoopYarnProtoRPC + UGICapturingHadoopYarnProtoRPC.lastCurrentUser = null; + } + + /** + * Subclass of {@link HadoopYarnProtoRPC} which captures the current UGI in + * a static variable. Used by {@link #testProxyUserCorrectUGI()}. + */ + public static class UGICapturingHadoopYarnProtoRPC + extends HadoopYarnProtoRPC { + + static UserGroupInformation lastCurrentUser = null; + + @Override + public Object getProxy(Class protocol, InetSocketAddress addr, + Configuration conf) { + UserGroupInformation currentUser = null; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + Assert.fail("Unable to get current user\n" + + StringUtils.stringifyException(ioe)); + } + lastCurrentUser = currentUser; + + return super.getProxy(protocol, addr, conf); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 8555fc3a94..3012be382c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -32,7 +32,6 @@ public class ServerRMProxy extends RMProxy { private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); - private static final ServerRMProxy INSTANCE = new ServerRMProxy(); private ServerRMProxy() { super(); @@ -65,7 +64,8 @@ public static T createRMProxy(final Configuration configuration, configuration.getLong( YarnConfiguration.NM_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, rmRetryInterval); - return createRMProxy(configuration, protocol, INSTANCE, + ServerRMProxy serverRMProxy = new ServerRMProxy<>(); + return createRMProxy(configuration, protocol, serverRMProxy, nmRmConnectWait, nmRmRetryInterval); }