From df44178eb6d052b868cfb35adf40316b648f5a6c Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Wed, 14 Jul 2021 17:28:32 +0530 Subject: [PATCH] HADOOP-17795. Provide fallbacks for callqueue.impl and scheduler.impl (#3192) Reviewed-by: Wei-Chiu Chuang Signed-off-by: Takanobu Asanuma --- .../java/org/apache/hadoop/ipc/Server.java | 83 ++++++++++++++++++- .../src/main/resources/core-default.xml | 33 ++++++++ .../conf/TestCommonConfigurationFields.java | 2 + .../hadoop/ipc/TestCallQueueManager.java | 7 +- 4 files changed, 118 insertions(+), 7 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 77d580e227..243b3c776f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -714,6 +714,7 @@ private String getQueueClassPrefix() { return CommonConfigurationKeys.IPC_NAMESPACE + "." + port; } + @Deprecated static Class> getQueueClass( String prefix, Configuration conf) { String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; @@ -721,6 +722,32 @@ static Class> getQueueClass( return CallQueueManager.convertQueueClass(queueClass, Call.class); } + /** + * Return class configured by property 'ipc..callqueue.impl' if it is + * present. If the config is not present, default config (without port) is + * used to derive class i.e 'ipc.callqueue.impl', and derived class is + * returned if class value is present and valid. If default config is also + * not present, default class {@link LinkedBlockingQueue} is returned. + * + * @param namespace Namespace "ipc". + * @param port Server's listener port. + * @param conf Configuration properties. + * @return Class returned based on configuration. + */ + static Class> getQueueClass( + String namespace, int port, Configuration conf) { + String nameWithPort = namespace + "." + port + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; + String nameWithoutPort = namespace + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; + Class queueClass = conf.getClass(nameWithPort, null); + if(queueClass == null) { + queueClass = conf.getClass(nameWithoutPort, LinkedBlockingQueue.class); + } + return CallQueueManager.convertQueueClass(queueClass, Call.class); + } + + @Deprecated static Class getSchedulerClass( String prefix, Configuration conf) { String schedulerKeyname = prefix + "." + CommonConfigurationKeys @@ -746,6 +773,51 @@ static Class getSchedulerClass( return CallQueueManager.convertSchedulerClass(schedulerClass); } + /** + * Return class configured by property 'ipc..scheduler.impl' if it is + * present. If the config is not present, and if property + * 'ipc..callqueue.impl' represents FairCallQueue class, + * return DecayRpcScheduler. If config 'ipc..callqueue.impl' + * does not have value FairCallQueue, default config (without port) is used + * to derive class i.e 'ipc.scheduler.impl'. If default config is also not + * present, default class {@link DefaultRpcScheduler} is returned. + * + * @param namespace Namespace "ipc". + * @param port Server's listener port. + * @param conf Configuration properties. + * @return Class returned based on configuration. + */ + static Class getSchedulerClass( + String namespace, int port, Configuration conf) { + String schedulerKeyNameWithPort = namespace + "." + port + "." + + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY; + String schedulerKeyNameWithoutPort = namespace + "." + + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY; + + Class schedulerClass = conf.getClass(schedulerKeyNameWithPort, null); + // Patch the configuration for legacy fcq configuration that does not have + // a separate scheduler setting + if (schedulerClass == null) { + String queueKeyNameWithPort = namespace + "." + port + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; + Class queueClass = conf.getClass(queueKeyNameWithPort, null); + if (queueClass != null) { + if (queueClass.getCanonicalName().equals( + FairCallQueue.class.getCanonicalName())) { + conf.setClass(schedulerKeyNameWithPort, DecayRpcScheduler.class, + RpcScheduler.class); + } + } + } + + schedulerClass = conf.getClass(schedulerKeyNameWithPort, null); + if (schedulerClass == null) { + schedulerClass = conf.getClass(schedulerKeyNameWithoutPort, + DefaultRpcScheduler.class); + } + return CallQueueManager.convertSchedulerClass(schedulerClass); + } + /* * Refresh the call queue */ @@ -755,8 +827,10 @@ public synchronized void refreshCallQueue(Configuration conf) { this.maxQueueSize = handlerCount * conf.getInt( CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); - callQueue.swapQueue(getSchedulerClass(prefix, conf), - getQueueClass(prefix, conf), maxQueueSize, prefix, conf); + callQueue.swapQueue( + getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), + getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), + maxQueueSize, prefix, conf); callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); } @@ -3107,8 +3181,9 @@ protected Server(String bindAddress, int port, // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); - this.callQueue = new CallQueueManager(getQueueClass(prefix, conf), - getSchedulerClass(prefix, conf), + this.callQueue = new CallQueueManager<>( + getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), + getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf); this.secretManager = (SecretManager) secretManager; diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 048f2b1815..4fa8429cb1 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2514,6 +2514,21 @@ + + ipc.callqueue.impl + java.util.concurrent.LinkedBlockingQueue + + The fully qualified name of a class to use as the implementation + of a call queue. The default implementation is + java.util.concurrent.LinkedBlockingQueue (FIFO queue). + Use org.apache.hadoop.ipc.FairCallQueue for the Fair Call Queue. + This config is fallback config for ipc.[port_number].callqueue.impl. + If call queue is not defined at port level, this default + config is used and hence, this is fallback config to + config with port. + + + ipc.[port_number].scheduler.impl org.apache.hadoop.ipc.DefaultRpcScheduler @@ -2527,6 +2542,24 @@ + + ipc.scheduler.impl + org.apache.hadoop.ipc.DefaultRpcScheduler + + The fully qualified name of a class to use as the + implementation of the scheduler. The default implementation is + org.apache.hadoop.ipc.DefaultRpcScheduler (no-op scheduler) when + not using FairCallQueue. If using FairCallQueue, defaults to + org.apache.hadoop.ipc.DecayRpcScheduler. Use + org.apache.hadoop.ipc.DecayRpcScheduler in conjunction + with the Fair Call Queue. + This config is fallback config for ipc.[port_number].scheduler.impl. + If scheduler queue is not defined at port level, this default + config is used and hence, this is fallback config to + config with port. + + + ipc.[port_number].scheduler.priority.levels 4 diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java index 5678ec7e41..448c47c959 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java @@ -157,7 +157,9 @@ public void initializeMemberVariables() { // FairCallQueue configs that includes dynamic ports in its keys xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable"); xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl"); + xmlPropsToSkipCompare.add("ipc.callqueue.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl"); + xmlPropsToSkipCompare.add("ipc.scheduler.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.priority.levels"); xmlPropsToSkipCompare.add( "ipc.[port_number].faircallqueue.multiplexer.weights"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 38b3fe5681..4a60520a36 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -218,7 +218,8 @@ public void testFcqBackwardCompatibility() throws InterruptedException { // Specify only Fair Call Queue without a scheduler // Ensure the DecayScheduler will be added to avoid breaking. - Class scheduler = Server.getSchedulerClass(ns, + Class scheduler = + Server.getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, 0, conf); assertTrue(scheduler.getCanonicalName(). equals("org.apache.hadoop.ipc.DecayRpcScheduler")); @@ -250,8 +251,8 @@ public void testSchedulerWithoutFCQ() throws InterruptedException { "LinkedBlockingQueue")); manager = new CallQueueManager(queue, - Server.getSchedulerClass(ns, conf), false, - 3, "", conf); + Server.getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, 0, + conf), false, 3, "", conf); // LinkedBlockingQueue with a capacity of 3 can put 3 calls assertCanPut(manager, 3, 3);