From 4202750040f91f8dcc218ecc7d3ccf81a8e68b2a Mon Sep 17 00:00:00 2001 From: lfengnan Date: Tue, 28 Apr 2020 16:14:55 -0700 Subject: [PATCH] HADOOP-17010. Add queue capacity support for FairCallQueue (#1977) --- .../hadoop/fs/CommonConfigurationKeys.java | 3 + .../apache/hadoop/ipc/CallQueueManager.java | 56 +++++++++++++++-- .../org/apache/hadoop/ipc/FairCallQueue.java | 25 ++++++-- .../src/site/markdown/FairCallQueue.md | 5 ++ .../apache/hadoop/ipc/TestFairCallQueue.java | 63 +++++++++++++++++++ 5 files changed, 143 insertions(+), 9 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 40ddfba3c0..c08af395ad 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -114,6 +114,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { "callqueue.overflow.trigger.failover"; public static final boolean IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT = false; + /** Callqueue subqueue capacity weights. */ + public static final String IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY = + "callqueue.capacity.weights"; /** * IPC scheduler priority levels. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 81b7d34d0d..53ac34b612 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -22,6 +22,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.AbstractQueue; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; @@ -77,8 +78,10 @@ public CallQueueManager(Class> backingClass, int priorityLevels = parseNumLevels(namespace, conf); this.scheduler = createScheduler(schedulerClass, priorityLevels, namespace, conf); + int[] capacityWeights = parseCapacityWeights(priorityLevels, + namespace, conf); BlockingQueue bq = createCallQueueInstance(backingClass, - priorityLevels, maxQueueSize, namespace, conf); + priorityLevels, maxQueueSize, namespace, capacityWeights, conf); this.clientBackOffEnabled = clientBackOffEnabled; this.serverFailOverEnabled = conf.getBoolean( namespace + "." + @@ -146,13 +149,14 @@ private static T createScheduler( private > T createCallQueueInstance( Class theClass, int priorityLevels, int maxLen, String ns, - Configuration conf) { + int[] capacityWeights, Configuration conf) { // Used for custom, configurable callqueues try { Constructor ctor = theClass.getDeclaredConstructor(int.class, - int.class, String.class, Configuration.class); - return ctor.newInstance(priorityLevels, maxLen, ns, conf); + int.class, String.class, int[].class, Configuration.class); + return ctor.newInstance(priorityLevels, maxLen, ns, + capacityWeights, conf); } catch (RuntimeException e) { throw e; } catch (InvocationTargetException e) { @@ -343,6 +347,47 @@ private static int parseNumLevels(String ns, Configuration conf) { return retval; } + /** + * Read the weights of capacity in callqueue and pass the value to + * callqueue constructions. + */ + private static int[] parseCapacityWeights( + int priorityLevels, String ns, Configuration conf) { + int[] weights = conf.getInts(ns + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY); + if (weights.length == 0) { + weights = getDefaultQueueCapacityWeights(priorityLevels); + } else if (weights.length != priorityLevels) { + throw new IllegalArgumentException( + CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY + " must " + + "specify " + priorityLevels + " capacity weights: one for each " + + "priority level"); + } else { + // only allow positive numbers + for (int w : weights) { + if (w <= 0) { + throw new IllegalArgumentException( + CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY + + " only takes positive weights. " + w + " capacity weight " + + "found"); + } + } + } + return weights; + } + + /** + * By default, queue capacity is the same for all priority levels. + * + * @param priorityLevels number of levels + * @return default weights + */ + public static int[] getDefaultQueueCapacityWeights(int priorityLevels) { + int[] weights = new int[priorityLevels]; + Arrays.fill(weights, 1); + return weights; + } + /** * Replaces active queue with the newly requested one and transfers * all calls to the newQ before returning. @@ -355,8 +400,9 @@ public synchronized void swapQueue( this.scheduler.stop(); RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels, ns, conf); + int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf); BlockingQueue newQ = createCallQueueInstance(queueClassToUse, - priorityLevels, maxSize, ns, conf); + priorityLevels, maxSize, ns, capacityWeights, conf); // Our current queue becomes the old queue BlockingQueue oldQ = putRef.get(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index d15a71000b..939149fcc5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -80,17 +80,27 @@ private void signalNotEmpty() { /* Failover if queue is filled up */ private boolean serverFailOverEnabled; + + @VisibleForTesting + public FairCallQueue(int priorityLevels, int capacity, String ns, + Configuration conf) { + this(priorityLevels, capacity, ns, + CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), conf); + } + /** * Create a FairCallQueue. * @param capacity the total size of all sub-queues * @param ns the prefix to use for configuration + * @param capacityWeights the weights array for capacity allocation + * among subqueues * @param conf the configuration to read from * Notes: Each sub-queue has a capacity of `capacity / numSubqueues`. * The first or the highest priority sub-queue has an excess capacity * of `capacity % numSubqueues` */ public FairCallQueue(int priorityLevels, int capacity, String ns, - Configuration conf) { + int[] capacityWeights, Configuration conf) { if(priorityLevels < 1) { throw new IllegalArgumentException("Number of Priority Levels must be " + "at least 1"); @@ -101,11 +111,18 @@ public FairCallQueue(int priorityLevels, int capacity, String ns, this.queues = new ArrayList>(numQueues); this.overflowedCalls = new ArrayList(numQueues); - int queueCapacity = capacity / numQueues; - int capacityForFirstQueue = queueCapacity + (capacity % numQueues); + int totalWeights = 0; + for (int i = 0; i < capacityWeights.length; i++) { + totalWeights += capacityWeights[i]; + } + int residueCapacity = capacity % totalWeights; + int unitCapacity = capacity / totalWeights; + int queueCapacity; for(int i=0; i < numQueues; i++) { + queueCapacity = unitCapacity * capacityWeights[i]; if (i == 0) { - this.queues.add(new LinkedBlockingQueue(capacityForFirstQueue)); + this.queues.add(new LinkedBlockingQueue( + queueCapacity + residueCapacity)); } else { this.queues.add(new LinkedBlockingQueue(queueCapacity)); } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md index 22ac05a53b..887d3053d2 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md @@ -126,6 +126,7 @@ omitted. |:---- |:---- |:---- |:--- | | backoff.enable | General | Whether or not to enable client backoff when a queue is full. | false | | callqueue.impl | General | The fully qualified name of a class to use as the implementation of a call queue. Use `org.apache.hadoop.ipc.FairCallQueue` for the Fair Call Queue. | `java.util.concurrent.LinkedBlockingQueue` (FIFO queue) | +| callqueue.capacity.weights | General | The capacity allocation weights among all subqueues. A postive int array whose length is equal to the `scheduler.priority.levels` is expected where each int is the relative weight out of total capacity. i.e. if a queue with capacity weight `w`, its queue capacity is `capacity * w/sum(weights)` | | scheduler.impl | General | The fully qualified name of a class to use as the implementation of the scheduler. Use `org.apache.hadoop.ipc.DecayRpcScheduler` in conjunction with the Fair Call Queue. | `org.apache.hadoop.ipc.DefaultRpcScheduler` (no-op scheduler)
If using FairCallQueue, defaults to `org.apache.hadoop.ipc.DecayRpcScheduler` | | scheduler.priority.levels | RpcScheduler, CallQueue | How many priority levels to use within the scheduler and call queue. | 4 | | faircallqueue.multiplexer.weights | WeightedRoundRobinMultiplexer | How much weight to give to each priority queue. This should be a comma-separated list of length equal to the number of priority levels. | Weights descend by a factor of 2 (e.g., for 4 levels: `8,4,2,1`) | @@ -151,6 +152,10 @@ processed. ipc.8020.callqueue.impl org.apache.hadoop.ipc.FairCallQueue + + ipc.8020.callqueue.capacity.weights + 7,3 + ipc.8020.scheduler.impl org.apache.hadoop.ipc.DecayRpcScheduler diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java index e6a5f5e564..f478957592 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java @@ -104,6 +104,9 @@ public void testTotalCapacityOfSubQueues() { assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025); fairCallQueue = new FairCallQueue(7, 1025, "ns", conf); assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025); + fairCallQueue = new FairCallQueue(7, 1025, "ns", + new int[]{7, 6, 5, 4, 3, 2, 1}, conf); + assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025); } @Test @@ -157,6 +160,66 @@ public int getAndAdvanceCurrentIndex() { assertNull(fcq.poll()); } + @Test + public void testQueueCapacity() { + int numQueues = 2; + int capacity = 4; + Configuration conf = new Configuration(); + List calls = new ArrayList<>(); + + // default weights i.e. all queues share capacity + fcq = new FairCallQueue(numQueues, 4, "ns", conf); + FairCallQueue fcq1 = new FairCallQueue( + numQueues, capacity, "ns", new int[]{3, 1}, conf); + + for (int i=0; i < capacity; i++) { + Schedulable call = mockCall("u", i%2); + calls.add(call); + fcq.add(call); + fcq1.add(call); + + call = mockCall("u", (i++)%2); + calls.add(call); + fcq.add(call); + fcq1.add(call); + } + + final AtomicInteger currentIndex = new AtomicInteger(); + fcq.setMultiplexer(new RpcMultiplexer(){ + @Override + public int getAndAdvanceCurrentIndex() { + return currentIndex.get(); + } + }); + fcq1.setMultiplexer(new RpcMultiplexer(){ + @Override + public int getAndAdvanceCurrentIndex() { + return currentIndex.get(); + } + }); + + // either queue will have two calls + // v + // 0 2 + // 1 3 + currentIndex.set(1); + assertSame(calls.get(2), fcq.poll()); + assertSame(calls.get(3), fcq.poll()); + assertSame(calls.get(0), fcq.poll()); + assertSame(calls.get(1), fcq.poll()); + + // queues with different number of calls + // v + // 0 3 + // 1 + // 2 + currentIndex.set(1); + assertSame(calls.get(3), fcq1.poll()); + assertSame(calls.get(0), fcq1.poll()); + assertSame(calls.get(1), fcq1.poll()); + assertSame(calls.get(2), fcq1.poll()); + } + @SuppressWarnings("unchecked") @Test public void testInsertionWithFailover() {