From ecf665c6facf89d3b87b6e3cc684274b8155ca60 Mon Sep 17 00:00:00 2001
From: Tamas Domok
Date: Wed, 24 Apr 2024 14:58:50 +0200
Subject: [PATCH] YARN-11191. Fix potential deadlock in GlobalScheduler
 refreshQueues (#6732)

PreemptionManager.refreshQueues now reads the child queues through the new
CSQueue.getChildQueuesByTryLock(), which polls readLock.tryLock() instead of
blocking on the queue's read lock while the preemption write lock is held.
---
 .../scheduler/capacity/AbstractLeafQueue.java |  5 ++
 .../capacity/AbstractParentQueue.java         | 15 ++++
 .../scheduler/capacity/CSQueue.java           |  6 ++
 .../preemption/PreemptionManager.java         |  6 +-
 .../capacity/TestCapacityScheduler.java       | 78 +++++++++++++++++++
 5 files changed, 108 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index e40b8aaeb0..565f89de32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -369,6 +369,11 @@ public List<CSQueue> getChildQueues() {
     return null;
   }
 
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    return null;
+  }
+
   /**
    * Set user limit.
    * @param userLimit new user limit
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
index 9cb545d378..87333bf50a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
@@ -1347,6 +1348,20 @@ public List<CSQueue> getChildQueues() {
 
   }
 
+  @Override
+  public List<CSQueue> getChildQueuesByTryLock() {
+    // Poll tryLock() (10 us back-off) instead of blocking in lock(): tryLock()
+    // takes the read lock whenever no writer holds it, even if writers wait.
+    while (!readLock.tryLock()) {
+      LockSupport.parkNanos(10000);
+    }
+    try {
+      return new ArrayList<>(childQueues);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public void recoverContainer(Resource clusterResource,
       SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index a5672a8bcd..df3199220b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -175,6 +175,12 @@ public interface CSQueue extends SchedulerQueue {
    * @return child queues
    */
   public List<CSQueue> getChildQueues();
+
+  /**
+   * Get child queues, taking the queue's read lock by polling tryLock.
+   * @return child queues, or null when the queue has none (leaf queue)
+   */
+  List<CSQueue> getChildQueuesByTryLock();
 
   /**
    * Check if the user has permission to perform the operation
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
index 408198f704..3aab8e8a50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
@@ -28,6 +28,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -55,8 +56,9 @@ public void refreshQueues(CSQueue parent, CSQueue current) {
             new PreemptableQueue(parentEntity));
       }
 
-      if (current.getChildQueues() != null) {
-        for (CSQueue child : current.getChildQueues()) {
+      List<CSQueue> childQueues = current.getChildQueuesByTryLock();
+      if (childQueues != null) {
+        for (CSQueue child : childQueues) {
           refreshQueues(current, child);
         }
       }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 3bef712547..bddba79f6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -88,6 +88,7 @@
 import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -3047,4 +3048,81 @@ public void testReservedContainerLeakWhenMoveApplication() throws Exception {
     Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
     rm1.close();
   }
+
+  /**
+   * (YARN-11191) This test ensures that no deadlock happens while the
+   * refreshQueues is called on the preemptionManager (refresh thread) and the
+   * AbstractCSQueue.getTotalKillableResource is called from the schedule thread.
+   *
+   * @throws Exception TestTimedOutException means deadlock
+   */
+  @Test (timeout = 20000)
+  public void testRefreshQueueWithOpenPreemption() throws Exception {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT), new String[]{"a"});
+    QueuePath a = new QueuePath("root.a");
+    csConf.setCapacity(a, 100);
+    csConf.setQueues(a, new String[]{"b"});
+    QueuePath b = new QueuePath("root.a.b");
+    csConf.setCapacity(b, 100);
+
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    try (MockRM rm = new MockRM(csConf)) {
+      CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+      PreemptionManager preemptionManager = scheduler.getPreemptionManager();
+      rm.getRMContext().setNodeLabelManager(mgr);
+      rm.start();
+
+      AbstractParentQueue queue = (AbstractParentQueue) scheduler.getQueue("a");
+
+      // The scheduler thread holds the queue's read-lock for 5 seconds
+      // then the preemption's read-lock is used
+      Thread schedulerThread = new Thread(() -> {
+        queue.readLock.lock();
+        try {
+          Thread.sleep(5 * 1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        preemptionManager.getKillableContainers("a",
+            queue.getDefaultNodeLabelExpression());
+        queue.readLock.unlock();
+      }, "SCHEDULE");
+
+      // The complete thread locks/unlocks the queue's write-lock after 1 second
+      Thread completeThread = new Thread(() -> {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        queue.writeLock.lock();
+        queue.writeLock.unlock();
+      }, "COMPLETE");
+
+
+      // The refresh thread holds the preemption's write-lock after 2 seconds
+      // while it calls the getChildQueues(ByTryLock) that
+      // locks(tryLocks) the queue's read-lock
+      Thread refreshThread = new Thread(() -> {
+        try {
+          Thread.sleep(2 * 1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        preemptionManager.refreshQueues(queue.getParent(), queue);
+      }, "REFRESH");
+      schedulerThread.start();
+      completeThread.start();
+      refreshThread.start();
+
+      schedulerThread.join();
+      completeThread.join();
+      refreshThread.join();
+    }
+  }
 }