HDFS-16756. RBF proxies the client's user by the login user to enable CacheEntry (#4853). Contributed by ZanderXu.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
29c4d8d8f7
commit
4a01fadb94
@ -22,6 +22,7 @@
|
|||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
@ -137,6 +138,8 @@ public class RouterRpcClient {
|
|||||||
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
|
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
|
||||||
private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();
|
private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final boolean enableProxyUser;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a router RPC client to manage remote procedure calls to NNs.
|
* Create a router RPC client to manage remote procedure calls to NNs.
|
||||||
*
|
*
|
||||||
@ -194,6 +197,8 @@ public RouterRpcClient(Configuration conf, Router router,
|
|||||||
this.retryPolicy = RetryPolicies.failoverOnNetworkException(
|
this.retryPolicy = RetryPolicies.failoverOnNetworkException(
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts,
|
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts,
|
||||||
failoverSleepBaseMillis, failoverSleepMaxMillis);
|
failoverSleepBaseMillis, failoverSleepMaxMillis);
|
||||||
|
String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
|
||||||
|
this.enableProxyUser = ipProxyUsers != null && ipProxyUsers.length > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -363,7 +368,7 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
|
|||||||
|
|
||||||
// TODO Add tokens from the federated UGI
|
// TODO Add tokens from the federated UGI
|
||||||
UserGroupInformation connUGI = ugi;
|
UserGroupInformation connUGI = ugi;
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled() || this.enableProxyUser) {
|
||||||
UserGroupInformation routerUser = UserGroupInformation.getLoginUser();
|
UserGroupInformation routerUser = UserGroupInformation.getLoginUser();
|
||||||
connUGI = UserGroupInformation.createProxyUser(
|
connUGI = UserGroupInformation.createProxyUser(
|
||||||
ugi.getUserName(), routerUser);
|
ugi.getUserName(), routerUser);
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
@ -46,10 +47,16 @@ public class TestRouterRetryCache {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
Configuration namenodeConf = new Configuration();
|
UserGroupInformation routerUser = UserGroupInformation.getLoginUser();
|
||||||
namenodeConf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
|
Configuration conf = new Configuration();
|
||||||
cluster = new MiniRouterDFSCluster(true, 1);
|
String adminUser = routerUser.getUserName();
|
||||||
cluster.addNamenodeOverrides(namenodeConf);
|
conf.set("hadoop.proxyuser." + adminUser + ".hosts", "*");
|
||||||
|
conf.set("hadoop.proxyuser." + adminUser + ".groups", "*");
|
||||||
|
conf.set("hadoop.proxyuser.fake_joe.hosts", "*");
|
||||||
|
conf.set("hadoop.proxyuser.fake_joe.groups", "*");
|
||||||
|
conf.set(DFS_NAMENODE_IP_PROXY_USERS, routerUser.getShortUserName());
|
||||||
|
cluster = new MiniRouterDFSCluster(true, 1, conf);
|
||||||
|
cluster.addNamenodeOverrides(conf);
|
||||||
|
|
||||||
// Start NNs and DNs and wait until ready
|
// Start NNs and DNs and wait until ready
|
||||||
cluster.startCluster();
|
cluster.startCluster();
|
||||||
@ -83,7 +90,28 @@ public void teardown() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRetryCache() throws Exception {
|
public void testRetryCacheWithOneLevelProxyUser() throws Exception {
|
||||||
|
internalTestRetryCache(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryCacheWithTwoLevelProxyUser() throws Exception {
|
||||||
|
internalTestRetryCache(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test RetryCache through RBF with proxyUser and non-ProxyUser respectively.
|
||||||
|
*
|
||||||
|
* 1. Start cluster with current user.
|
||||||
|
* 2. Create one test directory by the admin user.
|
||||||
|
* 3. Create one Router FileSystem with one mocked user, one proxyUser or non-ProxyUser.
|
||||||
|
* 4. Try to create one test directory by the router fileSystem.
|
||||||
|
* 5. Try to rename the new test directory to one test destination directory
|
||||||
|
* 6. Then failover the active to the standby
|
||||||
|
* 7. Try to rename the source directory to the destination directory again with the same callId
|
||||||
|
* 8. Try to
|
||||||
|
*/
|
||||||
|
private void internalTestRetryCache(boolean twoLevelProxyUGI) throws Exception {
|
||||||
RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
|
RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
|
||||||
FileSystem routerFS = cluster.getRandomRouter().getFileSystem();
|
FileSystem routerFS = cluster.getRandomRouter().getFileSystem();
|
||||||
Path testDir = new Path("/target-ns0/testdir");
|
Path testDir = new Path("/target-ns0/testdir");
|
||||||
@ -91,11 +119,12 @@ public void testRetryCache() throws Exception {
|
|||||||
routerFS.setPermission(testDir, FsPermission.getDefault());
|
routerFS.setPermission(testDir, FsPermission.getDefault());
|
||||||
|
|
||||||
// Run as fake joe to authorize the test
|
// Run as fake joe to authorize the test
|
||||||
UserGroupInformation joe =
|
UserGroupInformation joe = UserGroupInformation.createUserForTesting("fake_joe",
|
||||||
UserGroupInformation.createUserForTesting("fake_joe",
|
|
||||||
new String[] {"fake_group"});
|
new String[] {"fake_group"});
|
||||||
FileSystem joeFS = joe.doAs(
|
if (twoLevelProxyUGI) {
|
||||||
(PrivilegedExceptionAction<FileSystem>) () ->
|
joe = UserGroupInformation.createProxyUser("fake_proxy_joe", joe);
|
||||||
|
}
|
||||||
|
FileSystem joeFS = joe.doAs((PrivilegedExceptionAction<FileSystem>) () ->
|
||||||
FileSystem.newInstance(routerFS.getUri(), routerFS.getConf()));
|
FileSystem.newInstance(routerFS.getUri(), routerFS.getConf()));
|
||||||
|
|
||||||
Path renameSrc = new Path(testDir, "renameSrc");
|
Path renameSrc = new Path(testDir, "renameSrc");
|
||||||
@ -121,6 +150,15 @@ public void testRetryCache() throws Exception {
|
|||||||
|
|
||||||
Client.setCallIdAndRetryCount(callId, 0, null);
|
Client.setCallIdAndRetryCount(callId, 0, null);
|
||||||
assertTrue(joeFS.rename(renameSrc, renameDst));
|
assertTrue(joeFS.rename(renameSrc, renameDst));
|
||||||
|
|
||||||
|
FileStatus fileStatus = joeFS.getFileStatus(renameDst);
|
||||||
|
if (twoLevelProxyUGI) {
|
||||||
|
assertEquals("fake_proxy_joe", fileStatus.getOwner());
|
||||||
|
} else {
|
||||||
|
assertEquals("fake_joe", fileStatus.getOwner());
|
||||||
|
}
|
||||||
|
|
||||||
|
joeFS.delete(renameDst, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user