HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout (#4597)
This commit is contained in:
parent
a5adc27c99
commit
f80fab2b90
@ -29,6 +29,9 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base fairness policy that implements @RouterRpcFairnessPolicyController.
|
* Base fairness policy that implements @RouterRpcFairnessPolicyController.
|
||||||
* Internally a map of nameservice to Semaphore is used to control permits.
|
* Internally a map of nameservice to Semaphore is used to control permits.
|
||||||
@ -42,15 +45,26 @@ public class AbstractRouterRpcFairnessPolicyController
|
|||||||
/** Hash table to hold semaphore for each configured name service. */
|
/** Hash table to hold semaphore for each configured name service. */
|
||||||
private Map<String, Semaphore> permits;
|
private Map<String, Semaphore> permits;
|
||||||
|
|
||||||
|
private long acquireTimeoutMs = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
|
||||||
|
|
||||||
public void init(Configuration conf) {
|
public void init(Configuration conf) {
|
||||||
this.permits = new HashMap<>();
|
this.permits = new HashMap<>();
|
||||||
|
long timeoutMs = conf.getTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT,
|
||||||
|
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||||
|
if (timeoutMs >= 0) {
|
||||||
|
acquireTimeoutMs = timeoutMs;
|
||||||
|
} else {
|
||||||
|
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
|
||||||
|
"Using default value of : {}ms instead.", timeoutMs,
|
||||||
|
DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean acquirePermit(String nsId) {
|
public boolean acquirePermit(String nsId) {
|
||||||
try {
|
try {
|
||||||
LOG.debug("Taking lock for nameservice {}", nsId);
|
LOG.debug("Taking lock for nameservice {}", nsId);
|
||||||
return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
|
return this.permits.get(nsId).tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.debug("Cannot get a permit for nameservice {}", nsId);
|
LOG.debug("Cannot get a permit for nameservice {}", nsId);
|
||||||
}
|
}
|
||||||
@ -82,15 +96,13 @@ protected int getAvailablePermits(String nsId) {
|
|||||||
@Override
|
@Override
|
||||||
public String getAvailableHandlerOnPerNs() {
|
public String getAvailableHandlerOnPerNs() {
|
||||||
JSONObject json = new JSONObject();
|
JSONObject json = new JSONObject();
|
||||||
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
|
permits.forEach((k, v) -> {
|
||||||
try {
|
try {
|
||||||
String nsId = entry.getKey();
|
json.put(k, v.availablePermits());
|
||||||
int availableHandler = entry.getValue().availablePermits();
|
|
||||||
json.put(nsId, availableHandler);
|
|
||||||
} catch (JSONException e) {
|
} catch (JSONException e) {
|
||||||
LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
|
LOG.warn("Cannot put {} into JSONObject", k, e);
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
return json.toString();
|
return json.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,13 +50,10 @@ public StaticRouterRpcFairnessPolicyController(Configuration conf) {
|
|||||||
init(conf);
|
init(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void init(Configuration conf)
|
public void init(Configuration conf) throws IllegalArgumentException {
|
||||||
throws IllegalArgumentException {
|
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
// Total handlers configured to process all incoming Rpc.
|
// Total handlers configured to process all incoming Rpc.
|
||||||
int handlerCount = conf.getInt(
|
int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
|
||||||
DFS_ROUTER_HANDLER_COUNT_KEY,
|
|
||||||
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
|
|
||||||
|
|
||||||
LOG.info("Handlers available for fairness assignment {} ", handlerCount);
|
LOG.info("Handlers available for fairness assignment {} ", handlerCount);
|
||||||
|
|
||||||
@ -71,8 +68,7 @@ public void init(Configuration conf)
|
|||||||
allConfiguredNS.add(CONCURRENT_NS);
|
allConfiguredNS.add(CONCURRENT_NS);
|
||||||
validateHandlersCount(conf, handlerCount, allConfiguredNS);
|
validateHandlersCount(conf, handlerCount, allConfiguredNS);
|
||||||
for (String nsId : allConfiguredNS) {
|
for (String nsId : allConfiguredNS) {
|
||||||
int dedicatedHandlers =
|
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
|
||||||
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
|
|
||||||
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
|
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
|
||||||
if (dedicatedHandlers > 0) {
|
if (dedicatedHandlers > 0) {
|
||||||
handlerCount -= dedicatedHandlers;
|
handlerCount -= dedicatedHandlers;
|
||||||
@ -86,7 +82,7 @@ public void init(Configuration conf)
|
|||||||
// Assign remaining handlers equally to remaining name services and
|
// Assign remaining handlers equally to remaining name services and
|
||||||
// general pool if applicable.
|
// general pool if applicable.
|
||||||
if (!unassignedNS.isEmpty()) {
|
if (!unassignedNS.isEmpty()) {
|
||||||
LOG.info("Unassigned ns {}", unassignedNS.toString());
|
LOG.info("Unassigned ns {}", unassignedNS);
|
||||||
int handlersPerNS = handlerCount / unassignedNS.size();
|
int handlersPerNS = handlerCount / unassignedNS.size();
|
||||||
LOG.info("Handlers available per ns {}", handlersPerNS);
|
LOG.info("Handlers available per ns {}", handlersPerNS);
|
||||||
for (String nsId : unassignedNS) {
|
for (String nsId : unassignedNS) {
|
||||||
@ -101,24 +97,20 @@ public void init(Configuration conf)
|
|||||||
int existingPermits = getAvailablePermits(CONCURRENT_NS);
|
int existingPermits = getAvailablePermits(CONCURRENT_NS);
|
||||||
if (leftOverHandlers > 0) {
|
if (leftOverHandlers > 0) {
|
||||||
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
|
LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
|
||||||
insertNameServiceWithPermits(CONCURRENT_NS,
|
insertNameServiceWithPermits(CONCURRENT_NS, existingPermits + leftOverHandlers);
|
||||||
existingPermits + leftOverHandlers);
|
|
||||||
}
|
}
|
||||||
LOG.info("Final permit allocation for concurrent ns: {}",
|
LOG.info("Final permit allocation for concurrent ns: {}", getAvailablePermits(CONCURRENT_NS));
|
||||||
getAvailablePermits(CONCURRENT_NS));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void logAssignment(String nsId, int count) {
|
private static void logAssignment(String nsId, int count) {
|
||||||
LOG.info("Assigned {} handlers to nsId {} ",
|
LOG.info("Assigned {} handlers to nsId {} ", count, nsId);
|
||||||
count, nsId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateHandlersCount(Configuration conf, int handlerCount,
|
private void validateHandlersCount(Configuration conf,
|
||||||
Set<String> allConfiguredNS) {
|
int handlerCount, Set<String> allConfiguredNS) {
|
||||||
int totalDedicatedHandlers = 0;
|
int totalDedicatedHandlers = 0;
|
||||||
for (String nsId : allConfiguredNS) {
|
for (String nsId : allConfiguredNS) {
|
||||||
int dedicatedHandlers =
|
int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
|
||||||
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
|
|
||||||
if (dedicatedHandlers > 0) {
|
if (dedicatedHandlers > 0) {
|
||||||
// Total handlers should not be less than sum of dedicated handlers.
|
// Total handlers should not be less than sum of dedicated handlers.
|
||||||
totalDedicatedHandlers += dedicatedHandlers;
|
totalDedicatedHandlers += dedicatedHandlers;
|
||||||
@ -128,8 +120,7 @@ private void validateHandlersCount(Configuration conf, int handlerCount,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (totalDedicatedHandlers > handlerCount) {
|
if (totalDedicatedHandlers > handlerCount) {
|
||||||
String msg = String.format(ERROR_MSG, handlerCount,
|
String msg = String.format(ERROR_MSG, handlerCount, totalDedicatedHandlers);
|
||||||
totalDedicatedHandlers);
|
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
throw new IllegalArgumentException(msg);
|
throw new IllegalArgumentException(msg);
|
||||||
}
|
}
|
||||||
|
@ -354,6 +354,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
|||||||
NoRouterRpcFairnessPolicyController.class;
|
NoRouterRpcFairnessPolicyController.class;
|
||||||
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
|
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
|
||||||
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
|
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
|
||||||
|
public static final String DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT =
|
||||||
|
FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout";
|
||||||
|
public static final long DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT =
|
||||||
|
TimeUnit.SECONDS.toMillis(1);
|
||||||
|
|
||||||
// HDFS Router Federation Rename.
|
// HDFS Router Federation Rename.
|
||||||
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
|
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
|
||||||
|
@ -723,6 +723,14 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.federation.router.fairness.acquire.timeout</name>
|
||||||
|
<value>1s</value>
|
||||||
|
<description>
|
||||||
|
The maximum time to wait for a permit.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.federation.router.federation.rename.bandwidth</name>
|
<name>dfs.federation.router.federation.rename.bandwidth</name>
|
||||||
<value>10</value>
|
<value>10</value>
|
||||||
|
@ -23,10 +23,14 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
|
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
|
||||||
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
|
||||||
@ -83,6 +87,26 @@ public void testHandlerAllocationPreconfigured() {
|
|||||||
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
|
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireTimeout() {
|
||||||
|
Configuration conf = createConf(40);
|
||||||
|
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
|
||||||
|
conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS);
|
||||||
|
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
|
||||||
|
FederationUtil.newFairnessPolicyController(conf);
|
||||||
|
|
||||||
|
// ns1 should have 30 permits allocated
|
||||||
|
for (int i = 0; i < 30; i++) {
|
||||||
|
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
|
||||||
|
}
|
||||||
|
long acquireBeginTimeMs = Time.monotonicNow();
|
||||||
|
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
|
||||||
|
long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs;
|
||||||
|
|
||||||
|
// There are some other operations, so acquireTimeMs >= 100ms.
|
||||||
|
assertTrue(acquireTimeMs >= 100);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAllocationErrorWithZeroHandlers() {
|
public void testAllocationErrorWithZeroHandlers() {
|
||||||
Configuration conf = createConf(0);
|
Configuration conf = createConf(0);
|
||||||
|
Loading…
Reference in New Issue
Block a user