diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a1e86bbdd0..d4abc55dcc 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -345,6 +345,9 @@ Release 2.5.0 - UNRELEASED IMPROVEMENTS + HADOOP-10278. Refactor to make CallQueue pluggable. (Chris Li via + Arpit Agarwal) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index b57b3f2ce5..e2d4fbcb9d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -82,6 +82,14 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IPC_SERVER_HANDLER_QUEUE_SIZE_KEY */ public static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100; + /** + * CallQueue related settings. These are not used directly, but rather + * combined with a namespace and port. For instance: + * IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY + */ + public static final String IPC_CALLQUEUE_NAMESPACE = "ipc"; + public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; + /** Internal buffer size for Lzo compressor/decompressors */ public static final String IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY = "io.compression.codec.lzo.buffersize"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java new file mode 100644 index 0000000000..dae7ace4bd --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +/** + * Abstracts queue operations for different blocking queues. + */ +public class CallQueueManager { + public static final Log LOG = LogFactory.getLog(CallQueueManager.class); + + // Atomic refs point to active callQueue + // We have two so we can better control swapping + private final AtomicReference> putRef; + private final AtomicReference> takeRef; + + public CallQueueManager(Class backingClass, int maxQueueSize, + String namespace, Configuration conf) { + BlockingQueue bq = createCallQueueInstance(backingClass, + maxQueueSize, namespace, conf); + this.putRef = new AtomicReference>(bq); + this.takeRef = new AtomicReference>(bq); + LOG.info("Using callQueue " + backingClass); + } + + @SuppressWarnings("unchecked") + private BlockingQueue createCallQueueInstance(Class theClass, int maxLen, + String ns, Configuration conf) { + + // Used for custom, configurable callqueues + try { + Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, + Configuration.class); + return (BlockingQueue)ctor.newInstance(maxLen, ns, conf); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + } + + // Used for LinkedBlockingQueue, ArrayBlockingQueue, etc + try { + Constructor ctor = theClass.getDeclaredConstructor(int.class); + return (BlockingQueue)ctor.newInstance(maxLen); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + } + + // Last attempt + try { + Constructor ctor = theClass.getDeclaredConstructor(); + return (BlockingQueue)ctor.newInstance(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + } + + // Nothing worked + throw new RuntimeException(theClass.getName() + + " could not be constructed."); + } + + /** + * Insert e into the backing queue or block until we can. + * If we block and the queue changes on us, we will insert while the + * queue is drained. + */ + public void put(E e) throws InterruptedException { + putRef.get().put(e); + } + + /** + * Retrieve an E from the backing queue or block until we can. + * Guaranteed to return an element from the current queue. + */ + public E take() throws InterruptedException { + E e = null; + + while (e == null) { + e = takeRef.get().poll(1000L, TimeUnit.MILLISECONDS); + } + + return e; + } + + public int size() { + return takeRef.get().size(); + } + + /** + * Replaces active queue with the newly requested one and transfers + * all calls to the newQ before returning. + */ + public synchronized void swapQueue(Class queueClassToUse, int maxSize, + String ns, Configuration conf) { + BlockingQueue newQ = createCallQueueInstance(queueClassToUse, maxSize, + ns, conf); + + // Our current queue becomes the old queue + BlockingQueue oldQ = putRef.get(); + + // Swap putRef first: allow blocked puts() to be unblocked + putRef.set(newQ); + + // Wait for handlers to drain the oldQ + while (!queueIsReallyEmpty(oldQ)) {} + + // Swap takeRef to handle new calls + takeRef.set(newQ); + + LOG.info("Old Queue: " + stringRepr(oldQ) + ", " + + "Replacement: " + stringRepr(newQ)); + } + + /** + * Checks if queue is empty by checking at two points in time. + * This doesn't mean the queue might not fill up at some point later, but + * it should decrease the probability that we lose a call this way. + */ + private boolean queueIsReallyEmpty(BlockingQueue q) { + boolean wasEmpty = q.isEmpty(); + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + return false; + } + return q.isEmpty() && wasEmpty; + } + + private String stringRepr(Object o) { + return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode()); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 9871a3d138..92d8bbaf26 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -365,7 +365,7 @@ public static boolean isRpcInvocation() { private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm volatile private boolean running = true; // true while server runs - private BlockingQueue callQueue; // queued calls + private CallQueueManager callQueue; // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; @@ -469,6 +469,19 @@ public ServiceAuthorizationManager getServiceAuthorizationManager() { return serviceAuthorizationManager; } + /* + * Refresh the call queue + */ + public synchronized void refreshCallQueue(Configuration conf) { + // Create the next queue + String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + + this.port; + Class queueClassToUse = conf.getClass(prefix + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); + + callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf); + } + /** A call queued for handling. */ public static class Call { private final int callId; // the client's call id @@ -2193,7 +2206,15 @@ protected Server(String bindAddress, int port, this.readerPendingConnectionQueue = conf.getInt( CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); - this.callQueue = new LinkedBlockingQueue(maxQueueSize); + + // Setup appropriate callqueue + String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + + this.port; + Class queueClassToUse = conf.getClass(prefix + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); + this.callQueue = new CallQueueManager(queueClassToUse, maxQueueSize, + prefix, conf); + this.secretManager = (SecretManager) secretManager; this.authorize = conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java new file mode 100644 index 0000000000..3e519af8d2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.HashMap; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; + +public class TestCallQueueManager { + private CallQueueManager manager; + + public class FakeCall { + public final int tag; // Can be used for unique identification + + public FakeCall(int tag) { + this.tag = tag; + } + } + + /** + * Putter produces FakeCalls + */ + public class Putter implements Runnable { + private final CallQueueManager cq; + + public final int tag; + public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted + private final int maxCalls; + + private boolean isRunning = true; + + public Putter(CallQueueManager aCq, int maxCalls, int tag) { + this.maxCalls = maxCalls; + this.cq = aCq; + this.tag = tag; + } + + public void run() { + try { + // Fill up to max (which is infinite if maxCalls < 0) + while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) { + cq.put(new FakeCall(this.tag)); + callsAdded++; + } + } catch (InterruptedException e) { + return; + } + } + + public void stop() { + this.isRunning = false; + } + } + + /** + * Taker consumes FakeCalls + */ + public class Taker implements Runnable { + private final CallQueueManager cq; + + public final int tag; // if >= 0 means we will only take the matching tag, and put back + // anything else + public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted + public volatile FakeCall lastResult = null; // the last thing we took + private final int maxCalls; // maximum calls to take + + public Taker(CallQueueManager aCq, int maxCalls, int tag) { + this.maxCalls = maxCalls; + this.cq = aCq; + this.tag = tag; + } + + public void run() { + try { + // Take while we don't exceed maxCalls, or if maxCalls is undefined (< 0) + while (callsTaken < maxCalls || maxCalls < 0) { + FakeCall res = cq.take(); + + if (tag >= 0 && res.tag != this.tag) { + // This call does not match our tag, we should put it back and try again + cq.put(res); + } else { + callsTaken++; + lastResult = res; + } + } + } catch (InterruptedException e) { + return; + } + } + } + + // Assert we can take exactly the numberOfTakes + public void assertCanTake(CallQueueManager cq, int numberOfTakes, + int takeAttempts) throws InterruptedException { + + Taker taker = new Taker(cq, takeAttempts, -1); + Thread t = new Thread(taker); + t.start(); + t.join(100); + + assertEquals(taker.callsTaken, numberOfTakes); + t.interrupt(); + } + + // Assert we can put exactly the numberOfPuts + public void assertCanPut(CallQueueManager cq, int numberOfPuts, + int putAttempts) throws InterruptedException { + + Putter putter = new Putter(cq, putAttempts, -1); + Thread t = new Thread(putter); + t.start(); + t.join(100); + + assertEquals(putter.callsAdded, numberOfPuts); + t.interrupt(); + } + + + @Test + public void testCallQueueCapacity() throws InterruptedException { + manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + + assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity + } + + @Test + public void testEmptyConsume() throws InterruptedException { + manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + + assertCanTake(manager, 0, 1); // Fails since it's empty + } + + @Test(timeout=60000) + public void testSwapUnderContention() throws InterruptedException { + manager = new CallQueueManager(LinkedBlockingQueue.class, 5000, "", null); + + ArrayList producers = new ArrayList(); + ArrayList consumers = new ArrayList(); + + HashMap threads = new HashMap(); + + // Create putters and takers + for (int i=0; i < 50; i++) { + Putter p = new Putter(manager, -1, -1); + Thread pt = new Thread(p); + producers.add(p); + threads.put(p, pt); + + pt.start(); + } + + for (int i=0; i < 20; i++) { + Taker t = new Taker(manager, -1, -1); + Thread tt = new Thread(t); + consumers.add(t); + threads.put(t, tt); + + tt.start(); + } + + Thread.sleep(10); + + assertTrue(manager.size() > 0); + + for (int i=0; i < 5; i++) { + manager.swapQueue(LinkedBlockingQueue.class, 5000, "", null); + } + + // Stop the producers + for (Putter p : producers) { + p.stop(); + } + + // Wait for consumers to wake up, then consume + Thread.sleep(2000); + assertEquals(0, manager.size()); + + // Ensure no calls were dropped + long totalCallsCreated = 0; + long totalCallsConsumed = 0; + + for (Putter p : producers) { + totalCallsCreated += p.callsAdded; + threads.get(p).interrupt(); + } + for (Taker t : consumers) { + totalCallsConsumed += t.callsTaken; + threads.get(t).interrupt(); + } + + assertEquals(totalCallsConsumed, totalCallsCreated); + } +} \ No newline at end of file