From f4e2bfce585d762eaf26096613d135203f080eb3 Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Fri, 11 Jan 2019 10:11:18 -0800 Subject: [PATCH] HDFS-13856. RBF: RouterAdmin should support dfsrouteradmin -refreshRouterArgs command. Contributed by yanghuafeng. --- .../federation/router/RouterAdminServer.java | 26 +- .../hdfs/tools/federation/RouterAdmin.java | 72 +++++ .../src/site/markdown/HDFSRouterFederation.md | 6 + .../router/TestRouterAdminGenericRefresh.java | 252 ++++++++++++++++++ .../src/site/markdown/HDFSCommands.md | 2 + 5 files changed, 357 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminGenericRefresh.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index 18c19e087e..027dd11144 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -23,12 +23,14 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Collection; import java.util.Set; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HDFSPolicyProvider; +import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService; @@ -64,9 +66,15 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; import 
org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.GenericRefreshProtocol; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.ipc.RefreshRegistry; +import org.apache.hadoop.ipc.RefreshResponse; +import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; @@ -81,7 +89,8 @@ * router. It is created, started, and stopped by {@link Router}. */ public class RouterAdminServer extends AbstractService - implements MountTableManager, RouterStateManager, NameserviceManager { + implements MountTableManager, RouterStateManager, NameserviceManager, + GenericRefreshProtocol { private static final Logger LOG = LoggerFactory.getLogger(RouterAdminServer.class); @@ -160,6 +169,15 @@ public RouterAdminServer(Configuration conf, Router router) router.setAdminServerAddress(this.adminAddress); iStateStoreCache = router.getSubclusterResolver() instanceof StateStoreCache; + + GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator = + new GenericRefreshProtocolServerSideTranslatorPB(this); + BlockingService genericRefreshService = + GenericRefreshProtocolProtos.GenericRefreshProtocolService. 
+ newReflectiveBlockingService(genericRefreshXlator); + + DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, + genericRefreshService, adminServer); } /** @@ -487,4 +505,10 @@ public static String getSuperUser() { public static String getSuperGroup(){ return superGroup; } + + @Override // GenericRefreshProtocol + public Collection refresh(String identifier, String[] args) { + // Let the registry handle as needed + return RefreshRegistry.defaultRegistry().dispatch(identifier, args); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index 27c42cd634..37aad88565 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -26,8 +28,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; @@ -61,9 +65,14 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; import 
org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RefreshResponse; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -147,6 +156,8 @@ private String getUsage(String cmd) { return "\t[-getDisabledNameservices]"; } else if (cmd.equals("-refresh")) { return "\t[-refresh]"; + } else if (cmd.equals("-refreshRouterArgs")) { + return "\t[-refreshRouterArgs [arg1..argn]]"; } return getUsage(null); } @@ -213,6 +224,10 @@ private boolean validateMin(String[] argv) { if (argv.length < 3) { return false; } + } else if ("-refreshRouterArgs".equals(cmd)) { + if (argv.length < 2) { + return false; + } } return true; } @@ -310,6 +325,8 @@ public int run(String[] argv) throws Exception { getDisabledNameservices(); } else if ("-refresh".equals(cmd)) { refresh(address); + } else if ("-refreshRouterArgs".equals(cmd)) { + exitCode = genericRefresh(argv, i); } else { throw new IllegalArgumentException("Unknown Command: " + cmd); } @@ -923,6 +940,61 @@ private void getDisabledNameservices() throws IOException { } } + public int genericRefresh(String[] argv, int i) throws IOException { + String hostport = argv[i++]; + String identifier = argv[i++]; + String[] args = Arrays.copyOfRange(argv, i, argv.length); + + // Get the current configuration + Configuration conf = getConf(); + + // for security authorization + // server principal for this call + // should be NN's one. 
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, + conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, "")); + + // Create the client + Class xface = GenericRefreshProtocolPB.class; + InetSocketAddress address = NetUtils.createSocketAddr(hostport); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class); + GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB)RPC.getProxy( + xface, RPC.getProtocolVersion(xface), address, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), 0); + + Collection responses = null; + try (GenericRefreshProtocolClientSideTranslatorPB xlator = + new GenericRefreshProtocolClientSideTranslatorPB(proxy)) { + // Refresh + responses = xlator.refresh(identifier, args); + + int returnCode = 0; + + // Print refresh responses + System.out.println("Refresh Responses:\n"); + for (RefreshResponse response : responses) { + System.out.println(response.toString()); + + if (returnCode == 0 && response.getReturnCode() != 0) { + // This is the first non-zero return code, so we should return this + returnCode = response.getReturnCode(); + } else if (returnCode != 0 && response.getReturnCode() != 0) { + // Then now we have multiple non-zero return codes, + // so we merge them into -1 + returnCode = -1; + } + } + return returnCode; + } finally { + if (responses == null) { + System.out.println("Failed to get response.\n"); + return -1; + } + } + } + /** * Normalize a path for that filesystem. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index 959cd637dd..bcf8fa9c31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -274,6 +274,12 @@ For example, one can disable `ns1`, list it and enable it again: This is useful when decommissioning subclusters or when one subcluster is missbehaving (e.g., low performance or unavailability). +### Router server generic refresh + +Trigger a runtime refresh of the resource specified by \<key\> on \<host:ipc\_port\>. For example, to enable white list checking, we just need to send a refresh command rather than restarting the router server. + + [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refreshRouterArgs <host:ipc_port> <key> [arg1..argn] + Client configuration -------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminGenericRefresh.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminGenericRefresh.java new file mode 100644 index 0000000000..fd68116ad2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminGenericRefresh.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; +import org.apache.hadoop.ipc.RefreshHandler; +import org.apache.hadoop.ipc.RefreshRegistry; +import org.apache.hadoop.ipc.RefreshResponse; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Before all tests, a router is spun up. + * Before each test, mock refresh handlers are created and registered. + * After each test, the mock handlers are unregistered. + * After all tests, the router is spun down. 
+ */ +public class TestRouterAdminGenericRefresh { + private static Router router; + private static RouterAdmin admin; + + private static RefreshHandler firstHandler; + private static RefreshHandler secondHandler; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + + // Build and start a router with admin + RPC + router = new Router(); + Configuration config = new RouterConfigBuilder() + .admin() + .rpc() + .build(); + router.init(config); + router.start(); + admin = new RouterAdmin(config); + } + + @AfterClass + public static void tearDownBeforeClass() throws IOException { + if (router != null) { + router.stop(); + router.close(); + } + } + + @Before + public void setUp() throws Exception { + // Register Handlers, first one just sends an ok response + firstHandler = Mockito.mock(RefreshHandler.class); + Mockito.when(firstHandler.handleRefresh(Mockito.anyString(), + Mockito.any(String[].class))).thenReturn( + RefreshResponse.successResponse()); + RefreshRegistry.defaultRegistry().register("firstHandler", firstHandler); + + // Second handler has conditional response for testing args + secondHandler = Mockito.mock(RefreshHandler.class); + Mockito.when(secondHandler.handleRefresh( + "secondHandler", new String[]{"one", "two"})).thenReturn( + new RefreshResponse(3, "three")); + Mockito.when(secondHandler.handleRefresh( + "secondHandler", new String[]{"one"})).thenReturn( + new RefreshResponse(2, "two")); + RefreshRegistry.defaultRegistry().register("secondHandler", secondHandler); + } + + @After + public void tearDown() throws Exception { + RefreshRegistry.defaultRegistry().unregisterAll("firstHandler"); + RefreshRegistry.defaultRegistry().unregisterAll("secondHandler"); + } + + @Test + public void testInvalidCommand() throws Exception { + String[] args = new String[]{"-refreshRouterArgs", "nn"}; + int exitCode = admin.run(args); + assertEquals("RouterAdmin should fail due to bad args", -1, exitCode); + } + + @Test + public void 
testInvalidIdentifier() throws Exception { + String[] argv = new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), "unregisteredIdentity"}; + int exitCode = admin.run(argv); + assertEquals("RouterAdmin should fail due to no handler registered", + -1, exitCode); + } + + @Test + public void testValidIdentifier() throws Exception { + String[] args = new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), "firstHandler"}; + int exitCode = admin.run(args); + assertEquals("RouterAdmin should succeed", 0, exitCode); + + Mockito.verify(firstHandler).handleRefresh("firstHandler", new String[]{}); + // Second handler was never called + Mockito.verify(secondHandler, Mockito.never()) + .handleRefresh(Mockito.anyString(), Mockito.any(String[].class)); + } + + @Test + public void testVariableArgs() throws Exception { + String[] args = new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), "secondHandler", "one"}; + int exitCode = admin.run(args); + assertEquals("RouterAdmin should return 2", 2, exitCode); + + exitCode = admin.run(new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), + "secondHandler", "one", "two"}); + assertEquals("RouterAdmin should now return 3", 3, exitCode); + + Mockito.verify(secondHandler).handleRefresh( + "secondHandler", new String[]{"one"}); + Mockito.verify(secondHandler).handleRefresh( + "secondHandler", new String[]{"one", "two"}); + } + + @Test + public void testUnregistration() throws Exception { + RefreshRegistry.defaultRegistry().unregisterAll("firstHandler"); + + // And now this should fail + String[] args = new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), "firstHandler"}; + int exitCode = admin.run(args); + assertEquals("RouterAdmin should return -1", -1, exitCode); + } + + @Test + public void testUnregistrationReturnValue() { + RefreshHandler 
mockHandler = Mockito.mock(RefreshHandler.class); + RefreshRegistry.defaultRegistry().register("test", mockHandler); + boolean ret = RefreshRegistry.defaultRegistry(). + unregister("test", mockHandler); + assertTrue(ret); + } + + @Test + public void testMultipleRegistration() throws Exception { + RefreshRegistry.defaultRegistry().register("sharedId", firstHandler); + RefreshRegistry.defaultRegistry().register("sharedId", secondHandler); + + // this should trigger both + String[] args = new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), "sharedId", "one"}; + int exitCode = admin.run(args); + + // -1 because one of the responses is unregistered + assertEquals(-1, exitCode); + + // verify we called both + Mockito.verify(firstHandler).handleRefresh( + "sharedId", new String[]{"one"}); + Mockito.verify(secondHandler).handleRefresh( + "sharedId", new String[]{"one"}); + + RefreshRegistry.defaultRegistry().unregisterAll("sharedId"); + } + + @Test + public void testMultipleReturnCodeMerging() throws Exception { + // Two handlers which return two non-zero values + RefreshHandler handlerOne = Mockito.mock(RefreshHandler.class); + Mockito.when(handlerOne.handleRefresh(Mockito.anyString(), + Mockito.any(String[].class))).thenReturn( + new RefreshResponse(23, "Twenty Three")); + + RefreshHandler handlerTwo = Mockito.mock(RefreshHandler.class); + Mockito.when(handlerTwo.handleRefresh(Mockito.anyString(), + Mockito.any(String[].class))).thenReturn( + new RefreshResponse(10, "Ten")); + + // Then registered to the same ID + RefreshRegistry.defaultRegistry().register("shared", handlerOne); + RefreshRegistry.defaultRegistry().register("shared", handlerTwo); + + // We refresh both + String[] args = new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), "shared"}; + int exitCode = admin.run(args); + + // We get -1 because of our logic for melding non-zero return codes + assertEquals(-1, exitCode); + + // 
Verify we called both + Mockito.verify(handlerOne).handleRefresh("shared", new String[]{}); + Mockito.verify(handlerTwo).handleRefresh("shared", new String[]{}); + + RefreshRegistry.defaultRegistry().unregisterAll("shared"); + } + + @Test + public void testExceptionResultsInNormalError() throws Exception { + // In this test, we ensure that all handlers are called + // even if we throw an exception in one + RefreshHandler exceptionalHandler = Mockito.mock(RefreshHandler.class); + Mockito.when(exceptionalHandler.handleRefresh(Mockito.anyString(), + Mockito.any(String[].class))).thenThrow( + new RuntimeException("Exceptional Handler Throws Exception")); + + RefreshHandler otherExceptionalHandler = Mockito.mock(RefreshHandler.class); + Mockito.when(otherExceptionalHandler.handleRefresh(Mockito.anyString(), + Mockito.any(String[].class))).thenThrow( + new RuntimeException("More Exceptions")); + + RefreshRegistry.defaultRegistry().register("exceptional", + exceptionalHandler); + RefreshRegistry.defaultRegistry().register("exceptional", + otherExceptionalHandler); + + String[] args = new String[]{"-refreshRouterArgs", "localhost:" + + router.getAdminServerAddress().getPort(), "exceptional"}; + int exitCode = admin.run(args); + assertEquals(-1, exitCode); // Exceptions result in a -1 + + Mockito.verify(exceptionalHandler).handleRefresh( + "exceptional", new String[]{}); + Mockito.verify(otherExceptionalHandler).handleRefresh( + "exceptional", new String[]{}); + + RefreshRegistry.defaultRegistry().unregisterAll("exceptional"); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index a967ee4342..421e3881db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -438,6 +438,7 @@ Usage: [-nameservice disable | enable ] [-getDisabledNameservices] [-refresh] + [-refreshRouterArgs 
<host:ipc_port> <key> [arg1..argn]] | COMMAND\_OPTION | Description | |:---- |:---- | @@ -451,6 +452,7 @@ Usage: | `-nameservice` `disable` `enable` *nameservice* | Disable/enable a name service from the federation. If disabled, requests will not go to that name service. | | `-getDisabledNameservices` | Get the name services that are disabled in the federation. | | `-refresh` | Update mount table cache of the connected router. | +| `-refreshRouterArgs` \<host:ipc\_port\> \<key\> [arg1..argn] | Trigger a runtime refresh of the resource specified by \<key\> on \<host:ipc\_port\>. For example, to enable white list checking, we just need to send a refresh command rather than restarting the router server. | The commands for managing Router-based federation. See [Mount table management](../hadoop-hdfs-rbf/HDFSRouterFederation.html#Mount_table_management) for more info.