diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index f868521304..a6248146f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -48,9 +50,15 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB; +import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; +import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; import org.apache.hadoop.util.Time; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; @@ -97,6 +105,32 @@ public class ConnectionPool { /** The last time a connection was active. */ private volatile long lastActiveTime = 0; + /** Map for the protocols and their protobuf implementations. */ + private final static Map, ProtoImpl> PROTO_MAP = new HashMap<>(); + static { + PROTO_MAP.put(ClientProtocol.class, + new ProtoImpl(ClientNamenodeProtocolPB.class, + ClientNamenodeProtocolTranslatorPB.class)); + PROTO_MAP.put(NamenodeProtocol.class, new ProtoImpl( + NamenodeProtocolPB.class, NamenodeProtocolTranslatorPB.class)); + PROTO_MAP.put(RefreshUserMappingsProtocol.class, + new ProtoImpl(RefreshUserMappingsProtocolPB.class, + RefreshUserMappingsProtocolClientSideTranslatorPB.class)); + PROTO_MAP.put(GetUserMappingsProtocol.class, + new ProtoImpl(GetUserMappingsProtocolPB.class, + GetUserMappingsProtocolClientSideTranslatorPB.class)); + } + + /** Class to store the protocol implementation. */ + private static class ProtoImpl { + private final Class protoPb; + private final Class protoClientPb; + + ProtoImpl(Class pPb, Class pClientPb) { + this.protoPb = pPb; + this.protoClientPb = pClientPb; + } + } protected ConnectionPool(Configuration config, String address, UserGroupInformation user, int minPoolSize, int maxPoolSize, @@ -325,6 +359,7 @@ public ConnectionContext newConnection() throws IOException { * context for a single user/security context. To maximize throughput it is * recommended to use multiple connection per user+server, allowing multiple * writes and reads to be dispatched in parallel. + * @param * * @param conf Configuration for the connection. * @param nnAddress Address of server supporting the ClientProtocol. @@ -334,47 +369,19 @@ public ConnectionContext newConnection() throws IOException { * security context. * @throws IOException If it cannot be created. */ - protected static ConnectionContext newConnection(Configuration conf, - String nnAddress, UserGroupInformation ugi, Class proto) - throws IOException { - ConnectionContext ret; - if (proto == ClientProtocol.class) { - ret = newClientConnection(conf, nnAddress, ugi); - } else if (proto == NamenodeProtocol.class) { - ret = newNamenodeConnection(conf, nnAddress, ugi); - } else { - String msg = "Unsupported protocol for connection to NameNode: " + - ((proto != null) ? proto.getClass().getName() : "null"); + protected static ConnectionContext newConnection(Configuration conf, + String nnAddress, UserGroupInformation ugi, Class proto) + throws IOException { + if (!PROTO_MAP.containsKey(proto)) { + String msg = "Unsupported protocol for connection to NameNode: " + + ((proto != null) ? proto.getClass().getName() : "null"); LOG.error(msg); throw new IllegalStateException(msg); } - return ret; - } + ProtoImpl classes = PROTO_MAP.get(proto); + RPC.setProtocolEngine(conf, classes.protoPb, ProtobufRpcEngine.class); - /** - * Creates a proxy wrapper for a client NN connection. Each proxy contains - * context for a single user/security context. To maximize throughput it is - * recommended to use multiple connection per user+server, allowing multiple - * writes and reads to be dispatched in parallel. - * - * Mostly based on NameNodeProxies#createNonHAProxy() but it needs the - * connection identifier. - * - * @param conf Configuration for the connection. - * @param nnAddress Address of server supporting the ClientProtocol. - * @param ugi User context. - * @return Proxy for the target ClientProtocol that contains the user's - * security context. - * @throws IOException If it cannot be created. - */ - private static ConnectionContext newClientConnection( - Configuration conf, String nnAddress, UserGroupInformation ugi) - throws IOException { - RPC.setProtocolEngine( - conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); - - final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( - conf, + final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, @@ -386,61 +393,32 @@ private static ConnectionContext newClientConnection( SaslRpcServer.init(conf); } InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); - final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); - ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( - ClientNamenodeProtocolPB.class, version, socket, ugi, conf, - factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); - ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy); + final long version = RPC.getProtocolVersion(classes.protoPb); + Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, + conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + T client = newProtoClient(proto, classes, proxy); Text dtService = SecurityUtil.buildTokenService(socket); - ProxyAndInfo clientProxy = - new ProxyAndInfo(client, dtService, socket); + ProxyAndInfo clientProxy = + new ProxyAndInfo(client, dtService, socket); ConnectionContext connection = new ConnectionContext(clientProxy); return connection; } - /** - * Creates a proxy wrapper for a NN connection. Each proxy contains context - * for a single user/security context. To maximize throughput it is - * recommended to use multiple connection per user+server, allowing multiple - * writes and reads to be dispatched in parallel. - * - * @param conf Configuration for the connection. - * @param nnAddress Address of server supporting the ClientProtocol. - * @param ugi User context. - * @return Proxy for the target NamenodeProtocol that contains the user's - * security context. - * @throws IOException If it cannot be created. - */ - private static ConnectionContext newNamenodeConnection( - Configuration conf, String nnAddress, UserGroupInformation ugi) - throws IOException { - RPC.setProtocolEngine( - conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class); - - final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy( - conf, - HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY, - HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, - HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY, - HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT, - HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME); - - SocketFactory factory = SocketFactory.getDefault(); - if (UserGroupInformation.isSecurityEnabled()) { - SaslRpcServer.init(conf); + private static T newProtoClient(Class proto, ProtoImpl classes, + Object proxy) { + try { + Constructor constructor = + classes.protoClientPb.getConstructor(classes.protoPb); + Object o = constructor.newInstance(new Object[] {proxy}); + if (proto.isAssignableFrom(o.getClass())) { + @SuppressWarnings("unchecked") + T client = (T) o; + return client; + } + } catch (Exception e) { + LOG.error(e.getMessage()); } - InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress); - final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class); - NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class, - version, socket, ugi, conf, - factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); - NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy); - Text dtService = SecurityUtil.buildTokenService(socket); - - ProxyAndInfo clientProxy = - new ProxyAndInfo(client, dtService, socket); - ConnectionContext connection = new ConnectionContext(clientProxy); - return connection; + return null; } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 6facd7ef2b..f33d1d36e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -139,15 +139,17 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; +import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,8 +165,8 @@ * the requests to the active * {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}. */ -public class RouterRpcServer extends AbstractService - implements ClientProtocol, NamenodeProtocol, RefreshUserMappingsProtocol { +public class RouterRpcServer extends AbstractService implements ClientProtocol, + NamenodeProtocol, RefreshUserMappingsProtocol, GetUserMappingsProtocol { private static final Logger LOG = LoggerFactory.getLogger(RouterRpcServer.class); @@ -207,6 +209,8 @@ public class RouterRpcServer extends AbstractService private final RouterNamenodeProtocol nnProto; /** ClientProtocol calls. */ private final RouterClientProtocol clientProto; + /** Other protocol calls. */ + private final RouterUserProtocol routerProto; /** Router security manager to handle token operations. */ private RouterSecurityManager securityManager = null; /** Super user credentials that a thread may use. */ @@ -269,6 +273,12 @@ public RouterRpcServer(Configuration configuration, Router router, RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService. newReflectiveBlockingService(refreshUserMappingXlator); + GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = + new GetUserMappingsProtocolServerSideTranslatorPB(this); + BlockingService getUserMappingService = + GetUserMappingsProtocolProtos.GetUserMappingsProtocolService. + newReflectiveBlockingService(getUserMappingXlator); + InetSocketAddress confRpcAddress = conf.getSocketAddr( RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, @@ -297,6 +307,8 @@ public RouterRpcServer(Configuration configuration, Router router, conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer); DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, this.rpcServer); + DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, + getUserMappingService, this.rpcServer); // Set service-level authorization security policy this.serviceAuthEnabled = conf.getBoolean( @@ -346,6 +358,7 @@ public RouterRpcServer(Configuration configuration, Router router, this.quotaCall = new Quota(this.router, this); this.nnProto = new RouterNamenodeProtocol(this); this.clientProto = new RouterClientProtocol(conf, this); + this.routerProto = new RouterUserProtocol(this); } @Override @@ -1706,13 +1719,16 @@ boolean isInvokeConcurrent(final String path) throws IOException { @Override public void refreshUserToGroupsMappings() throws IOException { - LOG.info("Refresh user groups mapping in Router."); - Groups.getUserToGroupsMappingService().refresh(); + routerProto.refreshUserToGroupsMappings(); } @Override public void refreshSuperUserGroupsConfiguration() throws IOException { - LOG.info("Refresh superuser groups configuration in Router."); - ProxyUsers.refreshSuperUserGroupsConfiguration(); + routerProto.refreshSuperUserGroupsConfiguration(); + } + + @Override + public String[] getGroupsForUser(String user) throws IOException { + return routerProto.getGroupsForUser(user); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterUserProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterUserProtocol.java new file mode 100644 index 0000000000..742991e1c2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterUserProtocol.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.security.RefreshUserMappingsProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Module that implements all the RPC calls in + * {@link RefreshUserMappingsProtocol} {@link GetUserMappingsProtocol} in the + * {@link RouterRpcServer}. + */ +public class RouterUserProtocol + implements RefreshUserMappingsProtocol, GetUserMappingsProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterUserProtocol.class); + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + + private final ActiveNamenodeResolver namenodeResolver; + + public RouterUserProtocol(RouterRpcServer server) { + this.rpcServer = server; + this.rpcClient = this.rpcServer.getRPCClient(); + this.namenodeResolver = this.rpcServer.getNamenodeResolver(); + } + + @Override + public void refreshUserToGroupsMappings() throws IOException { + LOG.debug("Refresh user groups mapping in Router."); + rpcServer.checkOperation(OperationCategory.UNCHECKED); + Set nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + Groups.getUserToGroupsMappingService().refresh(); + } else { + RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class, + "refreshUserToGroupsMappings"); + rpcClient.invokeConcurrent(nss, method); + } + } + + @Override + public void refreshSuperUserGroupsConfiguration() throws IOException { + LOG.debug("Refresh superuser groups configuration in Router."); + rpcServer.checkOperation(OperationCategory.UNCHECKED); + Set nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + ProxyUsers.refreshSuperUserGroupsConfiguration(); + } else { + RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class, + "refreshSuperUserGroupsConfiguration"); + rpcClient.invokeConcurrent(nss, method); + } + } + + @Override + public String[] getGroupsForUser(String user) throws IOException { + LOG.debug("Getting groups for user {}", user); + rpcServer.checkOperation(OperationCategory.UNCHECKED); + Set nss = namenodeResolver.getNamespaces(); + if (nss.isEmpty()) { + return UserGroupInformation.createRemoteUser(user).getGroupNames(); + } else { + RemoteMethod method = new RemoteMethod(GetUserMappingsProtocol.class, + "getGroupsForUser", new Class[] {String.class}, user); + Map results = + rpcClient.invokeConcurrent(nss, method, String[].class); + return merge(results, String.class); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index e656e7aecc..a07daefc6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1631,6 +1631,16 @@ public void testCacheAdmin() throws Exception { assertFalse(routerDFS.listCacheDirectives(filter).hasNext()); } + @Test + public void testgetGroupsForUser() throws IOException { + String[] group = new String[] {"bar", "group2"}; + UserGroupInformation.createUserForTesting("user", + new String[] {"bar", "group2"}); + String[] result = + router.getRouter().getRpcServer().getGroupsForUser("user"); + assertArrayEquals(group, result); + } + /** * Check the erasure coding policies in the Router and the Namenode. * @return The erasure coding policies. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRefreshUserMappingsWithRouters.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterUserMappings.java similarity index 86% rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRefreshUserMappingsWithRouters.java rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterUserMappings.java index 597b8c2740..dc7ebbf0d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRefreshUserMappingsWithRouters.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterUserMappings.java @@ -20,14 +20,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.tools.DFSAdmin; -import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.hdfs.tools.GetGroups; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; @@ -36,16 +38,19 @@ import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; import java.net.URL; @@ -54,18 +59,20 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * Tests RefreshUserMappingsProtocol With Routers. + * Test RefreshUserMappingsProtocol and GetUserMappingsProtocol with Routers. */ -public class TestRefreshUserMappingsWithRouters { +public class TestRouterUserMappings { private static final Logger LOG = LoggerFactory.getLogger( - TestRefreshUserMappingsWithRouters.class); + TestRouterUserMappings.class); private MiniRouterDFSCluster cluster; private Router router; @@ -74,7 +81,6 @@ public class TestRefreshUserMappingsWithRouters { private static final String ROUTER_NS = "rbfns"; private static final String HDFS_SCHEMA = "hdfs://"; private static final String LOOPBACK_ADDRESS = "127.0.0.1"; - private static final String HDFS_PREFIX = HDFS_SCHEMA + LOOPBACK_ADDRESS; private String tempResource = null; @@ -111,7 +117,7 @@ public void cacheGroupsAdd(List groups) throws IOException { public void setUp() { conf = new Configuration(false); conf.setClass("hadoop.security.group.mapping", - TestRefreshUserMappingsWithRouters.MockUnixGroupsMapping.class, + TestRouterUserMappings.MockUnixGroupsMapping.class, GroupMappingServiceProvider.class); conf.setLong("hadoop.security.groups.cache.secs", GROUP_REFRESH_TIMEOUT_SEC); @@ -122,23 +128,6 @@ public void setUp() { Groups.getUserToGroupsMappingService(conf); } - /** - * Setup a single router, and return this router's rpc address - * as fs.defaultFS for {@link DFSAdmin}. - * @return router's rpc address - * @throws Exception - */ - private String setUpSingleRouterAndReturnDefaultFs() { - router = new Router(); - conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, - LOOPBACK_ADDRESS + ":" + NetUtils.getFreeSocketPort()); - router.init(conf); - router.start(); - String defaultFs = HDFS_PREFIX + ":" + - router.getRpcServerAddress().getPort(); - return defaultFs; - } - /** * Setup a multi-routers mini dfs cluster with two nameservices * and four routers. @@ -178,28 +167,14 @@ private String setUpMultiRoutersAndReturnDefaultFs() throws Exception { } @Test - public void testRefreshSuperUserGroupsConfigurationWithSingleRouter() - throws Exception { - testRefreshSuperUserGroupsConfigurationInternal( - setUpSingleRouterAndReturnDefaultFs()); - } - - @Test - public void testRefreshSuperUserGroupsConfigurationWithMultiRouters() + public void testRefreshSuperUserGroupsConfiguration() throws Exception { testRefreshSuperUserGroupsConfigurationInternal( setUpMultiRoutersAndReturnDefaultFs()); } @Test - public void testGroupMappingRefreshWithSingleRouter() throws Exception { - testGroupMappingRefreshInternal( - setUpSingleRouterAndReturnDefaultFs()); - } - - - @Test - public void testGroupMappingRefreshWithMultiRouters() throws Exception { + public void testGroupMappingRefresh() throws Exception { testGroupMappingRefreshInternal( setUpMultiRoutersAndReturnDefaultFs()); } @@ -282,6 +257,43 @@ private void testRefreshSuperUserGroupsConfigurationInternal( fail("second auth for " + ugi1.getShortUserName() + " should've succeeded: " + e.getLocalizedMessage()); } + + // get groups + testGroupsForUserCLI(conf, "user"); + testGroupsForUserProtocol(conf, "user"); + } + + /** + * Use the command line to get the groups. + * @param config Configuration containing the default filesystem. + * @param username Username to check. + * @throws Exception If it cannot get the groups. + */ + private void testGroupsForUserCLI(Configuration config, String username) + throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + PrintStream oldOut = System.out; + System.setOut(new PrintStream(out)); + new GetGroups(config).run(new String[]{username}); + assertTrue("Wrong output: " + out, + out.toString().startsWith(username + " : " + username)); + out.reset(); + System.setOut(oldOut); + } + + /** + * Use the GetUserMappingsProtocol protocol to get the groups. + * @param config Configuration containing the default filesystem. + * @param username Username to check. + * @throws IOException If it cannot get the groups. + */ + private void testGroupsForUserProtocol(Configuration config, String username) + throws IOException { + GetUserMappingsProtocol proto = NameNodeProxies.createProxy( + config, FileSystem.getDefaultUri(config), + GetUserMappingsProtocol.class).getProxy(); + String[] groups = proto.getGroupsForUser(username); + assertArrayEquals(new String[] {"user1", "user2"}, groups); } /**