diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 50a6b82afd..0291c758e0 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -560,6 +560,10 @@ Release 2.6.0 - UNRELEASED HADOOP-10224. JavaKeyStoreProvider has to protect against corrupting underlying store. (asuresh via tucu) + HADOOP-10282. Create a FairCallQueue: a multi-level call queue which + schedules incoming calls and multiplexes outgoing calls. (Chris Li via + Arpit Agarwal) + BUG FIXES HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java new file mode 100644 index 0000000000..0b56243db5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.lang.ref.WeakReference; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.AbstractQueue; +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.util.MBeans; + +/** + * A queue with multiple levels for each priority. + */ +public class FairCallQueue extends AbstractQueue + implements BlockingQueue { + // Configuration Keys + public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4; + public static final String IPC_CALLQUEUE_PRIORITY_LEVELS_KEY = + "faircallqueue.priority-levels"; + + public static final Log LOG = LogFactory.getLog(FairCallQueue.class); + + /* The queues */ + private final ArrayList> queues; + + /* Read locks */ + private final ReentrantLock takeLock = new ReentrantLock(); + private final Condition notEmpty = takeLock.newCondition(); + private void signalNotEmpty() { + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + /* Scheduler picks which queue to place in */ + private RpcScheduler scheduler; + + /* Multiplexer picks which queue to draw from */ + private RpcMultiplexer multiplexer; + + /* Statistic tracking */ + private final ArrayList overflowedCalls; + + /** + * Create a FairCallQueue. + * @param capacity the maximum size of each sub-queue + * @param ns the prefix to use for configuration + * @param conf the configuration to read from + * Notes: the FairCallQueue has no fixed capacity. Rather, it has a minimum + * capacity of `capacity` and a maximum capacity of `capacity * number_queues` + */ + public FairCallQueue(int capacity, String ns, Configuration conf) { + int numQueues = parseNumQueues(ns, conf); + LOG.info("FairCallQueue is in use with " + numQueues + " queues."); + + this.queues = new ArrayList>(numQueues); + this.overflowedCalls = new ArrayList(numQueues); + + for(int i=0; i < numQueues; i++) { + this.queues.add(new LinkedBlockingQueue(capacity)); + this.overflowedCalls.add(new AtomicLong(0)); + } + + this.scheduler = new DecayRpcScheduler(numQueues, ns, conf); + this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf); + + // Make this the active source of metrics + MetricsProxy mp = MetricsProxy.getInstance(ns); + mp.setDelegate(this); + } + + /** + * Read the number of queues from the configuration. + * This will affect the FairCallQueue's overall capacity. + * @throws IllegalArgumentException on invalid queue count + */ + private static int parseNumQueues(String ns, Configuration conf) { + int retval = conf.getInt(ns + "." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, + IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT); + if(retval < 1) { + throw new IllegalArgumentException("numQueues must be at least 1"); + } + return retval; + } + + /** + * Returns the first non-empty queue with equal or lesser priority + * than startIdx. Wraps around, searching a maximum of N + * queues, where N is this.queues.size(). + * + * @param startIdx the queue number to start searching at + * @return the first non-empty queue with less priority, or null if + * everything was empty + */ + private BlockingQueue getFirstNonEmptyQueue(int startIdx) { + final int numQueues = this.queues.size(); + for(int i=0; i < numQueues; i++) { + int idx = (i + startIdx) % numQueues; // offset and wrap around + BlockingQueue queue = this.queues.get(idx); + if (queue.size() != 0) { + return queue; + } + } + + // All queues were empty + return null; + } + + /* AbstractQueue and BlockingQueue methods */ + + /** + * Put and offer follow the same pattern: + * 1. Get a priorityLevel from the scheduler + * 2. Get the nth sub-queue matching this priorityLevel + * 3. delegate the call to this sub-queue. + * + * But differ in how they handle overflow: + * - Put will move on to the next queue until it lands on the last queue + * - Offer does not attempt other queues on overflow + */ + @Override + public void put(E e) throws InterruptedException { + int priorityLevel = scheduler.getPriorityLevel(e); + + final int numLevels = this.queues.size(); + while (true) { + BlockingQueue q = this.queues.get(priorityLevel); + boolean res = q.offer(e); + if (!res) { + // Update stats + this.overflowedCalls.get(priorityLevel).getAndIncrement(); + + // If we failed to insert, try again on the next level + priorityLevel++; + + if (priorityLevel == numLevels) { + // That was the last one, we will block on put in the last queue + // Delete this line to drop the call + this.queues.get(priorityLevel-1).put(e); + break; + } + } else { + break; + } + } + + + signalNotEmpty(); + } + + @Override + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + int priorityLevel = scheduler.getPriorityLevel(e); + BlockingQueue q = this.queues.get(priorityLevel); + boolean ret = q.offer(e, timeout, unit); + + signalNotEmpty(); + + return ret; + } + + @Override + public boolean offer(E e) { + int priorityLevel = scheduler.getPriorityLevel(e); + BlockingQueue q = this.queues.get(priorityLevel); + boolean ret = q.offer(e); + + signalNotEmpty(); + + return ret; + } + + @Override + public E take() throws InterruptedException { + int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); + + takeLock.lockInterruptibly(); + try { + // Wait while queue is empty + for (;;) { + BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); + if (q != null) { + // Got queue, so return if we can poll out an object + E e = q.poll(); + if (e != null) { + return e; + } + } + + notEmpty.await(); + } + } finally { + takeLock.unlock(); + } + } + + @Override + public E poll(long timeout, TimeUnit unit) + throws InterruptedException { + + int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); + + long nanos = unit.toNanos(timeout); + takeLock.lockInterruptibly(); + try { + for (;;) { + BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); + if (q != null) { + E e = q.poll(); + if (e != null) { + // Escape condition: there might be something available + return e; + } + } + + if (nanos <= 0) { + // Wait has elapsed + return null; + } + + try { + // Now wait on the condition for a bit. If we get + // spuriously awoken we'll re-loop + nanos = notEmpty.awaitNanos(nanos); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to a non-interrupted thread + throw ie; + } + } + } finally { + takeLock.unlock(); + } + } + + /** + * poll() provides no strict consistency: it is possible for poll to return + * null even though an element is in the queue. + */ + @Override + public E poll() { + int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); + + BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); + if (q == null) { + return null; // everything is empty + } + + // Delegate to the sub-queue's poll, which could still return null + return q.poll(); + } + + /** + * Peek, like poll, provides no strict consistency. + */ + @Override + public E peek() { + BlockingQueue q = this.getFirstNonEmptyQueue(0); + if (q == null) { + return null; + } else { + return q.peek(); + } + } + + /** + * Size returns the sum of all sub-queue sizes, so it may be greater than + * capacity. + * Note: size provides no strict consistency, and should not be used to + * control queue IO. + */ + @Override + public int size() { + int size = 0; + for (BlockingQueue q : this.queues) { + size += q.size(); + } + return size; + } + + /** + * Iterator is not implemented, as it is not needed. + */ + @Override + public Iterator iterator() { + throw new NotImplementedException(); + } + + /** + * drainTo defers to each sub-queue. Note that draining from a FairCallQueue + * to another FairCallQueue will likely fail, since the incoming calls + * may be scheduled differently in the new FairCallQueue. Nonetheless this + * method is provided for completeness. + */ + @Override + public int drainTo(Collection c, int maxElements) { + int sum = 0; + for (BlockingQueue q : this.queues) { + sum += q.drainTo(c, maxElements); + } + return sum; + } + + @Override + public int drainTo(Collection c) { + int sum = 0; + for (BlockingQueue q : this.queues) { + sum += q.drainTo(c); + } + return sum; + } + + /** + * Returns maximum remaining capacity. This does not reflect how much you can + * ideally fit in this FairCallQueue, as that would depend on the scheduler's + * decisions. + */ + @Override + public int remainingCapacity() { + int sum = 0; + for (BlockingQueue q : this.queues) { + sum += q.remainingCapacity(); + } + return sum; + } + + /** + * MetricsProxy is a singleton because we may init multiple + * FairCallQueues, but the metrics system cannot unregister beans cleanly. + */ + private static final class MetricsProxy implements FairCallQueueMXBean { + // One singleton per namespace + private static final HashMap INSTANCES = + new HashMap(); + + // Weakref for delegate, so we don't retain it forever if it can be GC'd + private WeakReference delegate; + + // Keep track of how many objects we registered + private int revisionNumber = 0; + + private MetricsProxy(String namespace) { + MBeans.register(namespace, "FairCallQueue", this); + } + + public static synchronized MetricsProxy getInstance(String namespace) { + MetricsProxy mp = INSTANCES.get(namespace); + if (mp == null) { + // We must create one + mp = new MetricsProxy(namespace); + INSTANCES.put(namespace, mp); + } + return mp; + } + + public void setDelegate(FairCallQueue obj) { + this.delegate = new WeakReference(obj); + this.revisionNumber++; + } + + @Override + public int[] getQueueSizes() { + FairCallQueue obj = this.delegate.get(); + if (obj == null) { + return new int[]{}; + } + + return obj.getQueueSizes(); + } + + @Override + public long[] getOverflowedCalls() { + FairCallQueue obj = this.delegate.get(); + if (obj == null) { + return new long[]{}; + } + + return obj.getOverflowedCalls(); + } + + @Override public int getRevision() { + return revisionNumber; + } + } + + // FairCallQueueMXBean + public int[] getQueueSizes() { + int numQueues = queues.size(); + int[] sizes = new int[numQueues]; + for (int i=0; i < numQueues; i++) { + sizes[i] = queues.get(i).size(); + } + return sizes; + } + + public long[] getOverflowedCalls() { + int numQueues = queues.size(); + long[] calls = new long[numQueues]; + for (int i=0; i < numQueues; i++) { + calls[i] = overflowedCalls.get(i).get(); + } + return calls; + } + + // For testing + @VisibleForTesting + public void setScheduler(RpcScheduler newScheduler) { + this.scheduler = newScheduler; + } + + @VisibleForTesting + public void setMultiplexer(RpcMultiplexer newMux) { + this.multiplexer = newMux; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueueMXBean.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueueMXBean.java new file mode 100644 index 0000000000..bd68ecb1ad --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueueMXBean.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +public interface FairCallQueueMXBean { + // Get the size of each subqueue, the index corrosponding to the priority + // level. + int[] getQueueSizes(); + long[] getOverflowedCalls(); + int getRevision(); +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcMultiplexer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcMultiplexer.java new file mode 100644 index 0000000000..01eecc55cf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcMultiplexer.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +/** + * Implement this interface to make a pluggable multiplexer in the + * FairCallQueue. + */ +public interface RpcMultiplexer { + /** + * Should get current index and optionally perform whatever is needed + * to prepare the next index. + * @return current index + */ + int getAndAdvanceCurrentIndex(); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java index 497ca75746..cfda94734c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java @@ -38,7 +38,7 @@ * There may be more reads than the minimum due to race conditions. This is * allowed by design for performance reasons. */ -public class WeightedRoundRobinMultiplexer { +public class WeightedRoundRobinMultiplexer implements RpcMultiplexer { // Config keys public static final String IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY = "faircallqueue.multiplexer.weights"; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java new file mode 100644 index 0000000000..acbedc50f9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java @@ -0,0 +1,392 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import junit.framework.TestCase; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.BlockingQueue; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.conf.Configuration; +import org.mockito.Matchers; + +import static org.apache.hadoop.ipc.FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY; + +public class TestFairCallQueue extends TestCase { + private FairCallQueue fcq; + + private Schedulable mockCall(String id) { + Schedulable mockCall = mock(Schedulable.class); + UserGroupInformation ugi = mock(UserGroupInformation.class); + + when(ugi.getUserName()).thenReturn(id); + when(mockCall.getUserGroupInformation()).thenReturn(ugi); + + return mockCall; + } + + // A scheduler which always schedules into priority zero + private RpcScheduler alwaysZeroScheduler; + { + RpcScheduler sched = mock(RpcScheduler.class); + when(sched.getPriorityLevel(Matchers.any())).thenReturn(0); // always queue 0 + alwaysZeroScheduler = sched; + } + + public void setUp() { + Configuration conf = new Configuration(); + conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); + + fcq = new FairCallQueue(5, "ns", conf); + } + + // + // Ensure that FairCallQueue properly implements BlockingQueue + // + public void testPollReturnsNullWhenEmpty() { + assertNull(fcq.poll()); + } + + public void testPollReturnsTopCallWhenNotEmpty() { + Schedulable call = mockCall("c"); + assertTrue(fcq.offer(call)); + + assertEquals(call, fcq.poll()); + + // Poll took it out so the fcq is empty + assertEquals(0, fcq.size()); + } + + public void testOfferSucceeds() { + fcq.setScheduler(alwaysZeroScheduler); + + for (int i = 0; i < 5; i++) { + // We can fit 10 calls + assertTrue(fcq.offer(mockCall("c"))); + } + + assertEquals(5, fcq.size()); + } + + public void testOfferFailsWhenFull() { + fcq.setScheduler(alwaysZeroScheduler); + for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); } + + assertFalse(fcq.offer(mockCall("c"))); // It's full + + assertEquals(5, fcq.size()); + } + + public void testOfferSucceedsWhenScheduledLowPriority() { + // Scheduler will schedule into queue 0 x 5, then queue 1 + RpcScheduler sched = mock(RpcScheduler.class); + when(sched.getPriorityLevel(Matchers.any())).thenReturn(0, 0, 0, 0, 0, 1, 0); + fcq.setScheduler(sched); + for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); } + + assertTrue(fcq.offer(mockCall("c"))); + + assertEquals(6, fcq.size()); + } + + public void testPeekNullWhenEmpty() { + assertNull(fcq.peek()); + } + + public void testPeekNonDestructive() { + Schedulable call = mockCall("c"); + assertTrue(fcq.offer(call)); + + assertEquals(call, fcq.peek()); + assertEquals(call, fcq.peek()); // Non-destructive + assertEquals(1, fcq.size()); + } + + public void testPeekPointsAtHead() { + Schedulable call = mockCall("c"); + Schedulable next = mockCall("b"); + fcq.offer(call); + fcq.offer(next); + + assertEquals(call, fcq.peek()); // Peek points at the head + } + + public void testPollTimeout() throws InterruptedException { + fcq.setScheduler(alwaysZeroScheduler); + + assertNull(fcq.poll(10, TimeUnit.MILLISECONDS)); + } + + public void testPollSuccess() throws InterruptedException { + fcq.setScheduler(alwaysZeroScheduler); + + Schedulable call = mockCall("c"); + assertTrue(fcq.offer(call)); + + assertEquals(call, fcq.poll(10, TimeUnit.MILLISECONDS)); + + assertEquals(0, fcq.size()); + } + + public void testOfferTimeout() throws InterruptedException { + fcq.setScheduler(alwaysZeroScheduler); + for (int i = 0; i < 5; i++) { + assertTrue(fcq.offer(mockCall("c"), 10, TimeUnit.MILLISECONDS)); + } + + assertFalse(fcq.offer(mockCall("e"), 10, TimeUnit.MILLISECONDS)); // It's full + + assertEquals(5, fcq.size()); + } + + public void testDrainTo() { + Configuration conf = new Configuration(); + conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); + FairCallQueue fcq2 = new FairCallQueue(10, "ns", conf); + + fcq.setScheduler(alwaysZeroScheduler); + fcq2.setScheduler(alwaysZeroScheduler); + + // Start with 3 in fcq, to be drained + for (int i = 0; i < 3; i++) { + fcq.offer(mockCall("c")); + } + + fcq.drainTo(fcq2); + + assertEquals(0, fcq.size()); + assertEquals(3, fcq2.size()); + } + + public void testDrainToWithLimit() { + Configuration conf = new Configuration(); + conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); + FairCallQueue fcq2 = new FairCallQueue(10, "ns", conf); + + fcq.setScheduler(alwaysZeroScheduler); + fcq2.setScheduler(alwaysZeroScheduler); + + // Start with 3 in fcq, to be drained + for (int i = 0; i < 3; i++) { + fcq.offer(mockCall("c")); + } + + fcq.drainTo(fcq2, 2); + + assertEquals(1, fcq.size()); + assertEquals(2, fcq2.size()); + } + + public void testInitialRemainingCapacity() { + assertEquals(10, fcq.remainingCapacity()); + } + + public void testFirstQueueFullRemainingCapacity() { + fcq.setScheduler(alwaysZeroScheduler); + while (fcq.offer(mockCall("c"))) ; // Queue 0 will fill up first, then queue 1 + + assertEquals(5, fcq.remainingCapacity()); + } + + public void testAllQueuesFullRemainingCapacity() { + RpcScheduler sched = mock(RpcScheduler.class); + when(sched.getPriorityLevel(Matchers.any())).thenReturn(0, 0, 0, 0, 0, 1, 1, 1, 1, 1); + fcq.setScheduler(sched); + while (fcq.offer(mockCall("c"))) ; + + assertEquals(0, fcq.remainingCapacity()); + assertEquals(10, fcq.size()); + } + + public void testQueuesPartialFilledRemainingCapacity() { + RpcScheduler sched = mock(RpcScheduler.class); + when(sched.getPriorityLevel(Matchers.any())).thenReturn(0, 1, 0, 1, 0); + fcq.setScheduler(sched); + for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c")); } + + assertEquals(5, fcq.remainingCapacity()); + assertEquals(5, fcq.size()); + } + + /** + * Putter produces FakeCalls + */ + public class Putter implements Runnable { + private final BlockingQueue cq; + + public final String tag; + public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted + private final int maxCalls; + + public Putter(BlockingQueue aCq, int maxCalls, String tag) { + this.maxCalls = maxCalls; + this.cq = aCq; + this.tag = tag; + } + + private String getTag() { + if (this.tag != null) return this.tag; + return ""; + } + + @Override + public void run() { + try { + // Fill up to max (which is infinite if maxCalls < 0) + while (callsAdded < maxCalls || maxCalls < 0) { + cq.put(mockCall(getTag())); + callsAdded++; + } + } catch (InterruptedException e) { + return; + } + } + } + + /** + * Taker consumes FakeCalls + */ + public class Taker implements Runnable { + private final BlockingQueue cq; + + public final String tag; // if >= 0 means we will only take the matching tag, and put back + // anything else + public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted + public volatile Schedulable lastResult = null; // the last thing we took + private final int maxCalls; // maximum calls to take + + private IdentityProvider uip; + + public Taker(BlockingQueue aCq, int maxCalls, String tag) { + this.maxCalls = maxCalls; + this.cq = aCq; + this.tag = tag; + this.uip = new UserIdentityProvider(); + } + + @Override + public void run() { + try { + // Take while we don't exceed maxCalls, or if maxCalls is undefined (< 0) + while (callsTaken < maxCalls || maxCalls < 0) { + Schedulable res = cq.take(); + String identity = uip.makeIdentity(res); + + if (tag != null && this.tag.equals(identity)) { + // This call does not match our tag, we should put it back and try again + cq.put(res); + } else { + callsTaken++; + lastResult = res; + } + } + } catch (InterruptedException e) { + return; + } + } + } + + // Assert we can take exactly the numberOfTakes + public void assertCanTake(BlockingQueue cq, int numberOfTakes, + int takeAttempts) throws InterruptedException { + + Taker taker = new Taker(cq, takeAttempts, "default"); + Thread t = new Thread(taker); + t.start(); + t.join(100); + + assertEquals(numberOfTakes, taker.callsTaken); + t.interrupt(); + } + + // Assert we can put exactly the numberOfPuts + public void assertCanPut(BlockingQueue cq, int numberOfPuts, + int putAttempts) throws InterruptedException { + + Putter putter = new Putter(cq, putAttempts, null); + Thread t = new Thread(putter); + t.start(); + t.join(100); + + assertEquals(numberOfPuts, putter.callsAdded); + t.interrupt(); + } + + // Make sure put will overflow into lower queues when the top is full + public void testPutOverflows() throws InterruptedException { + fcq.setScheduler(alwaysZeroScheduler); + + // We can fit more than 5, even though the scheduler suggests the top queue + assertCanPut(fcq, 8, 8); + assertEquals(8, fcq.size()); + } + + public void testPutBlocksWhenAllFull() throws InterruptedException { + fcq.setScheduler(alwaysZeroScheduler); + + assertCanPut(fcq, 10, 10); // Fill up + assertEquals(10, fcq.size()); + + // Put more which causes overflow + assertCanPut(fcq, 0, 1); // Will block + } + + public void testTakeBlocksWhenEmpty() throws InterruptedException { + fcq.setScheduler(alwaysZeroScheduler); + assertCanTake(fcq, 0, 1); + } + + public void testTakeRemovesCall() throws InterruptedException { + fcq.setScheduler(alwaysZeroScheduler); + Schedulable call = mockCall("c"); + fcq.offer(call); + + assertEquals(call, fcq.take()); + assertEquals(0, fcq.size()); + } + + public void testTakeTriesNextQueue() throws InterruptedException { + // Make a FCQ filled with calls in q 1 but empty in q 0 + RpcScheduler q1Scheduler = mock(RpcScheduler.class); + when(q1Scheduler.getPriorityLevel(Matchers.any())).thenReturn(1); + fcq.setScheduler(q1Scheduler); + + // A mux which only draws from q 0 + RpcMultiplexer q0mux = mock(RpcMultiplexer.class); + when(q0mux.getAndAdvanceCurrentIndex()).thenReturn(0); + fcq.setMultiplexer(q0mux); + + Schedulable call = mockCall("c"); + fcq.put(call); + + // Take from q1 even though mux said q0, since q0 empty + assertEquals(call, fcq.take()); + assertEquals(0, fcq.size()); + } +} \ No newline at end of file