From e001f8ee39f9f2b4e661c3fe3af65f53348bbabf Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Wed, 28 Jul 2021 22:40:07 +0530 Subject: [PATCH] HADOOP-17814. Provide fallbacks for identity/cost providers and backoff enable (#3230) Reviewed-by: Wei-Chiu Chuang Signed-off-by: Takanobu Asanuma --- .../apache/hadoop/ipc/DecayRpcScheduler.java | 24 ++++++++ .../java/org/apache/hadoop/ipc/Server.java | 33 +++++++++- .../src/main/resources/core-default.xml | 35 +++++++++++ .../conf/TestCommonConfigurationFields.java | 3 + .../hadoop/ipc/TestDecayRpcScheduler.java | 61 +++++++++++++++++++ 5 files changed, 154 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 7f6f0c4723..28f1a7fff1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -283,6 +283,18 @@ private CostProvider parseCostProvider(String ns, Configuration conf) { ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, CostProvider.class); + if (providers.size() < 1) { + String[] nsPort = ns.split("\\."); + if (nsPort.length == 2) { + // Only if ns is split with ".", we can separate namespace and port. + // In the absence of "ipc..cost-provider.impl" property, + // we look up "ipc.cost-provider.impl" property. + providers = conf.getInstances( + nsPort[0] + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, + CostProvider.class); + } + } + if (providers.size() < 1) { LOG.info("CostProvider not specified, defaulting to DefaultCostProvider"); return new DefaultCostProvider(); @@ -303,6 +315,18 @@ private IdentityProvider parseIdentityProvider(String ns, ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY, IdentityProvider.class); + if (providers.size() < 1) { + String[] nsPort = ns.split("\\."); + if (nsPort.length == 2) { + // Only if ns is split with ".", we can separate namespace and port. + // In the absence of "ipc..identity-provider.impl" property, + // we look up "ipc.identity-provider.impl" property. + providers = conf.getInstances( + nsPort[0] + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY, + IdentityProvider.class); + } + } + if (providers.size() < 1) { LOG.info("IdentityProvider not specified, " + "defaulting to UserIdentityProvider"); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 8acdc0a99b..1e67203f79 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -831,12 +831,14 @@ public synchronized void refreshCallQueue(Configuration conf) { getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), maxQueueSize, prefix, conf); - callQueue.setClientBackoffEnabled(getClientBackoffEnable(prefix, conf)); + callQueue.setClientBackoffEnabled(getClientBackoffEnable( + CommonConfigurationKeys.IPC_NAMESPACE, port, conf)); } /** * Get from config if client backoff is enabled on that port. */ + @Deprecated static boolean getClientBackoffEnable( String prefix, Configuration conf) { String name = prefix + "." + @@ -845,6 +847,32 @@ static boolean getClientBackoffEnable( CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); } + /** + * Return boolean value configured by property 'ipc..backoff.enable' + * if it is present. If the config is not present, default config + * (without port) is used to derive class i.e 'ipc.backoff.enable', + * and derived value is returned if configured. Otherwise, default value + * {@link CommonConfigurationKeys#IPC_BACKOFF_ENABLE_DEFAULT} is returned. + * + * @param namespace Namespace "ipc". + * @param port Server's listener port. + * @param conf Configuration properties. + * @return Value returned based on configuration. + */ + static boolean getClientBackoffEnable( + String namespace, int port, Configuration conf) { + String name = namespace + "." + port + "." + + CommonConfigurationKeys.IPC_BACKOFF_ENABLE; + boolean valueWithPort = conf.getBoolean(name, + CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); + if (valueWithPort != CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT) { + return valueWithPort; + } + return conf.getBoolean(namespace + "." + + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, + CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT); + } + /** A generic call queued for handling. */ public static class Call implements Schedulable, PrivilegedExceptionAction { @@ -3184,7 +3212,8 @@ protected Server(String bindAddress, int port, this.callQueue = new CallQueueManager<>( getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), - getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf); + getClientBackoffEnable(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), + maxQueueSize, prefix, conf); this.secretManager = (SecretManager) secretManager; this.authorize = diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index c01c2f8f41..dd66da42ae 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2504,6 +2504,17 @@ + + ipc.backoff.enable + false + + This property is used as fallback property in case + "ipc.[port_number].backoff.enable" is not defined. + It determines whether or not to enable client backoff when + a queue is full. + + + ipc.[port_number].callqueue.impl java.util.concurrent.LinkedBlockingQueue @@ -2586,6 +2597,17 @@ + + ipc.identity-provider.impl + org.apache.hadoop.ipc.UserIdentityProvider + + This property is used as fallback property in case + "ipc.[port_number].identity-provider.impl" is not defined. + The identity provider mapping user requests to their identity. + This property applies to DecayRpcScheduler. + + + ipc.[port_number].cost-provider.impl org.apache.hadoop.ipc.DefaultCostProvider @@ -2596,6 +2618,19 @@ + + ipc.cost-provider.impl + org.apache.hadoop.ipc.DefaultCostProvider + + This property is used as fallback property in case + "ipc.[port_number].cost-provider.impl" is not defined. + The cost provider mapping user requests to their cost. To + enable determination of cost based on processing time, use + org.apache.hadoop.ipc.WeightedTimeCostProvider. + This property applies to DecayRpcScheduler. + + + ipc.[port_number].decay-scheduler.period-ms 5000 diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java index 448c47c959..9fcf4a5eb5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java @@ -156,6 +156,7 @@ public void initializeMemberVariables() { // FairCallQueue configs that includes dynamic ports in its keys xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable"); + xmlPropsToSkipCompare.add("ipc.backoff.enable"); xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.impl"); xmlPropsToSkipCompare.add("ipc.callqueue.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].scheduler.impl"); @@ -164,7 +165,9 @@ public void initializeMemberVariables() { xmlPropsToSkipCompare.add( "ipc.[port_number].faircallqueue.multiplexer.weights"); xmlPropsToSkipCompare.add("ipc.[port_number].identity-provider.impl"); + xmlPropsToSkipCompare.add("ipc.identity-provider.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].cost-provider.impl"); + xmlPropsToSkipCompare.add("ipc.cost-provider.impl"); xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.period-ms"); xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.decay-factor"); xmlPropsToSkipCompare.add("ipc.[port_number].decay-scheduler.thresholds"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java index fee43b83da..4ae3de1b15 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java @@ -55,6 +55,29 @@ private Schedulable mockCall(String id) { return mockCall; } + private static class TestIdentityProvider implements IdentityProvider { + public String makeIdentity(Schedulable obj) { + UserGroupInformation ugi = obj.getUserGroupInformation(); + if (ugi == null) { + return null; + } + return ugi.getShortUserName(); + } + } + + private static class TestCostProvider implements CostProvider { + + @Override + public void init(String namespace, Configuration conf) { + // No-op + } + + @Override + public long getCost(ProcessingDetails details) { + return 1; + } + } + private DecayRpcScheduler scheduler; @Test(expected=IllegalArgumentException.class) @@ -83,6 +106,44 @@ public void testParsePeriod() { assertEquals(1058L, scheduler.getDecayPeriodMillis()); } + @Test + @SuppressWarnings("deprecation") + public void testParsePeriodWithPortLessIdentityProvider() { + // By default + scheduler = new DecayRpcScheduler(1, "ipc.50", new Configuration()); + assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT, + scheduler.getDecayPeriodMillis()); + + // Custom + Configuration conf = new Configuration(); + conf.setLong("ipc.51." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, + 1058); + conf.unset("ipc.51." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY); + conf.set("ipc." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY, + "org.apache.hadoop.ipc.TestDecayRpcScheduler$TestIdentityProvider"); + scheduler = new DecayRpcScheduler(1, "ipc.51", conf); + assertEquals(1058L, scheduler.getDecayPeriodMillis()); + } + + @Test + @SuppressWarnings("deprecation") + public void testParsePeriodWithPortLessCostProvider() { + // By default + scheduler = new DecayRpcScheduler(1, "ipc.52", new Configuration()); + assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT, + scheduler.getDecayPeriodMillis()); + + // Custom + Configuration conf = new Configuration(); + conf.setLong("ipc.52." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, + 1058); + conf.unset("ipc.52." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY); + conf.set("ipc." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY, + "org.apache.hadoop.ipc.TestDecayRpcScheduler$TestCostProvider"); + scheduler = new DecayRpcScheduler(1, "ipc.52", conf); + assertEquals(1058L, scheduler.getDecayPeriodMillis()); + } + @Test @SuppressWarnings("deprecation") public void testParseFactor() {