diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index fa6f34adaf..2cc96f4c16 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -18,6 +18,9 @@ package org.apache.hadoop.ipc; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT; + import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -63,7 +66,7 @@ static Class convertSchedulerClass( } private volatile boolean clientBackOffEnabled; - private boolean serverFailOverEnabled; + private volatile boolean serverFailOverEnabled; // Atomic refs point to active callQueue // We have two so we can better control swapping @@ -81,18 +84,15 @@ public CallQueueManager(Class> backingClass, namespace, conf); int[] capacityWeights = parseCapacityWeights(priorityLevels, namespace, conf); + this.serverFailOverEnabled = getServerFailOverEnable(namespace, conf); BlockingQueue bq = createCallQueueInstance(backingClass, priorityLevels, maxQueueSize, namespace, capacityWeights, conf); this.clientBackOffEnabled = clientBackOffEnabled; - this.serverFailOverEnabled = conf.getBoolean( - namespace + "." + - CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, - CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT); this.putRef = new AtomicReference>(bq); this.takeRef = new AtomicReference>(bq); LOG.info("Using callQueue: {}, queueCapacity: {}, " + - "scheduler: {}, ipcBackoff: {}.", - backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled); + "scheduler: {}, ipcBackoff: {}, ipcFailOver: {}.", + backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled, serverFailOverEnabled); } @VisibleForTesting // only! @@ -105,6 +105,41 @@ public CallQueueManager(Class> backingClass, this.serverFailOverEnabled = serverFailOverEnabled; } + /** + * Return boolean value configured by property 'ipc..callqueue.overflow.trigger.failover' + * if it is present. If the config is not present, default config + * (without port) is used to derive class i.e 'ipc.callqueue.overflow.trigger.failover', + * and derived value is returned if configured. Otherwise, default value + * {@link CommonConfigurationKeys#IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT} is returned. + * + * @param namespace Namespace "ipc" + "." + Server's listener port. + * @param conf Configuration properties. + * @return Value returned based on configuration. + */ + private boolean getServerFailOverEnable(String namespace, Configuration conf) { + String propertyKey = namespace + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE; + + if (conf.get(propertyKey) != null) { + return conf.getBoolean(propertyKey, + CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT); + } + + String[] nsPort = namespace.split("\\."); + if (nsPort.length == 2) { + // Only if ns is split with ".", we can separate namespace and port. + // In the absence of "ipc..callqueue.overflow.trigger.failover" property, + // we look up "ipc.callqueue.overflow.trigger.failover" property. + return conf.getBoolean(nsPort[0] + "." + + IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT); + } + + // Otherwise return default value. + LOG.info("{} not specified set default value is {}", + IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT); + return CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT; + } + private static T createScheduler( Class theClass, int priorityLevels, String ns, Configuration conf) { // Used for custom, configurable scheduler @@ -155,9 +190,9 @@ private > T createCallQueueInstance( // Used for custom, configurable callqueues try { Constructor ctor = theClass.getDeclaredConstructor(int.class, - int.class, String.class, int[].class, Configuration.class); - return ctor.newInstance(priorityLevels, maxLen, ns, - capacityWeights, conf); + int.class, String.class, int[].class, boolean.class, Configuration.class); + return ctor.newInstance(priorityLevels, maxLen, ns, capacityWeights, + this.serverFailOverEnabled, conf); } catch (RuntimeException e) { throw e; } catch (InvocationTargetException e) { @@ -199,6 +234,20 @@ boolean isClientBackoffEnabled() { return clientBackOffEnabled; } + @VisibleForTesting + public boolean isServerFailOverEnabled() { + return serverFailOverEnabled; + } + + @VisibleForTesting + public boolean isServerFailOverEnabledByQueue() { + BlockingQueue bq = putRef.get(); + if (bq instanceof FairCallQueue) { + return ((FairCallQueue) bq).isServerFailOverEnabled(); + } + return false; + } + // Based on policy to determine back off current call boolean shouldBackOff(Schedulable e) { return scheduler.shouldBackOff(e); @@ -421,6 +470,9 @@ public synchronized void swapQueue( RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels, ns, conf); int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf); + + // Update serverFailOverEnabled. + this.serverFailOverEnabled = getServerFailOverEnable(ns, conf); BlockingQueue newQ = createCallQueueInstance(queueClassToUse, priorityLevels, maxSize, ns, capacityWeights, conf); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index d416e797fb..187a26bac8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -35,7 +35,6 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -90,7 +89,16 @@ private void signalNotEmpty() { public FairCallQueue(int priorityLevels, int capacity, String ns, Configuration conf) { this(priorityLevels, capacity, ns, - CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), conf); + CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), + false, conf); + } + + @VisibleForTesting + public FairCallQueue(int priorityLevels, int capacity, String ns, boolean serverFailOverEnabled, + Configuration conf) { + this(priorityLevels, capacity, ns, + CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), + serverFailOverEnabled, conf); } /** @@ -101,18 +109,21 @@ public FairCallQueue(int priorityLevels, int capacity, String ns, * @param ns the prefix to use for configuration * @param capacityWeights the weights array for capacity allocation * among subqueues + * @param serverFailOverEnabled whether or not to enable callqueue overflow trigger failover + * for stateless servers when RPC call queue is filled * @param conf the configuration to read from * Notes: Each sub-queue has a capacity of `capacity / numSubqueues`. * The first or the highest priority sub-queue has an excess capacity * of `capacity % numSubqueues` */ public FairCallQueue(int priorityLevels, int capacity, String ns, - int[] capacityWeights, Configuration conf) { + int[] capacityWeights, boolean serverFailOverEnabled, Configuration conf) { if(priorityLevels < 1) { throw new IllegalArgumentException("Number of Priority Levels must be " + "at least 1"); } int numQueues = priorityLevels; + this.serverFailOverEnabled = serverFailOverEnabled; LOG.info("FairCallQueue is in use with " + numQueues + " queues with total capacity of " + capacity); @@ -135,10 +146,6 @@ public FairCallQueue(int priorityLevels, int capacity, String ns, } this.overflowedCalls.add(new AtomicLong(0)); } - this.serverFailOverEnabled = conf.getBoolean( - ns + "." + - CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, - CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT); this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf); // Make this the active source of metrics @@ -493,4 +500,9 @@ public long[] getOverflowedCalls() { public void setMultiplexer(RpcMultiplexer newMux) { this.multiplexer = newMux; } + + @VisibleForTesting + public boolean isServerFailOverEnabled() { + return serverFailOverEnabled; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index a594d2be01..73c86c09fc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -3857,6 +3857,16 @@ public void setClientBackoffEnabled(boolean value) { callQueue.setClientBackoffEnabled(value); } + @VisibleForTesting + public boolean isServerFailOverEnabled() { + return callQueue.isServerFailOverEnabled(); + } + + @VisibleForTesting + public boolean isServerFailOverEnabledByQueue() { + return callQueue.isServerFailOverEnabledByQueue(); + } + /** * The maximum size of the rpc call queue of this server. * @return The maximum size of the rpc call queue. diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 24f6ca2784..6c3597a83f 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2580,13 +2580,23 @@ The switch to turn S3A auditing on or off. - callqueue.overflow.trigger.failover + ipc.[port_number].callqueue.overflow.trigger.failover false Enable callqueue overflow trigger failover for stateless servers. + + ipc.callqueue.overflow.trigger.failover + false + + This property is used as fallback property in case + "ipc.[port_number].callqueue.overflow.trigger.failover" is not defined. + It determines whether or not to enable callqueue overflow trigger failover for stateless servers. + + + diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java index f7303fb0f5..b07ba76e8e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java @@ -149,6 +149,10 @@ public void initializeMemberVariables() { xmlPropsToSkipCompare.add("fs.azure.saskey.usecontainersaskeyforallaccess"); xmlPropsToSkipCompare.add("fs.azure.user.agent.prefix"); + // Properties in enable callqueue overflow trigger failover for stateless servers. + xmlPropsToSkipCompare.add("ipc.[port_number].callqueue.overflow.trigger.failover"); + xmlPropsToSkipCompare.add("ipc.callqueue.overflow.trigger.failover"); + // FairCallQueue configs that includes dynamic ports in its keys xmlPropsToSkipCompare.add("ipc.[port_number].backoff.enable"); xmlPropsToSkipCompare.add("ipc.backoff.enable"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 4a60520a36..545ddb40ff 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ipc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -517,4 +518,29 @@ public void testCallQueueOverflowExceptions() throws Exception { verify(queue, times(0)).put(call); verify(queue, times(0)).add(call); } + + @Test + public void testCallQueueOverEnabled() { + // default ipc.callqueue.overflow.trigger.failover' configure false. + String ns = "ipc.8888"; + conf.setBoolean("ipc.callqueue.overflow.trigger.failover", false); + manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false, + 10, ns, conf); + assertFalse(manager.isServerFailOverEnabled()); + assertFalse(manager.isServerFailOverEnabledByQueue()); + + // set ipc.8888.callqueue.overflow.trigger.failover configure true. + conf.setBoolean("ipc.8888.callqueue.overflow.trigger.failover", true); + manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false, + 10, ns, conf); + assertTrue(manager.isServerFailOverEnabled()); + assertTrue(manager.isServerFailOverEnabledByQueue()); + + // set ipc.callqueue.overflow.trigger.failover' configure true. + conf.setBoolean("ipc.callqueue.overflow.trigger.failover", true); + manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false, + 10, ns, conf); + assertTrue(manager.isServerFailOverEnabled()); + assertTrue(manager.isServerFailOverEnabledByQueue()); + } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java index 1fed9a3176..06b65dc4df 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -105,7 +104,7 @@ public void testTotalCapacityOfSubQueues() { fairCallQueue = new FairCallQueue(7, 1025, "ns", conf); assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025); fairCallQueue = new FairCallQueue(7, 1025, "ns", - new int[]{7, 6, 5, 4, 3, 2, 1}, conf); + new int[]{7, 6, 5, 4, 3, 2, 1}, false, conf); assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025); } @@ -170,7 +169,7 @@ public void testQueueCapacity() { // default weights i.e. all queues share capacity fcq = new FairCallQueue(numQueues, 4, "ns", conf); FairCallQueue fcq1 = new FairCallQueue( - numQueues, capacity, "ns", new int[]{1, 3}, conf); + numQueues, capacity, "ns", new int[]{1, 3}, false, conf); for (int i=0; i < capacity; i++) { Schedulable call = mockCall("u", i%2); @@ -221,11 +220,10 @@ public void testInsertionWithFailover() { Configuration conf = new Configuration(); // Config for server to throw StandbyException instead of the // regular RetriableException if call queue is full. - conf.setBoolean( - "ns." + CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE, - true); + // 3 queues, 2 slots each. - fcq = Mockito.spy(new FairCallQueue<>(3, 6, "ns", conf)); + fcq = Mockito.spy(new FairCallQueue<>(3, 6, "ns", + true, conf)); Schedulable p0 = mockCall("a", 0); Schedulable p1 = mockCall("b", 1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java index e21a5a3073..873a524c98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java @@ -88,7 +88,7 @@ public void tearDown() throws IOException { @SuppressWarnings("serial") public static class MockCallQueue extends LinkedBlockingQueue { public MockCallQueue(int levels, int cap, String ns, int[] capacityWeights, - Configuration conf) { + boolean serverFailOverEnabled, Configuration conf) { super(cap); mockQueueConstructions++; } @@ -172,6 +172,6 @@ public void testRefreshCallQueueWithFairCallQueue() throws Exception { // check callQueueSize has changed assertEquals(150 * serviceHandlerCount, rpcServer.getClientRpcServer() .getMaxQueueSize()); - } + } } \ No newline at end of file