YARN-8436. FSParentQueue: Comparison method violates its general contract. (Wilfred Spiegelenburg via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-07-19 13:21:57 -07:00
parent 45d9568aaa
commit 2564884757
3 changed files with 93 additions and 18 deletions

View File

@ -20,8 +20,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -188,25 +188,19 @@ public Resource assignContainer(FSSchedulerNode node) {
return assigned;
}
// Hold the write lock when sorting childQueues
writeLock.lock();
try {
Collections.sort(childQueues, policy.getComparator());
} finally {
writeLock.unlock();
}
/*
* We are releasing the lock between the sort and iteration of the
* "sorted" list. There could be changes to the list here:
* 1. Add a child queue to the end of the list, this doesn't affect
* container assignment.
* 2. Remove a child queue, this is probably good to take care of so we
* don't assign to a queue that is going to be removed shortly.
*/
// Sort the queues while holding a read lock on this parent only.
// The individual entries are not locked and can change which means that
// the collection of childQueues can not be sorted by calling Sort().
// Locking each childqueue to prevent changes would have a large
// performance impact.
// We do not have to handle the queue removal case as a queue must be
// empty before removal. Assigning an application to a queue and removal of
// that queue both need the scheduler lock.
TreeSet<FSQueue> sortedChildQueues = new TreeSet<>(policy.getComparator());
readLock.lock();
try {
for (FSQueue child : childQueues) {
sortedChildQueues.addAll(childQueues);
for (FSQueue child : sortedChildQueues) {
assigned = child.assignContainer(node);
if (!Resources.equals(assigned, Resources.none())) {
break;

View File

@ -143,4 +143,8 @@ public void updateDemand() {}
public boolean isPreemptable() {
return true;
}
public void setResourceUsage(Resource usage) {
this.usage = usage;
}
}

View File

@ -19,11 +19,16 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.curator.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.conf.Configuration;
@ -443,4 +448,76 @@ private static void addResources(String... resources) {
conf.set(YarnConfiguration.RESOURCE_TYPES, Joiner.on(',').join(resources));
ResourceUtils.resetResourceTypes(conf);
}
@Test
public void testModWhileSorting(){
final List<FakeSchedulable> schedulableList = new ArrayList<>();
for (int i=0; i<10000; i++) {
schedulableList.add(
(FakeSchedulable)createSchedulable((i%10)*100, (i%3)*2));
}
Comparator DRFComparator = createComparator(100000, 50000);
// To simulate unallocated resource changes
Thread modThread = modificationThread(schedulableList);
modThread.start();
// This should fail: make sure that we do test correctly
// TimSort which is used does not handle the concurrent modification of
// objects it is sorting.
try {
Collections.sort(schedulableList, DRFComparator);
fail("Sorting should have failed and did not");
} catch (IllegalArgumentException iae) {
assertEquals(iae.getMessage(), "Comparison method violates its general contract!");
}
try {
modThread.join();
} catch (InterruptedException ie) {
fail("ModThread join failed: " + ie.getMessage());
}
// clean up and try again using TreeSet which should work
schedulableList.clear();
for (int i=0; i<10000; i++) {
schedulableList.add(
(FakeSchedulable)createSchedulable((i%10)*100, (i%3)*2));
}
TreeSet<Schedulable> sortedSchedulable = new TreeSet<>(DRFComparator);
modThread = modificationThread(schedulableList);
modThread.start();
sortedSchedulable.addAll(schedulableList);
try {
modThread.join();
} catch (InterruptedException ie) {
fail("ModThread join failed: " + ie.getMessage());
}
}
/**
* Thread to simulate concurrent schedulable changes while sorting
*/
private Thread modificationThread(final List<FakeSchedulable> schedulableList) {
Thread modThread = new Thread() {
@Override
public void run() {
try {
// This sleep is needed to make sure the sort has started before the
// modifications start and finish
Thread.sleep(500);
} catch (InterruptedException ie) {
fail("Modification thread interrupted while asleep " +
ie.getMessage());
}
Resource newUsage = Resources.createResource(0, 0);
for (int j = 0; j < 1000; j++) {
FakeSchedulable sched = schedulableList.get(j * 10);
newUsage.setMemorySize(20000);
newUsage.setVirtualCores(j % 10);
sched.setResourceUsage(newUsage);
}
}
};
return modThread;
}
}