From f80fab2b902a1b21f942ab3eed09ed2c80a90f48 Mon Sep 17 00:00:00 2001 From: xuzq <15040255127@163.com> Date: Thu, 28 Jul 2022 15:42:53 +0800 Subject: [PATCH] HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout (#4597) --- ...ractRouterRpcFairnessPolicyController.java | 26 +++++++++++----- ...aticRouterRpcFairnessPolicyController.java | 31 +++++++------------ .../federation/router/RBFConfigKeys.java | 4 +++ .../src/main/resources/hdfs-rbf-default.xml | 8 +++++ ...TestRouterRpcFairnessPolicyController.java | 24 ++++++++++++++ 5 files changed, 66 insertions(+), 27 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index fe498c66b7..db917be712 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -29,6 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT; + /** * Base fairness policy that implements @RouterRpcFairnessPolicyController. * Internally a map of nameservice to Semaphore is used to control permits. @@ -42,15 +45,26 @@ public class AbstractRouterRpcFairnessPolicyController /** Hash table to hold semaphore for each configured name service. */ private Map permits; + private long acquireTimeoutMs = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT; + public void init(Configuration conf) { this.permits = new HashMap<>(); + long timeoutMs = conf.getTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, + DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + if (timeoutMs >= 0) { + acquireTimeoutMs = timeoutMs; + } else { + LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " + + "Using default value of : {}ms instead.", timeoutMs, + DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT); + } } @Override public boolean acquirePermit(String nsId) { try { LOG.debug("Taking lock for nameservice {}", nsId); - return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS); + return this.permits.get(nsId).tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { LOG.debug("Cannot get a permit for nameservice {}", nsId); } @@ -82,15 +96,13 @@ protected int getAvailablePermits(String nsId) { @Override public String getAvailableHandlerOnPerNs() { JSONObject json = new JSONObject(); - for (Map.Entry entry : permits.entrySet()) { + permits.forEach((k, v) -> { try { - String nsId = entry.getKey(); - int availableHandler = entry.getValue().availablePermits(); - json.put(nsId, availableHandler); + json.put(k, v.availablePermits()); } catch (JSONException e) { - LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e); + LOG.warn("Cannot put {} into JSONObject", k, e); } - } + }); return json.toString(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java index aa0777fc03..35045bdca8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java @@ -50,13 +50,10 @@ public StaticRouterRpcFairnessPolicyController(Configuration conf) { init(conf); } - public void init(Configuration conf) - throws IllegalArgumentException { + public void init(Configuration conf) throws IllegalArgumentException { super.init(conf); // Total handlers configured to process all incoming Rpc. - int handlerCount = conf.getInt( - DFS_ROUTER_HANDLER_COUNT_KEY, - DFS_ROUTER_HANDLER_COUNT_DEFAULT); + int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); LOG.info("Handlers available for fairness assignment {} ", handlerCount); @@ -71,8 +68,7 @@ public void init(Configuration conf) allConfiguredNS.add(CONCURRENT_NS); validateHandlersCount(conf, handlerCount, allConfiguredNS); for (String nsId : allConfiguredNS) { - int dedicatedHandlers = - conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); + int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); if (dedicatedHandlers > 0) { handlerCount -= dedicatedHandlers; @@ -86,7 +82,7 @@ public void init(Configuration conf) // Assign remaining handlers equally to remaining name services and // general pool if applicable. if (!unassignedNS.isEmpty()) { - LOG.info("Unassigned ns {}", unassignedNS.toString()); + LOG.info("Unassigned ns {}", unassignedNS); int handlersPerNS = handlerCount / unassignedNS.size(); LOG.info("Handlers available per ns {}", handlersPerNS); for (String nsId : unassignedNS) { @@ -101,24 +97,20 @@ public void init(Configuration conf) int existingPermits = getAvailablePermits(CONCURRENT_NS); if (leftOverHandlers > 0) { LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers); - insertNameServiceWithPermits(CONCURRENT_NS, - existingPermits + leftOverHandlers); + insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers); } - LOG.info("Final permit allocation for concurrent ns: {}", - getAvailablePermits(CONCURRENT_NS)); + LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS)); } private static void logAssignment(String nsId, int count) { - LOG.info("Assigned {} handlers to nsId {} ", - count, nsId); + LOG.info("Assigned {} handlers to nsId {} ", count, nsId); } - private void validateHandlersCount(Configuration conf, int handlerCount, - Set allConfiguredNS) { + private void validateHandlersCount(Configuration conf, + int handlerCount, Set allConfiguredNS) { int totalDedicatedHandlers = 0; for (String nsId : allConfiguredNS) { - int dedicatedHandlers = - conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); + int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); if (dedicatedHandlers > 0) { // Total handlers should not be less than sum of dedicated handlers. totalDedicatedHandlers += dedicatedHandlers; @@ -128,8 +120,7 @@ private void validateHandlersCount(Configuration conf, int handlerCount, } } if (totalDedicatedHandlers > handlerCount) { - String msg = String.format(ERROR_MSG, handlerCount, - totalDedicatedHandlers); + String msg = String.format(ERROR_MSG, handlerCount, totalDedicatedHandlers); LOG.error(msg); throw new IllegalArgumentException(msg); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index c0a9e3f294..3b6df41808 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -354,6 +354,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { NoRouterRpcFairnessPolicyController.class; public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX = FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count."; + public static final String DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT = + FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout"; + public static final long DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT = + TimeUnit.SECONDS.toMillis(1); // HDFS Router Federation Rename. public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index fcf6a28475..51d9b8aabd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -723,6 +723,14 @@ + + dfs.federation.router.fairness.acquire.timeout + 1s + + The maximum time to wait for a permit. + + + dfs.federation.router.federation.rename.bandwidth 10 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java index 8307f666b5..1f5770b1dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java @@ -23,10 +23,14 @@ import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.junit.Test; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; @@ -83,6 +87,26 @@ public void testHandlerAllocationPreconfigured() { assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); } + @Test + public void testAcquireTimeout() { + Configuration conf = createConf(40); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30); + conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS); + RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + + // ns1 should have 30 permits allocated + for (int i = 0; i < 30; i++) { + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + } + long acquireBeginTimeMs = Time.monotonicNow(); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); + long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs; + + // There are some other operations, so acquireTimeMs >= 100ms. + assertTrue(acquireTimeMs >= 100); + } + @Test public void testAllocationErrorWithZeroHandlers() { Configuration conf = createConf(0);