YARN-11191. Fix potentional deadlock in GlobalScheduler refreshQueues (#6732)

This commit is contained in:
Tamas Domok 2024-04-24 14:58:50 +02:00 committed by GitHub
parent 5d0a40c143
commit ecf665c6fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 106 additions and 2 deletions

View File

@ -369,6 +369,11 @@ public List<CSQueue> getChildQueues() {
return null; return null;
} }
@Override
public List<CSQueue> getChildQueuesByTryLock() {
return null;
}
/** /**
* Set user limit. * Set user limit.
* @param userLimit new user limit * @param userLimit new user limit

View File

@ -27,6 +27,7 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
@ -1347,6 +1348,18 @@ public List<CSQueue> getChildQueues() {
} }
@Override
public List<CSQueue> getChildQueuesByTryLock() {
try {
while (!readLock.tryLock()){
LockSupport.parkNanos(10000);
}
return new ArrayList<>(childQueues);
} finally {
readLock.unlock();
}
}
@Override @Override
public void recoverContainer(Resource clusterResource, public void recoverContainer(Resource clusterResource,
SchedulerApplicationAttempt attempt, RMContainer rmContainer) { SchedulerApplicationAttempt attempt, RMContainer rmContainer) {

View File

@ -176,6 +176,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
*/ */
public List<CSQueue> getChildQueues(); public List<CSQueue> getChildQueues();
/**
* Get child queues By tryLock.
* @return child queues
*/
List<CSQueue> getChildQueuesByTryLock();
/** /**
* Check if the <code>user</code> has permission to perform the operation * Check if the <code>user</code> has permission to perform the operation
* @param acl ACL * @param acl ACL

View File

@ -28,6 +28,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -55,8 +56,9 @@ public void refreshQueues(CSQueue parent, CSQueue current) {
new PreemptableQueue(parentEntity)); new PreemptableQueue(parentEntity));
} }
if (current.getChildQueues() != null) { List<CSQueue> childQueues = current.getChildQueuesByTryLock();
for (CSQueue child : current.getChildQueues()) { if (childQueues != null) {
for (CSQueue child : childQueues) {
refreshQueues(current, child); refreshQueues(current, child);
} }
} }

View File

@ -88,6 +88,7 @@
import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -3047,4 +3048,81 @@ public void testReservedContainerLeakWhenMoveApplication() throws Exception {
Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize()); Assert.assertEquals(0, desQueue.getUsedResources().getMemorySize());
rm1.close(); rm1.close();
} }
/**
* (YARN-11191) This test ensures that no deadlock happens while the
* refreshQueues is called on the preemptionManager (refresh thread) and the
* AbstractCSQueue.getTotalKillableResource is called from the schedule thread.
*
* @throws Exception TestTimedOutException means deadlock
*/
@Test (timeout = 20000)
public void testRefreshQueueWithOpenPreemption() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT), new String[]{"a"});
QueuePath a = new QueuePath("root.a");
csConf.setCapacity(a, 100);
csConf.setQueues(a, new String[]{"b"});
QueuePath b = new QueuePath("root.a.b");
csConf.setCapacity(b, 100);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
try (MockRM rm = new MockRM(csConf)) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
PreemptionManager preemptionManager = scheduler.getPreemptionManager();
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
AbstractParentQueue queue = (AbstractParentQueue) scheduler.getQueue("a");
// The scheduler thread holds the queue's read-lock for 5 seconds
// then the preemption's read-lock is used
Thread schedulerThread = new Thread(() -> {
queue.readLock.lock();
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
preemptionManager.getKillableContainers("a",
queue.getDefaultNodeLabelExpression());
queue.readLock.unlock();
}, "SCHEDULE");
// The complete thread locks/unlocks the queue's write-lock after 1 seconds
Thread completeThread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.writeLock.lock();
queue.writeLock.unlock();
}, "COMPLETE");
// The refresh thread holds the preemption's write-lock after 2 seconds
// while it calls the getChildQueues(ByTryLock) that
// locks(tryLocks) the queue's read-lock
Thread refreshThread = new Thread(() -> {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
preemptionManager.refreshQueues(queue.getParent(), queue);
}, "REFRESH");
schedulerThread.start();
completeThread.start();
refreshThread.start();
schedulerThread.join();
completeThread.join();
refreshThread.join();
}
}
} }