diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index e90cc5fda4..85315d3e81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS; import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import java.io.EOFException; @@ -137,6 +138,8 @@ public class RouterRpcClient { private Map rejectedPermitsPerNs = new ConcurrentHashMap<>(); private Map acceptedPermitsPerNs = new ConcurrentHashMap<>(); + private final boolean enableProxyUser; + /** * Create a router RPC client to manage remote procedure calls to NNs. * @@ -194,6 +197,8 @@ public RouterRpcClient(Configuration conf, Router router, this.retryPolicy = RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis, failoverSleepMaxMillis); + String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS); + this.enableProxyUser = ipProxyUsers != null && ipProxyUsers.length > 0; } /** @@ -363,7 +368,7 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, // TODO Add tokens from the federated UGI UserGroupInformation connUGI = ugi; - if (UserGroupInformation.isSecurityEnabled()) { + if (UserGroupInformation.isSecurityEnabled() || this.enableProxyUser) { UserGroupInformation routerUser = UserGroupInformation.getLoginUser(); connUGI = UserGroupInformation.createProxyUser( ugi.getUserName(), routerUser); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java index 46a2354979..e498874812 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation.router; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -46,10 +47,16 @@ public class TestRouterRetryCache { @Before public void setup() throws Exception { - Configuration namenodeConf = new Configuration(); - namenodeConf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe"); - cluster = new MiniRouterDFSCluster(true, 1); - cluster.addNamenodeOverrides(namenodeConf); + UserGroupInformation routerUser = UserGroupInformation.getLoginUser(); + Configuration conf = new Configuration(); + String adminUser = routerUser.getUserName(); + conf.set("hadoop.proxyuser." + adminUser + ".hosts", "*"); + conf.set("hadoop.proxyuser." + adminUser + ".groups", "*"); + conf.set("hadoop.proxyuser.fake_joe.hosts", "*"); + conf.set("hadoop.proxyuser.fake_joe.groups", "*"); + conf.set(DFS_NAMENODE_IP_PROXY_USERS, routerUser.getShortUserName()); + cluster = new MiniRouterDFSCluster(true, 1, conf); + cluster.addNamenodeOverrides(conf); // Start NNs and DNs and wait until ready cluster.startCluster(); @@ -83,7 +90,28 @@ public void teardown() throws IOException { } @Test - public void testRetryCache() throws Exception { + public void testRetryCacheWithOneLevelProxyUser() throws Exception { + internalTestRetryCache(false); + } + + @Test + public void testRetryCacheWithTwoLevelProxyUser() throws Exception { + internalTestRetryCache(true); + } + + /** + * Test RetryCache through RBF with proxyUser and non-ProxyUser respectively. + * + * 1. Start cluster with current user. + * 2. Create one test directory by the admin user. + * 3. Create one Router FileSystem with one mocked user, one proxyUser or non-ProxyUser. + * 4. Try to create one test directory by the router fileSystem. + * 5. Try to rename the new test directory to one test destination directory + * 6. Then failover the active to the standby + * 7. Try to rename the source directory to the destination directory again with the same callId + * 8. Try to + */ + private void internalTestRetryCache(boolean twoLevelProxyUGI) throws Exception { RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false); FileSystem routerFS = cluster.getRandomRouter().getFileSystem(); Path testDir = new Path("/target-ns0/testdir"); @@ -91,12 +119,13 @@ public void testRetryCache() throws Exception { routerFS.setPermission(testDir, FsPermission.getDefault()); // Run as fake joe to authorize the test - UserGroupInformation joe = - UserGroupInformation.createUserForTesting("fake_joe", - new String[]{"fake_group"}); - FileSystem joeFS = joe.doAs( - (PrivilegedExceptionAction) () -> - FileSystem.newInstance(routerFS.getUri(), routerFS.getConf())); + UserGroupInformation joe = UserGroupInformation.createUserForTesting("fake_joe", + new String[] {"fake_group"}); + if (twoLevelProxyUGI) { + joe = UserGroupInformation.createProxyUser("fake_proxy_joe", joe); + } + FileSystem joeFS = joe.doAs((PrivilegedExceptionAction) () -> + FileSystem.newInstance(routerFS.getUri(), routerFS.getConf())); Path renameSrc = new Path(testDir, "renameSrc"); Path renameDst = new Path(testDir, "renameDst"); @@ -121,6 +150,15 @@ public void testRetryCache() throws Exception { Client.setCallIdAndRetryCount(callId, 0, null); assertTrue(joeFS.rename(renameSrc, renameDst)); + + FileStatus fileStatus = joeFS.getFileStatus(renameDst); + if (twoLevelProxyUGI) { + assertEquals("fake_proxy_joe", fileStatus.getOwner()); + } else { + assertEquals("fake_joe", fileStatus.getOwner()); + } + + joeFS.delete(renameDst, true); } @Test