YARN-1957. Consider the max capacity of the queue when computing the ideal capacity for preemption. Contributed by Carlo Curino.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594414 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christopher Douglas 2014-05-13 23:15:27 +00:00
parent 4b54bd8320
commit 45b42676f9
3 changed files with 192 additions and 56 deletions

View File

@ -216,6 +216,9 @@ Release 2.4.1 - UNRELEASED
causing both RMs to be stuck in standby mode when automatic failover is
enabled. (Karthik Kambatla and Xuan Gong via vinodkv)
YARN-1957. Consider the max capacity of the queue when computing the ideal
capacity for preemption. (Carlo Curino via cdouglas)
Release 2.4.0 - 2014-04-07
INCOMPATIBLE CHANGES

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -293,32 +294,29 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
//assign all cluster resources until no more demand, or no resources are left
while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
unassigned, Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
// group queues based on whether they have non-zero guaranteed capacity
Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(rc, unassigned, qAlloc);
// offer for each queue their capacity first and in following invocations
// their share of over-capacity
for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
TempQueue sub = i.next();
Resource wQavail =
Resources.multiply(unassigned, sub.normalizedGuarantee);
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
Resource wQdone = Resources.subtract(wQavail, wQidle);
// if the queue returned a value > 0 it means it is fully satisfied
// and it is removed from the list of active queues qAlloc
if (!Resources.greaterThan(rc, tot_guarant,
wQdone, Resources.none())) {
i.remove();
}
Resources.addTo(wQassigned, wQdone);
for (TempQueue q : qAlloc) {
if (Resources
.greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
nonZeroGuarQueues.add(q);
} else {
zeroGuarQueues.add(q);
}
Resources.subtractFrom(unassigned, wQassigned);
}
// first compute the allocation as a fixpoint based on guaranteed capacity
computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
false);
// if any capacity is left unassigned, distribute it among zero-guarantee
// queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
if (!zeroGuarQueues.isEmpty()
&& Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
true);
}
// based on ideal assignment computed above and current assignment we derive
@ -354,6 +352,46 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
}
/**
 * Hands out {@code unassigned} to the queues in {@code qAlloc} in repeated
 * rounds until either no queue is left in the collection or nothing is left
 * to hand out.
 *
 * In every round the per-queue share is recomputed over the queues that are
 * still in the collection, and each queue is offered its share of what is
 * currently unassigned. A queue that accepts nothing of its offer is dropped
 * from {@code qAlloc}, so the next round splits the remainder among the
 * others. Shares are weighted by guaranteed capacity unless
 * {@code ignoreGuarantee} is set, in which case they are uniform.
 *
 * Both {@code qAlloc} (queues are removed) and {@code unassigned} (reduced
 * by what the queues accepted) are modified in place.
 *
 * @param rc resource calculator used for comparisons
 * @param tot_guarant resource passed as the reference total to comparisons
 *          and to each queue's offer
 * @param qAlloc queues still competing for resources; emptied progressively
 * @param unassigned resources left to distribute; decremented in place
 * @param ignoreGuarantee true to split uniformly rather than by guarantee
 */
