HADOOP-17795. Provide fallbacks for callqueue.impl and scheduler.impl (#3192)

Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
Viraj Jasani 2021-07-14 17:28:32 +05:30 committed by GitHub
parent 87e0000137
commit df44178eb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 118 additions and 7 deletions

View File

@ -714,6 +714,7 @@ private String getQueueClassPrefix() {
return CommonConfigurationKeys.IPC_NAMESPACE + "." + port; return CommonConfigurationKeys.IPC_NAMESPACE + "." + port;
} }
@Deprecated
static Class<? extends BlockingQueue<Call>> getQueueClass( static Class<? extends BlockingQueue<Call>> getQueueClass(
String prefix, Configuration conf) { String prefix, Configuration conf) {
String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
@ -721,6 +722,32 @@ static Class<? extends BlockingQueue<Call>> getQueueClass(
return CallQueueManager.convertQueueClass(queueClass, Call.class); return CallQueueManager.convertQueueClass(queueClass, Call.class);
} }
/**
* Return class configured by property 'ipc.<port>.callqueue.impl' if it is
* present. If the config is not present, default config (without port) is
* used to derive class i.e 'ipc.callqueue.impl', and derived class is
* returned if class value is present and valid. If default config is also
* not present, default class {@link LinkedBlockingQueue} is returned.
*
* @param namespace Namespace "ipc".
* @param port Server's listener port.
* @param conf Configuration properties.
* @return Class returned based on configuration.
*/
static Class<? extends BlockingQueue<Call>> getQueueClass(
String namespace, int port, Configuration conf) {
String nameWithPort = namespace + "." + port + "."
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
String nameWithoutPort = namespace + "."
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(nameWithPort, null);
if(queueClass == null) {
queueClass = conf.getClass(nameWithoutPort, LinkedBlockingQueue.class);
}
return CallQueueManager.convertQueueClass(queueClass, Call.class);
}
@Deprecated
static Class<? extends RpcScheduler> getSchedulerClass( static Class<? extends RpcScheduler> getSchedulerClass(
String prefix, Configuration conf) { String prefix, Configuration conf) {
String schedulerKeyname = prefix + "." + CommonConfigurationKeys String schedulerKeyname = prefix + "." + CommonConfigurationKeys
@ -746,6 +773,51 @@ static Class<? extends RpcScheduler> getSchedulerClass(
return CallQueueManager.convertSchedulerClass(schedulerClass); return CallQueueManager.convertSchedulerClass(schedulerClass);
} }
/**
* Return class configured by property 'ipc.<port>.scheduler.impl' if it is
* present. If the config is not present, and if property
* 'ipc.<port>.callqueue.impl' represents FairCallQueue class,
* return DecayRpcScheduler. If config 'ipc.<port>.callqueue.impl'
* does not have value FairCallQueue, default config (without port) is used
* to derive class i.e 'ipc.scheduler.impl'. If default config is also not
* present, default class {@link DefaultRpcScheduler} is returned.
*
* @param namespace Namespace "ipc".
* @param port Server's listener port.
* @param conf Configuration properties.
* @return Class returned based on configuration.
*/
static Class<? extends RpcScheduler> getSchedulerClass(
String namespace, int port, Configuration conf) {
String schedulerKeyNameWithPort = namespace + "." + port + "."
+ CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY;
String schedulerKeyNameWithoutPort = namespace + "."
+ CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY;
Class<?> schedulerClass = conf.getClass(schedulerKeyNameWithPort, null);
// Patch the configuration for legacy fcq configuration that does not have
// a separate scheduler setting
if (schedulerClass == null) {
String queueKeyNameWithPort = namespace + "." + port + "."
+ CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(queueKeyNameWithPort, null);
if (queueClass != null) {
if (queueClass.getCanonicalName().equals(
FairCallQueue.class.getCanonicalName())) {
conf.setClass(schedulerKeyNameWithPort, DecayRpcScheduler.class,
RpcScheduler.class);
}
}
}
schedulerClass = conf.getClass(schedulerKeyNameWithPort, null);
if (schedulerClass == null) {
schedulerClass = conf.getClass(schedulerKeyNameWithoutPort,
DefaultRpcScheduler.class);
}
return CallQueueManager.convertSchedulerClass(schedulerClass);
}
/* /*
* Refresh the call queue * Refresh the call queue
*/ */
@ -755,8 +827,10 @@ public synchronized void refreshCallQueue(Configuration conf) {
this.maxQueueSize = handlerCount * conf.getInt( this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
callQueue.swapQueue(getSchedulerClass(prefix, conf), callQueue.swapQueue(
getQueueClass(prefix, conf), maxQueueSize, prefix, conf); getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
maxQueueSize, prefix, conf);
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf));
} }
@ -3107,8 +3181,9 @@ protected Server(String bindAddress, int port,
// Setup appropriate callqueue // Setup appropriate callqueue
final String prefix = getQueueClassPrefix(); final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf), this.callQueue = new CallQueueManager<>(
getSchedulerClass(prefix, conf), getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf); getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager; this.secretManager = (SecretManager<TokenIdentifier>) secretManager;

