HDFS-15863. RBF: Validation message to be corrected in FairnessPolicyController. Contributed by Renukaprasad C.

This commit is contained in:
He Xiaoqiao 2021-03-31 13:00:13 +08:00
parent 054e1c5e83
commit da4ceba4aa
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
2 changed files with 34 additions and 19 deletions

View File

@ -42,6 +42,10 @@ public class StaticRouterRpcFairnessPolicyController extends
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class); LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class);
public static final String ERROR_MSG = "Configured handlers "
+ DFS_ROUTER_HANDLER_COUNT_KEY + '='
+ " %d is less than the minimum required handlers %d";
public StaticRouterRpcFairnessPolicyController(Configuration conf) { public StaticRouterRpcFairnessPolicyController(Configuration conf) {
init(conf); init(conf);
} }
@ -65,15 +69,13 @@ public void init(Configuration conf)
// Insert the concurrent nameservice into the set to process together // Insert the concurrent nameservice into the set to process together
allConfiguredNS.add(CONCURRENT_NS); allConfiguredNS.add(CONCURRENT_NS);
validateHandlersCount(conf, handlerCount, allConfiguredNS);
for (String nsId : allConfiguredNS) { for (String nsId : allConfiguredNS) {
int dedicatedHandlers = int dedicatedHandlers =
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers > 0) { if (dedicatedHandlers > 0) {
handlerCount -= dedicatedHandlers; handlerCount -= dedicatedHandlers;
// Total handlers should not be less than sum of dedicated
// handlers.
validateCount(nsId, handlerCount, 0);
insertNameServiceWithPermits(nsId, dedicatedHandlers); insertNameServiceWithPermits(nsId, dedicatedHandlers);
logAssignment(nsId, dedicatedHandlers); logAssignment(nsId, dedicatedHandlers);
} else { } else {
@ -88,8 +90,6 @@ public void init(Configuration conf)
int handlersPerNS = handlerCount / unassignedNS.size(); int handlersPerNS = handlerCount / unassignedNS.size();
LOG.info("Handlers available per ns {}", handlersPerNS); LOG.info("Handlers available per ns {}", handlersPerNS);
for (String nsId : unassignedNS) { for (String nsId : unassignedNS) {
// Each NS should have at least one handler assigned.
validateCount(nsId, handlersPerNS, 1);
insertNameServiceWithPermits(nsId, handlersPerNS); insertNameServiceWithPermits(nsId, handlersPerNS);
logAssignment(nsId, handlersPerNS); logAssignment(nsId, handlersPerNS);
} }
@ -112,15 +112,26 @@ private static void logAssignment(String nsId, int count) {
count, nsId); count, nsId);
} }
private static void validateCount(String nsId, int handlers, int min) throws private void validateHandlersCount(Configuration conf, int handlerCount,
IllegalArgumentException { Set<String> allConfiguredNS) {
if (handlers < min) { int totalDedicatedHandlers = 0;
String msg = for (String nsId : allConfiguredNS) {
"Available handlers " + handlers + int dedicatedHandlers =
" lower than min " + min + conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
" for nsId " + nsId; if (dedicatedHandlers > 0) {
// Total handlers should not be less than sum of dedicated handlers.
totalDedicatedHandlers += dedicatedHandlers;
} else {
// Each NS should have at least one handler assigned.
totalDedicatedHandlers++;
}
}
if (totalDedicatedHandlers > handlerCount) {
String msg = String.format(ERROR_MSG, handlerCount,
totalDedicatedHandlers);
LOG.error(msg); LOG.error(msg);
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
} }
} }

View File

@ -85,30 +85,31 @@ public void testHandlerAllocationPreconfigured() {
@Test @Test
public void testAllocationErrorWithZeroHandlers() { public void testAllocationErrorWithZeroHandlers() {
Configuration conf = createConf(0); Configuration conf = createConf(0);
verifyInstantiationError(conf); verifyInstantiationError(conf, 0, 3);
} }
@Test @Test
public void testAllocationErrorForLowDefaultHandlers() { public void testAllocationErrorForLowDefaultHandlers() {
Configuration conf = createConf(1); Configuration conf = createConf(1);
verifyInstantiationError(conf); verifyInstantiationError(conf, 1, 3);
} }
@Test @Test
public void testAllocationErrorForLowDefaultHandlersPerNS() { public void testAllocationErrorForLowDefaultHandlersPerNS() {
Configuration conf = createConf(1); Configuration conf = createConf(1);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "concurrent", 1); conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "concurrent", 1);
verifyInstantiationError(conf); verifyInstantiationError(conf, 1, 3);
} }
@Test @Test
public void testAllocationErrorForLowPreconfiguredHandlers() { public void testAllocationErrorForLowPreconfiguredHandlers() {
Configuration conf = createConf(1); Configuration conf = createConf(1);
conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 2); conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 2);
verifyInstantiationError(conf); verifyInstantiationError(conf, 1, 4);
} }
private void verifyInstantiationError(Configuration conf) { private void verifyInstantiationError(Configuration conf, int handlerCount,
int totalDedicatedHandlers) {
GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer
.captureLogs(LoggerFactory.getLogger( .captureLogs(LoggerFactory.getLogger(
StaticRouterRpcFairnessPolicyController.class)); StaticRouterRpcFairnessPolicyController.class));
@ -117,8 +118,11 @@ private void verifyInstantiationError(Configuration conf) {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// Ignore the exception as it is expected here. // Ignore the exception as it is expected here.
} }
assertTrue("Should contain error message", String errorMsg = String.format(
logs.getOutput().contains("lower than min")); StaticRouterRpcFairnessPolicyController.ERROR_MSG, handlerCount,
totalDedicatedHandlers);
assertTrue("Should contain error message: " + errorMsg,
logs.getOutput().contains(errorMsg));
} }
private RouterRpcFairnessPolicyController getFairnessPolicyController( private RouterRpcFairnessPolicyController getFairnessPolicyController(