From 6736a1ab7033523ed5f304fdfed46d7f348665b4 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 23 Jul 2015 14:42:35 -0700 Subject: [PATCH] HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements drop nearly impossible. Contributed by Zhihai Xu. --- .../hadoop-common/CHANGES.txt | 3 +++ .../apache/hadoop/ipc/CallQueueManager.java | 27 ++++++++++++------- .../hadoop/ipc/TestCallQueueManager.java | 6 ++--- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f1a3bc91db..6c18add6d5 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -719,6 +719,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12161. Add getStoragePolicy API to the FileSystem interface. (Brahma Reddy Battula via Arpit Agarwal) + HADOOP-12189. Improve CallQueueManager#swapQueue to make queue elements + drop nearly impossible. (Zhihai Xu via wang) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index 1568bd6f9a..c10f839db4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -32,11 +32,15 @@ import org.apache.hadoop.conf.Configuration; */ public class CallQueueManager { public static final Log LOG = LogFactory.getLog(CallQueueManager.class); + // Number of checkpoints for empty queue. + private static final int CHECKPOINT_NUM = 20; + // Interval to check empty queue. + private static final long CHECKPOINT_INTERVAL_MS = 10; @SuppressWarnings("unchecked") static Class> convertQueueClass( - Class queneClass, Class elementClass) { - return (Class>)queneClass; + Class queueClass, Class elementClass) { + return (Class>)queueClass; } private final boolean clientBackOffEnabled; @@ -159,18 +163,23 @@ public class CallQueueManager { } /** - * Checks if queue is empty by checking at two points in time. + * Checks if queue is empty by checking at CHECKPOINT_NUM points with + * CHECKPOINT_INTERVAL_MS interval. * This doesn't mean the queue might not fill up at some point later, but * it should decrease the probability that we lose a call this way. */ private boolean queueIsReallyEmpty(BlockingQueue q) { - boolean wasEmpty = q.isEmpty(); - try { - Thread.sleep(10); - } catch (InterruptedException ie) { - return false; + for (int i = 0; i < CHECKPOINT_NUM; i++) { + try { + Thread.sleep(CHECKPOINT_INTERVAL_MS); + } catch (InterruptedException ie) { + return false; + } + if (!q.isEmpty()) { + return false; + } } - return q.isEmpty() && wasEmpty; + return true; } private String stringRepr(Object o) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 6e1838e291..51a97506fa 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -165,7 +165,7 @@ public class TestCallQueueManager { HashMap threads = new HashMap(); // Create putters and takers - for (int i=0; i < 50; i++) { + for (int i=0; i < 1000; i++) { Putter p = new Putter(manager, -1, -1); Thread pt = new Thread(p); producers.add(p); @@ -174,7 +174,7 @@ public class TestCallQueueManager { pt.start(); } - for (int i=0; i < 20; i++) { + for (int i=0; i < 100; i++) { Taker t = new Taker(manager, -1, -1); Thread tt = new Thread(t); consumers.add(t); @@ -183,7 +183,7 @@ public class TestCallQueueManager { tt.start(); } - Thread.sleep(10); + Thread.sleep(500); for (int i=0; i < 5; i++) { manager.swapQueue(queueClass, 5000, "", null);