HADOOP-17010. Add queue capacity support for FairCallQueue (#1977)
This commit is contained in:
parent
ab36429559
commit
4202750040
@ -114,6 +114,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
|||||||
"callqueue.overflow.trigger.failover";
|
"callqueue.overflow.trigger.failover";
|
||||||
public static final boolean IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT =
|
public static final boolean IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT =
|
||||||
false;
|
false;
|
||||||
|
/** Callqueue subqueue capacity weights. */
|
||||||
|
public static final String IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY =
|
||||||
|
"callqueue.capacity.weights";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IPC scheduler priority levels.
|
* IPC scheduler priority levels.
|
||||||
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.AbstractQueue;
|
import java.util.AbstractQueue;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
@ -77,8 +78,10 @@ public class CallQueueManager<E extends Schedulable>
|
|||||||
int priorityLevels = parseNumLevels(namespace, conf);
|
int priorityLevels = parseNumLevels(namespace, conf);
|
||||||
this.scheduler = createScheduler(schedulerClass, priorityLevels,
|
this.scheduler = createScheduler(schedulerClass, priorityLevels,
|
||||||
namespace, conf);
|
namespace, conf);
|
||||||
|
int[] capacityWeights = parseCapacityWeights(priorityLevels,
|
||||||
|
namespace, conf);
|
||||||
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
|
BlockingQueue<E> bq = createCallQueueInstance(backingClass,
|
||||||
priorityLevels, maxQueueSize, namespace, conf);
|
priorityLevels, maxQueueSize, namespace, capacityWeights, conf);
|
||||||
this.clientBackOffEnabled = clientBackOffEnabled;
|
this.clientBackOffEnabled = clientBackOffEnabled;
|
||||||
this.serverFailOverEnabled = conf.getBoolean(
|
this.serverFailOverEnabled = conf.getBoolean(
|
||||||
namespace + "." +
|
namespace + "." +
|
||||||
@ -146,13 +149,14 @@ public class CallQueueManager<E extends Schedulable>
|
|||||||
|
|
||||||
private <T extends BlockingQueue<E>> T createCallQueueInstance(
|
private <T extends BlockingQueue<E>> T createCallQueueInstance(
|
||||||
Class<T> theClass, int priorityLevels, int maxLen, String ns,
|
Class<T> theClass, int priorityLevels, int maxLen, String ns,
|
||||||
Configuration conf) {
|
int[] capacityWeights, Configuration conf) {
|
||||||
|
|
||||||
// Used for custom, configurable callqueues
|
// Used for custom, configurable callqueues
|
||||||
try {
|
try {
|
||||||
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
|
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
|
||||||
int.class, String.class, Configuration.class);
|
int.class, String.class, int[].class, Configuration.class);
|
||||||
return ctor.newInstance(priorityLevels, maxLen, ns, conf);
|
return ctor.newInstance(priorityLevels, maxLen, ns,
|
||||||
|
capacityWeights, conf);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
throw e;
|
throw e;
|
||||||
} catch (InvocationTargetException e) {
|
} catch (InvocationTargetException e) {
|
||||||
@ -343,6 +347,47 @@ public class CallQueueManager<E extends Schedulable>
|
|||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the weights of capacity in callqueue and pass the value to
|
||||||
|
* callqueue constructions.
|
||||||
|
*/
|
||||||
|
private static int[] parseCapacityWeights(
|
||||||
|
int priorityLevels, String ns, Configuration conf) {
|
||||||
|
int[] weights = conf.getInts(ns + "." +
|
||||||
|
CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY);
|
||||||
|
if (weights.length == 0) {
|
||||||
|
weights = getDefaultQueueCapacityWeights(priorityLevels);
|
||||||
|
} else if (weights.length != priorityLevels) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY + " must "
|
||||||
|
+ "specify " + priorityLevels + " capacity weights: one for each "
|
||||||
|
+ "priority level");
|
||||||
|
} else {
|
||||||
|
// only allow positive numbers
|
||||||
|
for (int w : weights) {
|
||||||
|
if (w <= 0) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
CommonConfigurationKeys.IPC_CALLQUEUE_CAPACITY_WEIGHTS_KEY +
|
||||||
|
" only takes positive weights. " + w + " capacity weight " +
|
||||||
|
"found");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return weights;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* By default, queue capacity is the same for all priority levels.
|
||||||
|
*
|
||||||
|
* @param priorityLevels number of levels
|
||||||
|
* @return default weights
|
||||||
|
*/
|
||||||
|
public static int[] getDefaultQueueCapacityWeights(int priorityLevels) {
|
||||||
|
int[] weights = new int[priorityLevels];
|
||||||
|
Arrays.fill(weights, 1);
|
||||||
|
return weights;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Replaces active queue with the newly requested one and transfers
|
* Replaces active queue with the newly requested one and transfers
|
||||||
* all calls to the newQ before returning.
|
* all calls to the newQ before returning.
|
||||||
@ -355,8 +400,9 @@ public class CallQueueManager<E extends Schedulable>
|
|||||||
this.scheduler.stop();
|
this.scheduler.stop();
|
||||||
RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
|
RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
|
||||||
ns, conf);
|
ns, conf);
|
||||||
|
int[] capacityWeights = parseCapacityWeights(priorityLevels, ns, conf);
|
||||||
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
|
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
|
||||||
priorityLevels, maxSize, ns, conf);
|
priorityLevels, maxSize, ns, capacityWeights, conf);
|
||||||
|
|
||||||
// Our current queue becomes the old queue
|
// Our current queue becomes the old queue
|
||||||
BlockingQueue<E> oldQ = putRef.get();
|
BlockingQueue<E> oldQ = putRef.get();
|
||||||
|
@ -80,17 +80,27 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|||||||
|
|
||||||
/* Failover if queue is filled up */
|
/* Failover if queue is filled up */
|
||||||
private boolean serverFailOverEnabled;
|
private boolean serverFailOverEnabled;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public FairCallQueue(int priorityLevels, int capacity, String ns,
|
||||||
|
Configuration conf) {
|
||||||
|
this(priorityLevels, capacity, ns,
|
||||||
|
CallQueueManager.getDefaultQueueCapacityWeights(priorityLevels), conf);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a FairCallQueue.
|
* Create a FairCallQueue.
|
||||||
* @param capacity the total size of all sub-queues
|
* @param capacity the total size of all sub-queues
|
||||||
* @param ns the prefix to use for configuration
|
* @param ns the prefix to use for configuration
|
||||||
|
* @param capacityWeights the weights array for capacity allocation
|
||||||
|
* among subqueues
|
||||||
* @param conf the configuration to read from
|
* @param conf the configuration to read from
|
||||||
* Notes: Each sub-queue has a capacity of `capacity / numSubqueues`.
|
* Notes: Each sub-queue has a capacity of `capacity / numSubqueues`.
|
||||||
* The first or the highest priority sub-queue has an excess capacity
|
* The first or the highest priority sub-queue has an excess capacity
|
||||||
* of `capacity % numSubqueues`
|
* of `capacity % numSubqueues`
|
||||||
*/
|
*/
|
||||||
public FairCallQueue(int priorityLevels, int capacity, String ns,
|
public FairCallQueue(int priorityLevels, int capacity, String ns,
|
||||||
Configuration conf) {
|
int[] capacityWeights, Configuration conf) {
|
||||||
if(priorityLevels < 1) {
|
if(priorityLevels < 1) {
|
||||||
throw new IllegalArgumentException("Number of Priority Levels must be " +
|
throw new IllegalArgumentException("Number of Priority Levels must be " +
|
||||||
"at least 1");
|
"at least 1");
|
||||||
@ -101,11 +111,18 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|||||||
|
|
||||||
this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
|
this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
|
||||||
this.overflowedCalls = new ArrayList<AtomicLong>(numQueues);
|
this.overflowedCalls = new ArrayList<AtomicLong>(numQueues);
|
||||||
int queueCapacity = capacity / numQueues;
|
int totalWeights = 0;
|
||||||
int capacityForFirstQueue = queueCapacity + (capacity % numQueues);
|
for (int i = 0; i < capacityWeights.length; i++) {
|
||||||
|
totalWeights += capacityWeights[i];
|
||||||
|
}
|
||||||
|
int residueCapacity = capacity % totalWeights;
|
||||||
|
int unitCapacity = capacity / totalWeights;
|
||||||
|
int queueCapacity;
|
||||||
for(int i=0; i < numQueues; i++) {
|
for(int i=0; i < numQueues; i++) {
|
||||||
|
queueCapacity = unitCapacity * capacityWeights[i];
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
this.queues.add(new LinkedBlockingQueue<E>(capacityForFirstQueue));
|
this.queues.add(new LinkedBlockingQueue<E>(
|
||||||
|
queueCapacity + residueCapacity));
|
||||||
} else {
|
} else {
|
||||||
this.queues.add(new LinkedBlockingQueue<E>(queueCapacity));
|
this.queues.add(new LinkedBlockingQueue<E>(queueCapacity));
|
||||||
}
|
}
|
||||||
|
@ -126,6 +126,7 @@ omitted.
|
|||||||
|:---- |:---- |:---- |:--- |
|
|:---- |:---- |:---- |:--- |
|
||||||
| backoff.enable | General | Whether or not to enable client backoff when a queue is full. | false |
|
| backoff.enable | General | Whether or not to enable client backoff when a queue is full. | false |
|
||||||
| callqueue.impl | General | The fully qualified name of a class to use as the implementation of a call queue. Use `org.apache.hadoop.ipc.FairCallQueue` for the Fair Call Queue. | `java.util.concurrent.LinkedBlockingQueue` (FIFO queue) |
|
| callqueue.impl | General | The fully qualified name of a class to use as the implementation of a call queue. Use `org.apache.hadoop.ipc.FairCallQueue` for the Fair Call Queue. | `java.util.concurrent.LinkedBlockingQueue` (FIFO queue) |
|
||||||
|
| callqueue.capacity.weights | General | The capacity allocation weights among all subqueues. A postive int array whose length is equal to the `scheduler.priority.levels` is expected where each int is the relative weight out of total capacity. i.e. if a queue with capacity weight `w`, its queue capacity is `capacity * w/sum(weights)` |
|
||||||
| scheduler.impl | General | The fully qualified name of a class to use as the implementation of the scheduler. Use `org.apache.hadoop.ipc.DecayRpcScheduler` in conjunction with the Fair Call Queue. | `org.apache.hadoop.ipc.DefaultRpcScheduler` (no-op scheduler) <br/> If using FairCallQueue, defaults to `org.apache.hadoop.ipc.DecayRpcScheduler` |
|
| scheduler.impl | General | The fully qualified name of a class to use as the implementation of the scheduler. Use `org.apache.hadoop.ipc.DecayRpcScheduler` in conjunction with the Fair Call Queue. | `org.apache.hadoop.ipc.DefaultRpcScheduler` (no-op scheduler) <br/> If using FairCallQueue, defaults to `org.apache.hadoop.ipc.DecayRpcScheduler` |
|
||||||
| scheduler.priority.levels | RpcScheduler, CallQueue | How many priority levels to use within the scheduler and call queue. | 4 |
|
| scheduler.priority.levels | RpcScheduler, CallQueue | How many priority levels to use within the scheduler and call queue. | 4 |
|
||||||
| faircallqueue.multiplexer.weights | WeightedRoundRobinMultiplexer | How much weight to give to each priority queue. This should be a comma-separated list of length equal to the number of priority levels. | Weights descend by a factor of 2 (e.g., for 4 levels: `8,4,2,1`) |
|
| faircallqueue.multiplexer.weights | WeightedRoundRobinMultiplexer | How much weight to give to each priority queue. This should be a comma-separated list of length equal to the number of priority levels. | Weights descend by a factor of 2 (e.g., for 4 levels: `8,4,2,1`) |
|
||||||
@ -151,6 +152,10 @@ processed.
|
|||||||
<name>ipc.8020.callqueue.impl</name>
|
<name>ipc.8020.callqueue.impl</name>
|
||||||
<value>org.apache.hadoop.ipc.FairCallQueue</value>
|
<value>org.apache.hadoop.ipc.FairCallQueue</value>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>ipc.8020.callqueue.capacity.weights</name>
|
||||||
|
<value>7,3</value>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>ipc.8020.scheduler.impl</name>
|
<name>ipc.8020.scheduler.impl</name>
|
||||||
<value>org.apache.hadoop.ipc.DecayRpcScheduler</value>
|
<value>org.apache.hadoop.ipc.DecayRpcScheduler</value>
|
||||||
|
@ -104,6 +104,9 @@ public class TestFairCallQueue {
|
|||||||
assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
|
assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
|
||||||
fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns", conf);
|
fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns", conf);
|
||||||
assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
|
assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
|
||||||
|
fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns",
|
||||||
|
new int[]{7, 6, 5, 4, 3, 2, 1}, conf);
|
||||||
|
assertThat(fairCallQueue.remainingCapacity()).isEqualTo(1025);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -157,6 +160,66 @@ public class TestFairCallQueue {
|
|||||||
assertNull(fcq.poll());
|
assertNull(fcq.poll());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueCapacity() {
|
||||||
|
int numQueues = 2;
|
||||||
|
int capacity = 4;
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
List<Schedulable> calls = new ArrayList<>();
|
||||||
|
|
||||||
|
// default weights i.e. all queues share capacity
|
||||||
|
fcq = new FairCallQueue<Schedulable>(numQueues, 4, "ns", conf);
|
||||||
|
FairCallQueue<Schedulable> fcq1 = new FairCallQueue<Schedulable>(
|
||||||
|
numQueues, capacity, "ns", new int[]{3, 1}, conf);
|
||||||
|
|
||||||
|
for (int i=0; i < capacity; i++) {
|
||||||
|
Schedulable call = mockCall("u", i%2);
|
||||||
|
calls.add(call);
|
||||||
|
fcq.add(call);
|
||||||
|
fcq1.add(call);
|
||||||
|
|
||||||
|
call = mockCall("u", (i++)%2);
|
||||||
|
calls.add(call);
|
||||||
|
fcq.add(call);
|
||||||
|
fcq1.add(call);
|
||||||
|
}
|
||||||
|
|
||||||
|
final AtomicInteger currentIndex = new AtomicInteger();
|
||||||
|
fcq.setMultiplexer(new RpcMultiplexer(){
|
||||||
|
@Override
|
||||||
|
public int getAndAdvanceCurrentIndex() {
|
||||||
|
return currentIndex.get();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
fcq1.setMultiplexer(new RpcMultiplexer(){
|
||||||
|
@Override
|
||||||
|
public int getAndAdvanceCurrentIndex() {
|
||||||
|
return currentIndex.get();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// either queue will have two calls
|
||||||
|
// v
|
||||||
|
// 0 2
|
||||||
|
// 1 3
|
||||||
|
currentIndex.set(1);
|
||||||
|
assertSame(calls.get(2), fcq.poll());
|
||||||
|
assertSame(calls.get(3), fcq.poll());
|
||||||
|
assertSame(calls.get(0), fcq.poll());
|
||||||
|
assertSame(calls.get(1), fcq.poll());
|
||||||
|
|
||||||
|
// queues with different number of calls
|
||||||
|
// v
|
||||||
|
// 0 3
|
||||||
|
// 1
|
||||||
|
// 2
|
||||||
|
currentIndex.set(1);
|
||||||
|
assertSame(calls.get(3), fcq1.poll());
|
||||||
|
assertSame(calls.get(0), fcq1.poll());
|
||||||
|
assertSame(calls.get(1), fcq1.poll());
|
||||||
|
assertSame(calls.get(2), fcq1.poll());
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Test
|
@Test
|
||||||
public void testInsertionWithFailover() {
|
public void testInsertionWithFailover() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user