YARN-3275. CapacityScheduler: Preemption happening on non-preemptable queues. Contributed by Eric Payne
This commit is contained in:
parent
c79710302e
commit
27e8ea820f
@ -719,6 +719,9 @@ Release 2.7.0 - UNRELEASED
|
||||
YARN-3227. Timeline renew delegation token fails when RM user's TGT is expired
|
||||
(Zhijie Shen via xgong)
|
||||
|
||||
YARN-3275. CapacityScheduler: Preemption happening on non-preemptable
|
||||
queues (Eric Payne via jlowe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -260,4 +260,9 @@ public static Resource componentwiseMin(Resource lhs, Resource rhs) {
|
||||
return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
|
||||
Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
|
||||
}
|
||||
|
||||
public static Resource componentwiseMax(Resource lhs, Resource rhs) {
|
||||
return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
|
||||
Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
|
||||
}
|
||||
}
|
||||
|
@ -527,6 +527,17 @@ private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
|
||||
List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
|
||||
|
||||
for (TempQueue qT : queues) {
|
||||
if (qT.preemptionDisabled && qT.leafQueue != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (Resources.greaterThan(rc, clusterResource,
|
||||
qT.toBePreempted, Resource.newInstance(0, 0))) {
|
||||
LOG.debug("Tried to preempt the following "
|
||||
+ "resources from non-preemptable queue: "
|
||||
+ qT.queueName + " - Resources: " + qT.toBePreempted);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// we act only if we are violating balance by more than
|
||||
// maxIgnoredOverCapacity
|
||||
if (Resources.greaterThan(rc, clusterResource, qT.current,
|
||||
@ -734,6 +745,7 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
|
||||
float absUsed = root.getAbsoluteUsedCapacity();
|
||||
float absCap = root.getAbsoluteCapacity();
|
||||
float absMaxCap = root.getAbsoluteMaximumCapacity();
|
||||
boolean preemptionDisabled = root.getPreemptionDisabled();
|
||||
|
||||
Resource current = Resources.multiply(clusterResources, absUsed);
|
||||
Resource guaranteed = Resources.multiply(clusterResources, absCap);
|
||||
@ -747,8 +759,8 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
|
||||
LeafQueue l = (LeafQueue) root;
|
||||
Resource pending = l.getTotalResourcePending();
|
||||
ret = new TempQueue(queueName, current, pending, guaranteed,
|
||||
maxCapacity);
|
||||
if (root.getPreemptionDisabled()) {
|
||||
maxCapacity, preemptionDisabled);
|
||||
if (preemptionDisabled) {
|
||||
ret.untouchableExtra = extra;
|
||||
} else {
|
||||
ret.preemptableExtra = extra;
|
||||
@ -757,7 +769,7 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
|
||||
} else {
|
||||
Resource pending = Resource.newInstance(0, 0);
|
||||
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
|
||||
maxCapacity);
|
||||
maxCapacity, false);
|
||||
Resource childrensPreemptable = Resource.newInstance(0, 0);
|
||||
for (CSQueue c : root.getChildQueues()) {
|
||||
TempQueue subq = cloneQueues(c, clusterResources);
|
||||
@ -816,9 +828,10 @@ static class TempQueue {
|
||||
|
||||
final ArrayList<TempQueue> children;
|
||||
LeafQueue leafQueue;
|
||||
boolean preemptionDisabled;
|
||||
|
||||
TempQueue(String queueName, Resource current, Resource pending,
|
||||
Resource guaranteed, Resource maxCapacity) {
|
||||
Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) {
|
||||
this.queueName = queueName;
|
||||
this.current = current;
|
||||
this.pending = pending;
|
||||
@ -831,6 +844,7 @@ static class TempQueue {
|
||||
this.children = new ArrayList<TempQueue>();
|
||||
this.untouchableExtra = Resource.newInstance(0, 0);
|
||||
this.preemptableExtra = Resource.newInstance(0, 0);
|
||||
this.preemptionDisabled = preemptionDisabled;
|
||||
}
|
||||
|
||||
public void setLeafQueue(LeafQueue l){
|
||||
@ -862,10 +876,13 @@ public ArrayList<TempQueue> getChildren(){
|
||||
// the unused ones
|
||||
Resource offer(Resource avail, ResourceCalculator rc,
|
||||
Resource clusterResource) {
|
||||
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
|
||||
Resources.subtract(maxCapacity, idealAssigned),
|
||||
Resource.newInstance(0, 0));
|
||||
// remain = avail - min(avail, (max - assigned), (current + pending - assigned))
|
||||
Resource accepted =
|
||||
Resources.min(rc, clusterResource,
|
||||
Resources.subtract(maxCapacity, idealAssigned),
|
||||
absMaxCapIdealAssignedDelta,
|
||||
Resources.min(rc, clusterResource, avail, Resources.subtract(
|
||||
Resources.add(current, pending), idealAssigned)));
|
||||
Resource remain = Resources.subtract(avail, accepted);
|
||||
|
@ -531,6 +531,30 @@ public void testPerQueueDisablePreemptionRootDisablesAll() {
|
||||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerQueueDisablePreemptionOverAbsMaxCapacity() {
|
||||
int[][] qData = new int[][] {
|
||||
// / A D
|
||||
// B C E F
|
||||
{1000, 725, 360, 365, 275, 17, 258 }, // absCap
|
||||
{1000,1000,1000,1000, 550, 109,1000 }, // absMaxCap
|
||||
{1000, 741, 396, 345, 259, 110, 149 }, // used
|
||||
{ 40, 20, 0, 20, 20, 20, 0 }, // pending
|
||||
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
||||
// appA appB appC appD
|
||||
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
|
||||
{ -1, -1, 1, 1, -1, 1, 1 }, // req granulrity
|
||||
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
|
||||
};
|
||||
// QueueE inherits non-preemption from QueueD
|
||||
schedConf.setPreemptionDisabled("root.queueD", true);
|
||||
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||
policy.editSchedule();
|
||||
// appC is running on QueueE. QueueE is over absMaxCap, but is not
|
||||
// preemptable. Therefore, appC resources should not be preempted.
|
||||
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOverCapacityImbalance() {
|
||||
int[][] qData = new int[][]{
|
||||
|
Loading…
Reference in New Issue
Block a user