YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).

This commit is contained in:
Eric Payne 2021-12-21 19:05:39 +00:00
parent 8d251bd629
commit e2d6fd075d

View File

@ -28,12 +28,11 @@
.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* For two queues with the same priority:
@ -101,19 +100,21 @@ public static int compare(double relativeAssigned1, double relativeAssigned2,
/**
* Comparator that both looks at priority and utilization
*/
private class PriorityQueueComparator implements Comparator<CSQueue> {
private class PriorityQueueComparator
implements Comparator<PriorityQueueResourcesForSorting> {
@Override
public int compare(CSQueue q1, CSQueue q2) {
public int compare(PriorityQueueResourcesForSorting q1Sort,
PriorityQueueResourcesForSorting q2Sort) {
String p = partitionToLookAt.get();
int rc = compareQueueAccessToPartition(q1, q2, p);
int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
if (0 != rc) {
return rc;
}
float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p);
float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p);
float q1AbsCapacity = q1Sort.absoluteCapacity;
float q2AbsCapacity = q2Sort.absoluteCapacity;
//If q1's abs capacity > 0 and q2 is 0, then prioritize q1
if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity,
@ -127,28 +128,33 @@ public int compare(CSQueue q1, CSQueue q2) {
q2AbsCapacity, 0f) == 0) {
// both q1 has 0 and q2 has 0 capacity, then fall back to using
// priority, abs used capacity to prioritize
float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p);
float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p);
float used1 = q1Sort.absoluteUsedCapacity;
float used2 = q2Sort.absoluteUsedCapacity;
return compare(q1, q2, used1, used2, p);
return compare(q1Sort, q2Sort, used1, used2,
q1Sort.queue.getPriority().
getPriority(), q2Sort.queue.getPriority().getPriority());
} else{
// both q1 has positive abs capacity and q2 has positive abs
// capacity
float used1 = q1.getQueueCapacities().getUsedCapacity(p);
float used2 = q2.getQueueCapacities().getUsedCapacity(p);
float used1 = q1Sort.usedCapacity;
float used2 = q2Sort.usedCapacity;
return compare(q1, q2, used1, used2, p);
return compare(q1Sort, q2Sort, used1, used2,
q1Sort.queue.getPriority().getPriority(),
q2Sort.queue.getPriority().getPriority());
}
}
private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used,
String partition) {
private int compare(PriorityQueueResourcesForSorting q1Sort,
PriorityQueueResourcesForSorting q2Sort, float q1Used,
float q2Used, int q1Prior, int q2Prior) {
int p1 = 0;
int p2 = 0;
if (respectPriority) {
p1 = q1.getPriority().getPriority();
p2 = q2.getPriority().getPriority();
p1 = q1Prior;
p2 = q2Prior;
}
int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used,
@ -158,16 +164,16 @@ private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used,
// capacity goes first
if (0 == rc) {
Resource minEffRes1 =
q1.getQueueResourceQuotas().getConfiguredMinResource(partition);
q1Sort.configuredMinResource;
Resource minEffRes2 =
q2.getQueueResourceQuotas().getConfiguredMinResource(partition);
q2Sort.configuredMinResource;
if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals(
Resources.none())) {
return minEffRes2.compareTo(minEffRes1);
}
float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition);
float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition);
float abs1 = q1Sort.absoluteCapacity;
float abs2 = q2Sort.absoluteCapacity;
return Float.compare(abs2, abs1);
}
@ -203,6 +209,37 @@ private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2,
}
}
/**
* A simple storage class to represent a snapshot of a queue.
*/
public static class PriorityQueueResourcesForSorting {
private final float absoluteUsedCapacity;
private final float usedCapacity;
private final Resource configuredMinResource;
private final float absoluteCapacity;
private final CSQueue queue;
PriorityQueueResourcesForSorting(CSQueue queue) {
this.queue = queue;
this.absoluteUsedCapacity =
queue.getQueueCapacities().
getAbsoluteUsedCapacity(partitionToLookAt.get());
this.usedCapacity =
queue.getQueueCapacities().
getUsedCapacity(partitionToLookAt.get());
this.absoluteCapacity =
queue.getQueueCapacities().
getAbsoluteCapacity(partitionToLookAt.get());
this.configuredMinResource =
queue.getQueueResourceQuotas().
getConfiguredMinResource(partitionToLookAt.get());
}
public CSQueue getQueue() {
return queue;
}
}
public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
this.respectPriority = respectPriority;
}
@ -214,12 +251,14 @@ public void setQueues(List<CSQueue> queues) {
@Override
public Iterator<CSQueue> getAssignmentIterator(String partition) {
// Since partitionToLookAt is a thread local variable, and every time we
// copy and sort queues, so it's safe for multi-threading environment.
// partitionToLookAt is a thread local variable, therefore it is safe to mutate it.
PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
List<CSQueue> sortedQueue = new ArrayList<>(queues);
Collections.sort(sortedQueue, new PriorityQueueComparator());
return sortedQueue.iterator();
// Sort the snapshot of the queues in order to avoid breaking the prerequisites of TimSort.
// See YARN-10178 for details.
return queues.stream().map(PriorityQueueResourcesForSorting::new).sorted(
new PriorityQueueComparator()).map(PriorityQueueResourcesForSorting::getQueue).collect(
Collectors.toList()).iterator();
}
@Override