diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 26c5630a6d..d5df549b28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -20,8 +20,8 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
+import java.util.TreeSet;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -188,25 +188,23 @@ public Resource assignContainer(FSSchedulerNode node) {
       return assigned;
     }
 
-    // Hold the write lock when sorting childQueues
-    writeLock.lock();
-    try {
-      Collections.sort(childQueues, policy.getComparator());
-    } finally {
-      writeLock.unlock();
-    }
-
-    /*
-     * We are releasing the lock between the sort and iteration of the
-     * "sorted" list. There could be changes to the list here:
-     * 1. Add a child queue to the end of the list, this doesn't affect
-     * container assignment.
-     * 2. Remove a child queue, this is probably good to take care of so we
-     * don't assign to a queue that is going to be removed shortly.
-     */
+    // Sort the queues while holding a read lock on this parent only.
+    // The individual entries are not locked and can change, which means that
+    // the collection of childQueues cannot be sorted by calling
+    // Collections.sort().
+    // Locking each child queue to prevent changes would have a large
+    // performance impact.
+    // We do not have to handle the queue removal case as a queue must be
+    // empty before removal. Assigning an application to a queue and removal of
+    // that queue both need the scheduler lock.
+    // NOTE(review): a TreeSet keeps only one of any entries its comparator
+    // reports as equal, so this relies on the policy comparator never
+    // returning 0 for two distinct child queues - confirm.
+    TreeSet<FSQueue> sortedChildQueues = new TreeSet<>(policy.getComparator());
     readLock.lock();
     try {
-      for (FSQueue child : childQueues) {
+      sortedChildQueues.addAll(childQueues);
+      for (FSQueue child : sortedChildQueues) {
         assigned = child.assignContainer(node);
         if (!Resources.equals(assigned, Resources.none())) {
           break;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 03332b25e2..01eec73b1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -143,4 +143,12 @@ public void updateDemand() {}
   public boolean isPreemptable() {
     return true;
   }
+
+  /**
+   * Sets the usage field of this schedulable to the given resource.
+   * The reference is stored as is, no copy is made.
+   */
+  public void setResourceUsage(Resource usage) {
+    this.usage = usage;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
index 03fd1efbce..55b7163b4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
@@ -19,11 +19,16 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeSet;
 
 import org.apache.curator.shaded.com.google.common.base.Joiner;
 import org.apache.hadoop.conf.Configuration;
@@ -443,4 +448,93 @@ private static void addResources(String... resources) {
     conf.set(YarnConfiguration.RESOURCE_TYPES, Joiner.on(',').join(resources));
     ResourceUtils.resetResourceTypes(conf);
   }
+
+  /**
+   * Checks that sorting schedulables whose usage changes concurrently makes
+   * {@code Collections.sort} throw, while adding the same kind of list to a
+   * {@code TreeSet} built on the same comparator completes.
+   * NOTE(review): the first half is timing dependent; it expects the sort to
+   * still be running once the modification thread wakes up.
+   */
+  @Test
+  public void testModWhileSorting(){
+    final List<FakeSchedulable> schedulableList = new ArrayList<>();
+    for (int i=0; i<10000; i++) {
+      schedulableList.add(
+          (FakeSchedulable)createSchedulable((i%10)*100, (i%3)*2));
+    }
+    Comparator drfComparator = createComparator(100000, 50000);
+
+    // To simulate unallocated resource changes
+    Thread modThread = modificationThread(schedulableList);
+    modThread.start();
+
+    // This should fail: make sure that we do test correctly
+    // TimSort which is used does not handle the concurrent modification of
+    // objects it is sorting.
+    try {
+      Collections.sort(schedulableList, drfComparator);
+      fail("Sorting should have failed and did not");
+    } catch (IllegalArgumentException iae) {
+      // JUnit expects the expected value first, then the actual one.
+      assertEquals("Comparison method violates its general contract!",
+          iae.getMessage());
+    }
+    try {
+      modThread.join();
+    } catch (InterruptedException ie) {
+      fail("ModThread join failed: " + ie.getMessage());
+    }
+
+    // clean up and try again using TreeSet which should work
+    schedulableList.clear();
+    for (int i=0; i<10000; i++) {
+      schedulableList.add(
+          (FakeSchedulable)createSchedulable((i%10)*100, (i%3)*2));
+    }
+    TreeSet<FakeSchedulable> sortedSchedulable = new TreeSet<>(drfComparator);
+    modThread = modificationThread(schedulableList);
+    modThread.start();
+    sortedSchedulable.addAll(schedulableList);
+    try {
+      modThread.join();
+    } catch (InterruptedException ie) {
+      fail("ModThread join failed: " + ie.getMessage());
+    }
+  }
+
+  /**
+   * Thread to simulate concurrent schedulable changes while sorting.
+   *
+   * @param schedulableList list whose entries at every tenth index get a new usage
+   * @return the thread, not yet started
+   */
+  private Thread modificationThread(
+      final List<FakeSchedulable> schedulableList) {
+    Thread modThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          // This sleep is needed to make sure the sort has started before the
+          // modifications start and finish
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          // A fail() on this thread would not be seen by the test thread, so
+          // restore the interrupt flag and skip the modifications instead.
+          Thread.currentThread().interrupt();
+          return;
+        }
+        // One Resource instance is shared by every modified schedulable, so
+        // later changes to it are also visible through the earlier ones.
+        Resource newUsage = Resources.createResource(0, 0);
+        for (int j = 0; j < 1000; j++) {
+          FakeSchedulable sched = schedulableList.get(j * 10);
+          newUsage.setMemorySize(20000);
+          newUsage.setVirtualCores(j % 10);
+          sched.setResourceUsage(newUsage);
+        }
+      }
+    };
+    return modThread;
+  }
 }