private void computeFixpointAllocation(ResourceCalculator rc,
    Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
    boolean ignoreGuarantee) {
  while (!qAlloc.isEmpty()) {
    // stop as soon as there is nothing left to distribute
    if (!Resources.greaterThan(rc, tot_guarant, unassigned,
        Resources.none())) {
      break;
    }

    // total accepted by all queues during this round
    Resource acceptedThisRound = Resource.newInstance(0, 0);

    // refresh each queue's normalizedGuarantee over the still-active queues
    resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);

    Iterator<TempQueue> active = qAlloc.iterator();
    while (active.hasNext()) {
      TempQueue queue = active.next();

      // this queue's slice of what is currently unassigned
      Resource offered =
          Resources.multiply(unassigned, queue.normalizedGuarantee);
      // offer() hands back whatever part of the slice the queue did not take
      Resource declined = queue.offer(offered, rc, tot_guarant);
      Resource accepted = Resources.subtract(offered, declined);

      // a queue that took nothing will not take anything later either:
      // retire it so the remainder is split among the other queues
      boolean tookSomething = Resources.greaterThan(rc, tot_guarant,
          accepted, Resources.none());
      if (!tookSomething) {
        active.remove();
      }
      Resources.addTo(acceptedThisRound, accepted);
    }

    Resources.subtractFrom(unassigned, acceptedThisRound);
  }
}
/**
* Computes a normalizedGuaranteed capacity based on active queues
* @param rc resource calculator
@ -361,14 +399,21 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
* @param queues the list of queues to consider
*/
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
List<TempQueue> queues) {
Collection<TempQueue> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
for (TempQueue q : queues) {
Resources.addTo(activeCap, q.guaranteed);
}
for (TempQueue q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.guaranteed, activeCap);
if (ignoreGuar) {
for (TempQueue q : queues) {
q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
}
} else {
for (TempQueue q : queues) {
Resources.addTo(activeCap, q.guaranteed);
}
for (TempQueue q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.guaranteed, activeCap);
}
}
}
@ -515,18 +560,25 @@ public String getPolicyName() {
private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
TempQueue ret;
synchronized (root) {
float absUsed = root.getAbsoluteUsedCapacity();
String queueName = root.getQueueName();
float absUsed = root.getAbsoluteUsedCapacity();
float absCap = root.getAbsoluteCapacity();
float absMaxCap = root.getAbsoluteMaximumCapacity();
Resource current = Resources.multiply(clusterResources, absUsed);
Resource guaranteed =
Resources.multiply(clusterResources, root.getAbsoluteCapacity());
Resource guaranteed = Resources.multiply(clusterResources, absCap);
Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
if (root instanceof LeafQueue) {
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
ret = new TempQueue(queueName, current, pending, guaranteed,
maxCapacity);
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
maxCapacity);
for (CSQueue c : root.getChildQueues()) {
ret.addChild(cloneQueues(c, clusterResources));
}
@ -563,6 +615,7 @@ static class TempQueue {
final Resource current;
final Resource pending;
final Resource guaranteed;
final Resource maxCapacity;
Resource idealAssigned;
Resource toBePreempted;
Resource actuallyPreempted;
@ -573,11 +626,12 @@ static class TempQueue {
LeafQueue leafQueue;
TempQueue(String queueName, Resource current, Resource pending,
Resource guaranteed) {
Resource guaranteed, Resource maxCapacity) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
this.guaranteed = guaranteed;
this.maxCapacity = maxCapacity;
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyPreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
@ -614,12 +668,12 @@ public ArrayList<TempQueue> getChildren(){
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource) {
// remain = avail - min(avail, current + pending - assigned)
Resource accepted = Resources.min(rc, clusterResource,
avail,
Resources.subtract(
Resources.add(current, pending),
idealAssigned));
// remain = avail - min(avail, (max - assigned), (current + pending - assigned))
Resource accepted =
Resources.min(rc, clusterResource,
Resources.subtract(maxCapacity, idealAssigned),
Resources.min(rc, clusterResource, avail, Resources.subtract(
Resources.add(current, pending), idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
@ -628,13 +682,15 @@ Resource offer(Resource avail, ResourceCalculator rc,
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("CUR: ").append(current)
sb.append(" NAME: " + queueName)
.append(" CUR: ").append(current)
.append(" PEN: ").append(pending)
.append(" GAR: ").append(guaranteed)
.append(" NORM: ").append(normalizedGuarantee)
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
.append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
.append("\n");
return sb.toString();
}

View File

@ -115,6 +115,7 @@ public void testIgnore() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@ -133,6 +134,7 @@ public void testProportionalPreemption() {
int[][] qData = new int[][]{
// / A B C D
{ 100, 10, 40, 20, 30 }, // abs
{ 100, 100, 100, 100, 100 }, // maxCap
{ 100, 30, 60, 10, 0 }, // used
{ 45, 20, 5, 20, 0 }, // pending
{ 0, 0, 0, 0, 0 }, // reserved
@ -145,11 +147,32 @@ public void testProportionalPreemption() {
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testMaxCap() {
  // one entry per queue, in the order: root "/", A, B, C
  int[] abs       = { 100,  40,  40,  20 };
  int[] maxCap    = { 100, 100,  45, 100 };
  int[] used      = { 100,  55,  45,   0 };
  int[] pending   = {  20,  10,  10,   0 };
  int[] reserved  = {   0,   0,   0,   0 };
  int[] apps      = {   2,   1,   1,   0 };
  int[] gran      = {  -1,   1,   1,   0 }; // req granularity
  int[] subqueues = {   3,   0,   0,   0 };

  // row order expected by buildPolicy: abs, maxCap, used, pending, reserved,
  // apps, req granularity, subqueues
  int[][] qData = new int[][]{
      abs, maxCap, used, pending, reserved, apps, gran, subqueues
  };

  buildPolicy(qData).editSchedule();

  // despite the imbalance, since B is at maxCap, do not correct
  verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testPreemptCycle() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@ -169,6 +192,7 @@ public void testExpireKill() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 60, 40 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@ -205,6 +229,7 @@ public void testDeadzone() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 39, 43, 21 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@ -224,6 +249,7 @@ public void testOverCapacityImbalance() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@ -242,6 +268,7 @@ public void testNaturalTermination() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 55, 45, 0 }, // used
{ 20, 10, 10, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
@ -261,6 +288,7 @@ public void testObserveOnly() {
int[][] qData = new int[][]{
// / A B C
{ 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 90, 10, 0 }, // used
{ 80, 10, 20, 50 }, // pending
{ 0, 0, 0, 0 }, // reserved
@ -280,6 +308,7 @@ public void testHierarchical() {
int[][] qData = new int[][] {
// / A B C D E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
@ -294,11 +323,55 @@ public void testHierarchical() {
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testZeroGuar() {
  // one entry per queue, in the order: root "/", A, B, C, D, E, F
  int[] abs       = { 200, 100,   0,  99, 100,  10,  90 };
  int[] maxCap    = { 200, 200, 200, 200, 200, 200, 200 };
  int[] used      = { 170,  80,  60,  20,  90,  90,   0 };
  int[] pending   = {  10,   0,   0,   0,  10,   0,  10 };
  int[] reserved  = {   0,   0,   0,   0,   0,   0,   0 };
  int[] apps      = {   4,   2,   1,   1,   2,   1,   1 };
  int[] gran      = {  -1,  -1,   1,   1,  -1,   1,   1 }; // req granularity
  int[] subqueues = {   2,   2,   0,   0,   2,   0,   0 };

  // row order expected by buildPolicy: abs, maxCap, used, pending, reserved,
  // apps, req granularity, subqueues
  int[][] qData = new int[][] {
      abs, maxCap, used, pending, reserved, apps, gran, subqueues
  };

  buildPolicy(qData).editSchedule();

  // no preemption request may be dispatched for appA.
  // NOTE(review): the earlier remark here spoke of capacity being taken from
  // "A1" rather than "B1", labels that do not appear in this table (queues
  // are A-F) - confirm which queue the expectation is meant to protect
  verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testZeroGuarOverCap() {
  // one entry per queue, in the order: root "/", A, B, C, D, E, F
  int[] abs       = { 200, 100,   0,  99,   0, 100, 100 };
  int[] maxCap    = { 200, 200, 200, 200, 200, 200, 200 };
  int[] used      = { 170, 170,  60,  20,  90,   0,   0 };
  int[] pending   = {  85,  50,  30,  10,  10,  20,  20 };
  int[] reserved  = {   0,   0,   0,   0,   0,   0,   0 };
  int[] apps      = {   4,   3,   1,   1,   1,   1,   1 };
  int[] gran      = {  -1,  -1,   1,   1,   1,  -1,   1 }; // req granularity
  int[] subqueues = {   2,   3,   0,   0,   0,   1,   0 };

  // row order expected by buildPolicy: abs, maxCap, used, pending, reserved,
  // apps, req granularity, subqueues
  int[][] qData = new int[][] {
      abs, maxCap, used, pending, reserved, apps, gran, subqueues
  };

  buildPolicy(qData).editSchedule();

  // we verify both that C has priority over B and D (as it has >0 guarantees)
  // and that B and D are forced to share their over-capacity fairly (as they
  // are both zero-guarantee queues), hence D sees some of its containers
  // preempted
  verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// / A B C D E F G H I
{ 400, 200, 60,140, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, }, // maxCap
{ 400, 210, 70,140, 100, 50, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
@ -382,24 +455,25 @@ ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
when(mCS.getRootQueue()).thenReturn(mRoot);
Resource clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResources()).thenReturn(clusterResources);
return policy;
}
ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
int[] abs = queueData[0];
int[] used = queueData[1];
int[] pending = queueData[2];
int[] reserved = queueData[3];
int[] apps = queueData[4];
int[] gran = queueData[5];
int[] queues = queueData[6];
int[] maxCap = queueData[1];
int[] used = queueData[2];
int[] pending = queueData[3];
int[] reserved = queueData[4];
int[] apps = queueData[5];
int[] gran = queueData[6];
int[] queues = queueData[7];
return mockNested(abs, used, pending, reserved, apps, gran, queues);
return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
}
ParentQueue mockNested(int[] abs, int[] used,
ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
float tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
@ -407,6 +481,8 @@ ParentQueue mockNested(int[] abs, int[] used,
when(root.getQueueName()).thenReturn("/");
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
final ParentQueue p = pqs.removeLast();
@ -420,6 +496,7 @@ ParentQueue mockNested(int[] abs, int[] used,
when(q.getQueueName()).thenReturn(queueName);
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
}
assert 0 == pqs.size();
return root;