HADOOP-17814. Provide fallbacks for identity/cost providers and backoff enable (#3230)

Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
Viraj Jasani 2021-07-28 22:40:07 +05:30 committed by GitHub
parent f2b6c03fc1
commit e001f8ee39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 154 additions and 2 deletions

View File

@ -283,6 +283,18 @@ private CostProvider parseCostProvider(String ns, Configuration conf) {
ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
CostProvider.class); CostProvider.class);
if (providers.size() < 1) {
String[] nsPort = ns.split("\\.");
if (nsPort.length == 2) {
// Only if ns is split with ".", we can separate namespace and port.
// In the absence of "ipc.<port>.cost-provider.impl" property,
// we look up "ipc.cost-provider.impl" property.
providers = conf.getInstances(
nsPort[0] + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
CostProvider.class);
}
}
if (providers.size() < 1) { if (providers.size() < 1) {
LOG.info("CostProvider not specified, defaulting to DefaultCostProvider"); LOG.info("CostProvider not specified, defaulting to DefaultCostProvider");
return new DefaultCostProvider(); return new DefaultCostProvider();
@ -303,6 +315,18 @@ private IdentityProvider parseIdentityProvider(String ns,
ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY, ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class); IdentityProvider.class);
if (providers.size() < 1) {
String[] nsPort = ns.split("\\.");
if (nsPort.length == 2) {
// Only if ns is split with ".", we can separate namespace and port.
// In the absence of "ipc.<port>.identity-provider.impl" property,
// we look up "ipc.identity-provider.impl" property.
providers = conf.getInstances(
nsPort[0] + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
IdentityProvider.class);
}
}
if (providers.size() < 1) { if (providers.size() < 1) {
LOG.info("IdentityProvider not specified, " + LOG.info("IdentityProvider not specified, " +
"defaulting to UserIdentityProvider"); "defaulting to UserIdentityProvider");

View File

@ -831,12 +831,14 @@ public synchronized void refreshCallQueue(Configuration conf) {
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
maxQueueSize, prefix, conf); maxQueueSize, prefix, conf);
callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); callQueue.setClientBackoffEnabled(getClientBackoffEnable(
CommonConfigurationKeys.IPC_NAMESPACE, port, conf));
} }
/** /**
* Get from config if client backoff is enabled on that port. * Get from config if client backoff is enabled on that port.
*/ */
@Deprecated
static boolean getClientBackoffEnable( static boolean getClientBackoffEnable(
String prefix, Configuration conf) { String prefix, Configuration conf) {
String name = prefix + "." + String name = prefix + "." +
@ -845,6 +847,32 @@ static boolean getClientBackoffEnable(
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
} }
/**
* Return boolean value configured by property 'ipc.<port>.backoff.enable'
* if it is present. If the config is not present, default config
* (without port) is used to derive class i.e 'ipc.backoff.enable',
* and derived value is returned if configured. Otherwise, default value
* {@link CommonConfigurationKeys#IPC_BACKOFF_ENABLE_DEFAULT} is returned.
*
* @param namespace Namespace "ipc".
* @param port Server's listener port.
* @param conf Configuration properties.
* @return Value returned based on configuration.
*/
static boolean getClientBackoffEnable(
String namespace, int port, Configuration conf) {
String name = namespace + "." + port + "." +
CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
boolean valueWithPort = conf.getBoolean(name,
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
if (valueWithPort != CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT) {
return valueWithPort;
}
return conf.getBoolean(namespace + "."
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE,
CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
}
/** A generic call queued for handling. */ /** A generic call queued for handling. */
public static class Call implements Schedulable, public static class Call implements Schedulable,
PrivilegedExceptionAction<Void> { PrivilegedExceptionAction<Void> {
@ -3184,7 +3212,8 @@ protected Server(String bindAddress, int port,
this.callQueue = new CallQueueManager<>( this.callQueue = new CallQueueManager<>(
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf); getClientBackoffEnable(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
maxQueueSize, prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager; this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize = this.authorize =

View File

@ -2504,6 +2504,17 @@
</description> </description>
</property> </property>
<property>
<name>ipc.backoff.enable</name>
<value>false</value>
<description>
This property is used as fallback property in case
"ipc.[port_number].backoff.enable" is not defined.
It determines whether or not to enable client backoff when
a queue is full.
</description>
</property>
<property> <property>
<name>ipc.[port_number].callqueue.impl</name> <name>ipc.[port_number].callqueue.impl</name>
<value>java.util.concurrent.LinkedBlockingQueue</value> <value>java.util.concurrent.LinkedBlockingQueue</value>
@ -2586,6 +2597,17 @@
</description> </description>
</property> </property>
<property>
<name>ipc.identity-provider.impl</name>
<value>org.apache.hadoop.ipc.UserIdentityProvider</value>
<description>
This property is used as fallback property in case
"ipc.[port_number].identity-provider.impl" is not defined.
The identity provider mapping user requests to their identity.
This property applies to DecayRpcScheduler.
</description>
</property>
<property> <property>
<name>ipc.[port_number].cost-provider.impl</name> <name>ipc.[port_number].cost-provider.impl</name>
<value>org.apache.hadoop.ipc.DefaultCostProvider</value> <value>org.apache.hadoop.ipc.DefaultCostProvider</value>
@ -2596,6 +2618,19 @@
</description> </description>
</property> </property>
<property>
<name>ipc.cost-provider.impl</name>
<value>org.apache.hadoop.ipc.DefaultCostProvider</value>
<description>
This property is used as fallback property in case
"ipc.[port_number].cost-provider.impl" is not defined.
The cost provider mapping user requests to their cost. To
enable determination of cost based on processing time, use
org.apache.hadoop.ipc.WeightedTimeCostProvider.
This property applies to DecayRpcScheduler.
</description>
</property>
<property> <property>
<name>ipc.[port_number].decay-scheduler.period-ms</name> <name>ipc.[port_number].decay-scheduler.period-ms</name>
<value>5000</value> <value>5000</value>

View File

@ -156,6 +156,7 @@ public void initializeMemberVariables() {
// FairCallQueue configs that includes dynamic ports in its keys // FairCallQueue configs that includes dynamic ports in its keys
xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable"); xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable");
xmlPropsToSkipCompare.add("ipc.backoff.enable");
xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl");
xmlPropsToSkipCompare.add("ipc.callqueue.impl"); xmlPropsToSkipCompare.add("ipc.callqueue.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl");
@ -164,7 +165,9 @@ public void initializeMemberVariables() {
xmlPropsToSkipCompare.add( xmlPropsToSkipCompare.add(
"ipc.[port_number].faircallqueue.multiplexer.weights"); "ipc.[port_number].faircallqueue.multiplexer.weights");
xmlPropsToSkipCompare.add("ipc.[port_number].identity-provider.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].identity-provider.impl");
xmlPropsToSkipCompare.add("ipc.identity-provider.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].cost-provider.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].cost-provider.impl");
xmlPropsToSkipCompare.add("ipc.cost-provider.impl");
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.period-ms"); xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.period-ms");
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.decay-factor"); xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.decay-factor");
xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.thresholds"); xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.thresholds");

View File

@ -55,6 +55,29 @@ private Schedulable mockCall(String id) {
return mockCall; return mockCall;
} }
private static class TestIdentityProvider implements IdentityProvider {
public String makeIdentity(Schedulable obj) {
UserGroupInformation ugi = obj.getUserGroupInformation();
if (ugi == null) {
return null;
}
return ugi.getShortUserName();
}
}
private static class TestCostProvider implements CostProvider {
@Override
public void init(String namespace, Configuration conf) {
// No-op
}
@Override
public long getCost(ProcessingDetails details) {
return 1;
}
}
private DecayRpcScheduler scheduler; private DecayRpcScheduler scheduler;
@Test(expected=IllegalArgumentException.class) @Test(expected=IllegalArgumentException.class)
@ -83,6 +106,44 @@ public void testParsePeriod() {
assertEquals(1058L, scheduler.getDecayPeriodMillis()); assertEquals(1058L, scheduler.getDecayPeriodMillis());
} }
@Test
@SuppressWarnings("deprecation")
public void testParsePeriodWithPortLessIdentityProvider() {
// By default
scheduler = new DecayRpcScheduler(1, "ipc.50", new Configuration());
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
scheduler.getDecayPeriodMillis());
// Custom
Configuration conf = new Configuration();
conf.setLong("ipc.51." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
1058);
conf.unset("ipc.51." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY);
conf.set("ipc." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
"org.apache.hadoop.ipc.TestDecayRpcScheduler$TestIdentityProvider");
scheduler = new DecayRpcScheduler(1, "ipc.51", conf);
assertEquals(1058L, scheduler.getDecayPeriodMillis());
}
@Test
@SuppressWarnings("deprecation")
public void testParsePeriodWithPortLessCostProvider() {
// By default
scheduler = new DecayRpcScheduler(1, "ipc.52", new Configuration());
assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
scheduler.getDecayPeriodMillis());
// Custom
Configuration conf = new Configuration();
conf.setLong("ipc.52." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
1058);
conf.unset("ipc.52." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY);
conf.set("ipc." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
"org.apache.hadoop.ipc.TestDecayRpcScheduler$TestCostProvider");
scheduler = new DecayRpcScheduler(1, "ipc.52", conf);
assertEquals(1058L, scheduler.getDecayPeriodMillis());
}
@Test @Test
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public void testParseFactor() { public void testParseFactor() {