diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index 548f1a82f6..fe498c66b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -36,7 +36,7 @@ public class AbstractRouterRpcFairnessPolicyController implements RouterRpcFairnessPolicyController { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class); /** Hash table to hold semaphore for each configured name service. */ @@ -64,6 +64,7 @@ public void releasePermit(String nsId) { @Override public void shutdown() { + LOG.debug("Shutting down router fairness policy controller"); // drain all semaphores for (Semaphore sema: this.permits.values()) { sema.drainPermits(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java new file mode 100644 index 0000000000..f7bc0e8f5a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.ipc.RefreshHandler;
+import org.apache.hadoop.ipc.RefreshResponse;
+
+public class RefreshFairnessPolicyControllerHandler implements RefreshHandler {
+
+ final static public String HANDLER_IDENTIFIER = "RefreshFairnessPolicyController";
+ private final Router router;
+
+ public RefreshFairnessPolicyControllerHandler(Router router) {
+ this.router = router;
+ }
+
+ @Override
+ public RefreshResponse handleRefresh(String identifier, String[] args) {
+ if (HANDLER_IDENTIFIER.equals(identifier)) {
+ return new RefreshResponse(0, router.getRpcServer().refreshFairnessPolicyController());
+ }
+ return new RefreshResponse(-1, "Failed");
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
index deb933ad5a..127470a126 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -45,6 +46,7 @@
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RouterPolicyProvider;
+import org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
@@ -211,6 +213,8 @@ public RouterAdminServer(Configuration conf, Router router)
genericRefreshService, adminServer);
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, adminServer);
+
+ registerRefreshFairnessPolicyControllerHandler();
}
/**
@@ -784,4 +788,9 @@ public void refreshCallQueue() throws IOException {
Configuration configuration = new Configuration();
router.getRpcServer().getServer().refreshCallQueue(configuration);
}
+
+ private void registerRefreshFairnessPolicyControllerHandler() {
+ RefreshRegistry.defaultRegistry()
+ .register(HANDLER_IDENTIFIER, new RefreshFairnessPolicyControllerHandler(router));
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 90d6c347ef..34a2c47c3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -132,7 +132,7 @@ public class RouterRpcClient {
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
/** Fairness manager to control handlers assigned per NS. */
- private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
+ private volatile RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
+import static org.junit.Assert.assertEquals;
+
+public class TestRouterRefreshFairnessPolicyController {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class);
+ private final GenericTestUtils.LogCapturer controllerLog =
+ GenericTestUtils.LogCapturer.captureLogs(AbstractRouterRpcFairnessPolicyController.LOG);
+
+ private StateStoreDFSCluster cluster;
+
+ @BeforeClass
+ public static void setLogLevel() {
+ GenericTestUtils.setLogLevel(AbstractRouterRpcFairnessPolicyController.LOG, Level.DEBUG);
+ }
+
+ @After
+ public void cleanup() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Before
+ public void setupCluster() throws Exception {
+ cluster = new StateStoreDFSCluster(false, 2);
+ Configuration conf = new RouterConfigBuilder().stateStore().rpc().build();
+
+ // Handlers concurrent:ns0 = 3:3
+ conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+ StaticRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
+ conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 9);
+ // Allow metrics
+ conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, true);
+
+ // Datanodes not needed for this test.
+ cluster.setNumDatanodesPerNameservice(0);
+
+ cluster.addRouterOverrides(conf);
+ cluster.startCluster();
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ }
+
+ @Test
+ public void testRefreshNonexistentHandlerClass() {
+ MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
+ routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+ "org.apache.hadoop.hdfs.server.federation.fairness.ThisControllerDoesNotExist");
+ assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
+ routerContext.getRouterRpcClient()
+ .refreshFairnessPolicyController(routerContext.getConf()));
+ }
+
+ @Test
+ public void testRefreshClassDoesNotImplementControllerInterface() {
+ MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
+ routerContext.getConf()
+ .set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, "java.lang.String");
+ assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
+ routerContext.getRouterRpcClient()
+ .refreshFairnessPolicyController(routerContext.getConf()));
+ }
+
+ @Test
+ public void testRefreshSuccessful() {
+ MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
+
+ routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+ StaticRouterRpcFairnessPolicyController.class.getCanonicalName());
+ assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(),
+ routerContext.getRouterRpcClient()
+ .refreshFairnessPolicyController(routerContext.getConf()));
+
+ routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+ NoRouterRpcFairnessPolicyController.class.getCanonicalName());
+ assertEquals(NoRouterRpcFairnessPolicyController.class.getCanonicalName(),
+ routerContext.getRouterRpcClient()
+ .refreshFairnessPolicyController(routerContext.getConf()));
+ }
+
+ @Test
+ public void testConcurrentRefreshRequests() throws InterruptedException {
+ MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
+ RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
+ controllerLog.clearOutput();
+
+ // Spawn 100 concurrent refresh requests
+ Thread[] threads = new Thread[100];
+ for (int i = 0; i < 100; i++) {
+ threads[i] = new Thread(() -> {
+ client.refreshFairnessPolicyController(routerContext.getConf());
+ });
+ }
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ // There should be 100 controller shutdowns. All controllers created should be shut down.
+ assertEquals(100, StringUtils.countMatches(controllerLog.getOutput(),
+ "Shutting down router fairness policy controller"));
+ controllerLog.clearOutput();
+ }
+
+ @Test
+ public void testRefreshStaticChangeHandlers() throws Exception {
+ // Setup and mock
+ MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
+ RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
+ final long sleepTime = 3000;
+ Mockito.doAnswer(invocationOnMock -> {
+ Thread.sleep(sleepTime);
+ return null;
+ }).when(client)
+ .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+
+ // No calls yet
+ assertEquals("{}",
+ routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs());
+ List