View File

@ -2514,6 +2514,21 @@
</description> </description>
</property> </property>
<property>
<name>ipc.callqueue.impl</name>
<value>java.util.concurrent.LinkedBlockingQueue</value>
<description>
The fully qualified name of a class to use as the implementation
of a call queue. The default implementation is
java.util.concurrent.LinkedBlockingQueue (FIFO queue).
Use org.apache.hadoop.ipc.FairCallQueue for the Fair Call Queue.
This config is fallback config for ipc.[port_number].callqueue.impl.
If call queue is not defined at port level, this default
config is used and hence, this is fallback config to
config with port.
</description>
</property>
<property> <property>
<name>ipc.[port_number].scheduler.impl</name> <name>ipc.[port_number].scheduler.impl</name>
<value>org.apache.hadoop.ipc.DefaultRpcScheduler</value> <value>org.apache.hadoop.ipc.DefaultRpcScheduler</value>
@ -2527,6 +2542,24 @@
</description> </description>
</property> </property>
<property>
<name>ipc.scheduler.impl</name>
<value>org.apache.hadoop.ipc.DefaultRpcScheduler</value>
<description>
The fully qualified name of a class to use as the
implementation of the scheduler. The default implementation is
org.apache.hadoop.ipc.DefaultRpcScheduler (no-op scheduler) when
not using FairCallQueue. If using FairCallQueue, defaults to
org.apache.hadoop.ipc.DecayRpcScheduler. Use
org.apache.hadoop.ipc.DecayRpcScheduler in conjunction
with the Fair Call Queue.
This config is fallback config for ipc.[port_number].scheduler.impl.
If scheduler queue is not defined at port level, this default
config is used and hence, this is fallback config to
config with port.
</description>
</property>
<property> <property>
<name>ipc.[port_number].scheduler.priority.levels</name> <name>ipc.[port_number].scheduler.priority.levels</name>
<value>4</value> <value>4</value>

View File

@ -157,7 +157,9 @@ public void initializeMemberVariables() {
// FairCallQueue configs that includes dynamic ports in its keys // FairCallQueue configs that includes dynamic ports in its keys
xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable"); xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable");
xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl");
xmlPropsToSkipCompare.add("ipc.callqueue.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl");
xmlPropsToSkipCompare.add("ipc.scheduler.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.priority.levels"); xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.priority.levels");
xmlPropsToSkipCompare.add( xmlPropsToSkipCompare.add(
"ipc.[port_number].faircallqueue.multiplexer.weights"); "ipc.[port_number].faircallqueue.multiplexer.weights");

View File

@ -218,7 +218,8 @@ public void testFcqBackwardCompatibility() throws InterruptedException {
// Specify only Fair Call Queue without a scheduler // Specify only Fair Call Queue without a scheduler
// Ensure the DecayScheduler will be added to avoid breaking. // Ensure the DecayScheduler will be added to avoid breaking.
Class<? extends RpcScheduler> scheduler = Server.getSchedulerClass(ns, Class<? extends RpcScheduler> scheduler =
Server.getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, 0,
conf); conf);
assertTrue(scheduler.getCanonicalName(). assertTrue(scheduler.getCanonicalName().
equals("org.apache.hadoop.ipc.DecayRpcScheduler")); equals("org.apache.hadoop.ipc.DecayRpcScheduler"));
@ -250,8 +251,8 @@ public void testSchedulerWithoutFCQ() throws InterruptedException {
"LinkedBlockingQueue")); "LinkedBlockingQueue"));
manager = new CallQueueManager<FakeCall>(queue, manager = new CallQueueManager<FakeCall>(queue,
Server.getSchedulerClass(ns, conf), false, Server.getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, 0,
3, "", conf); conf), false, 3, "", conf);
// LinkedBlockingQueue with a capacity of 3 can put 3 calls // LinkedBlockingQueue with a capacity of 3 can put 3 calls
assertCanPut(manager, 3, 3); assertCanPut(manager, 3, 3);