YARN-9173. FairShare calculation broken for large values after YARN-8833. Contributed by Wilfred Spiegelenburg.
This commit is contained in:
parent
f87b3b11c4
commit
944cf87223
@ -24,6 +24,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||||
|
|
||||||
|
import static java.lang.Math.addExact;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains logic for computing the fair shares. A {@link Schedulable}'s fair
|
* Contains logic for computing the fair shares. A {@link Schedulable}'s fair
|
||||||
* share is {@link Resource} it is entitled to, independent of the current
|
* share is {@link Resource} it is entitled to, independent of the current
|
||||||
@ -31,10 +33,13 @@
|
|||||||
* consumption lies at or below its fair share will never have its containers
|
* consumption lies at or below its fair share will never have its containers
|
||||||
* preempted.
|
* preempted.
|
||||||
*/
|
*/
|
||||||
public class ComputeFairShares {
|
public final class ComputeFairShares {
|
||||||
|
|
||||||
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
|
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
|
||||||
|
|
||||||
|
private ComputeFairShares() {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute fair share of the given schedulables.Fair share is an allocation of
|
* Compute fair share of the given schedulables.Fair share is an allocation of
|
||||||
* shares considering only active schedulables ie schedulables which have
|
* shares considering only active schedulables ie schedulables which have
|
||||||
@ -100,19 +105,20 @@ public static void computeSteadyShares(
|
|||||||
* all Schedulables are only given their minShare) and an upper bound computed
|
* all Schedulables are only given their minShare) and an upper bound computed
|
||||||
* to be large enough that too many slots are given (by doubling R until we
|
* to be large enough that too many slots are given (by doubling R until we
|
||||||
* use more than totalResources resources). The helper method
|
* use more than totalResources resources). The helper method
|
||||||
* resourceUsedWithWeightToResourceRatio computes the total resources used with a
|
* resourceUsedWithWeightToResourceRatio computes the total resources used
|
||||||
* given value of R.
|
* with a given value of R.
|
||||||
* <p>
|
* <p>
|
||||||
* The running time of this algorithm is linear in the number of Schedulables,
|
* The running time of this algorithm is linear in the number of Schedulables,
|
||||||
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
|
* because resourceUsedWithWeightToResourceRatio is linear-time and the
|
||||||
* iterations of binary search is a constant (dependent on desired precision).
|
* number of iterations of binary search is a constant (dependent on desired
|
||||||
|
* precision).
|
||||||
*/
|
*/
|
||||||
private static void computeSharesInternal(
|
private static void computeSharesInternal(
|
||||||
Collection<? extends Schedulable> allSchedulables,
|
Collection<? extends Schedulable> allSchedulables,
|
||||||
Resource totalResources, String type, boolean isSteadyShare) {
|
Resource totalResources, String type, boolean isSteadyShare) {
|
||||||
|
|
||||||
Collection<Schedulable> schedulables = new ArrayList<>();
|
Collection<Schedulable> schedulables = new ArrayList<>();
|
||||||
int takenResources = handleFixedFairShares(
|
long takenResources = handleFixedFairShares(
|
||||||
allSchedulables, schedulables, isSteadyShare, type);
|
allSchedulables, schedulables, isSteadyShare, type);
|
||||||
|
|
||||||
if (schedulables.isEmpty()) {
|
if (schedulables.isEmpty()) {
|
||||||
@ -121,12 +127,11 @@ private static void computeSharesInternal(
|
|||||||
// Find an upper bound on R that we can use in our binary search. We start
|
// Find an upper bound on R that we can use in our binary search. We start
|
||||||
// at R = 1 and double it until we have either used all the resources or we
|
// at R = 1 and double it until we have either used all the resources or we
|
||||||
// have met all Schedulables' max shares.
|
// have met all Schedulables' max shares.
|
||||||
int totalMaxShare = 0;
|
long totalMaxShare = 0;
|
||||||
for (Schedulable sched : schedulables) {
|
for (Schedulable sched : schedulables) {
|
||||||
long maxShare = sched.getMaxShare().getResourceValue(type);
|
long maxShare = sched.getMaxShare().getResourceValue(type);
|
||||||
totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
|
totalMaxShare = safeAdd(maxShare, totalMaxShare);
|
||||||
Integer.MAX_VALUE);
|
if (totalMaxShare == Long.MAX_VALUE) {
|
||||||
if (totalMaxShare == Integer.MAX_VALUE) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -166,23 +171,24 @@ private static void computeSharesInternal(
|
|||||||
target = sched.getFairShare();
|
target = sched.getFairShare();
|
||||||
}
|
}
|
||||||
|
|
||||||
target.setResourceValue(type, (long)computeShare(sched, right, type));
|
target.setResourceValue(type, computeShare(sched, right, type));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute the resources that would be used given a weight-to-resource ratio
|
* Compute the resources that would be used given a weight-to-resource ratio
|
||||||
* w2rRatio, for use in the computeFairShares algorithm as described in #
|
* w2rRatio, for use in the computeFairShares algorithm as described in
|
||||||
|
* {@link #computeSharesInternal}.
|
||||||
*/
|
*/
|
||||||
private static long resourceUsedWithWeightToResourceRatio(double w2rRatio,
|
private static long resourceUsedWithWeightToResourceRatio(double w2rRatio,
|
||||||
Collection<? extends Schedulable> schedulables, String type) {
|
Collection<? extends Schedulable> schedulables, String type) {
|
||||||
long resourcesTaken = 0;
|
long resourcesTaken = 0;
|
||||||
for (Schedulable sched : schedulables) {
|
for (Schedulable sched : schedulables) {
|
||||||
long share = computeShare(sched, w2rRatio, type);
|
long share = computeShare(sched, w2rRatio, type);
|
||||||
if (Long.MAX_VALUE - resourcesTaken < share) {
|
resourcesTaken = safeAdd(resourcesTaken, share);
|
||||||
return Long.MAX_VALUE;
|
if (resourcesTaken == Long.MAX_VALUE) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
resourcesTaken += share;
|
|
||||||
}
|
}
|
||||||
return resourcesTaken;
|
return resourcesTaken;
|
||||||
}
|
}
|
||||||
@ -204,11 +210,11 @@ private static long computeShare(Schedulable sched, double w2rRatio,
|
|||||||
* Returns the resources taken by fixed fairshare schedulables,
|
* Returns the resources taken by fixed fairshare schedulables,
|
||||||
* and adds the remaining to the passed nonFixedSchedulables.
|
* and adds the remaining to the passed nonFixedSchedulables.
|
||||||
*/
|
*/
|
||||||
private static int handleFixedFairShares(
|
private static long handleFixedFairShares(
|
||||||
Collection<? extends Schedulable> schedulables,
|
Collection<? extends Schedulable> schedulables,
|
||||||
Collection<Schedulable> nonFixedSchedulables,
|
Collection<Schedulable> nonFixedSchedulables,
|
||||||
boolean isSteadyShare, String type) {
|
boolean isSteadyShare, String type) {
|
||||||
int totalResource = 0;
|
long totalResource = 0;
|
||||||
|
|
||||||
for (Schedulable sched : schedulables) {
|
for (Schedulable sched : schedulables) {
|
||||||
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
|
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
|
||||||
@ -224,15 +230,15 @@ private static int handleFixedFairShares(
|
|||||||
}
|
}
|
||||||
|
|
||||||
target.setResourceValue(type, fixedShare);
|
target.setResourceValue(type, fixedShare);
|
||||||
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
|
totalResource = safeAdd(totalResource, fixedShare);
|
||||||
Integer.MAX_VALUE);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return totalResource;
|
return totalResource;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
|
* Get the fairshare for the {@link Schedulable} if it is fixed,
|
||||||
|
* -1 otherwise.
|
||||||
*
|
*
|
||||||
* The fairshare is fixed if either the maxShare is 0, weight is 0,
|
* The fairshare is fixed if either the maxShare is 0, weight is 0,
|
||||||
* or the Schedulable is not active for instantaneous fairshare.
|
* or the Schedulable is not active for instantaneous fairshare.
|
||||||
@ -259,4 +265,20 @@ private static long getFairShareIfFixed(Schedulable sched,
|
|||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Safely add two long values. The result will always be a valid long value.
|
||||||
|
* If the addition caused an overflow the return value will be set to
|
||||||
|
* <code>Long.MAX_VALUE</code>.
|
||||||
|
* @param a first long to add
|
||||||
|
* @param b second long to add
|
||||||
|
* @return result of the addition
|
||||||
|
*/
|
||||||
|
private static long safeAdd(long a, long b) {
|
||||||
|
try {
|
||||||
|
return addExact(a, b);
|
||||||
|
} catch (ArithmeticException ae) {
|
||||||
|
return Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -70,7 +70,18 @@ public FakeSchedulable(Resource minShare, float weights) {
|
|||||||
weights, Resources.createResource(0, 0),
|
weights, Resources.createResource(0, 0),
|
||||||
Resources.createResource(0, 0), 0);
|
Resources.createResource(0, 0), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FakeSchedulable(long minShare, long maxShare) {
|
||||||
|
this(minShare, maxShare, 1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FakeSchedulable(long minShare, long maxShare, float weights) {
|
||||||
|
this(Resources.createResource(minShare, 0),
|
||||||
|
Resources.createResource(maxShare, 0),
|
||||||
|
weights, Resources.createResource(0, 0),
|
||||||
|
Resources.createResource(0, 0), 0);
|
||||||
|
}
|
||||||
|
|
||||||
public FakeSchedulable(Resource minShare, Resource maxShare,
|
public FakeSchedulable(Resource minShare, Resource maxShare,
|
||||||
float weight, Resource fairShare, Resource usage, long startTime) {
|
float weight, Resource fairShare, Resource usage, long startTime) {
|
||||||
this.minShare = minShare;
|
this.minShare = minShare;
|
||||||
@ -146,8 +157,8 @@ public boolean isPreemptable() {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setResourceUsage(Resource usage) {
|
public void setResourceUsage(Resource resourceUsage) {
|
||||||
this.usage = usage;
|
this.usage = resourceUsage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void start(long time) {
|
public final void start(long time) {
|
||||||
|
@ -19,10 +19,8 @@
|
|||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -40,7 +38,7 @@ public class TestComputeFairShares {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
scheds = new ArrayList<Schedulable>();
|
scheds = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -149,22 +147,72 @@ public void testWeightedSharingWithMinShares() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that shares are computed accurately even when the number of slots is
|
* Test that shares are computed accurately even when the number of
|
||||||
* very large.
|
* resources is very large.
|
||||||
|
* Test adapted to accommodate long values for resources.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testLargeShares() {
|
public void testLargeShares() {
|
||||||
int million = 1000 * 1000;
|
long giga = 1000L * 1000L * 1000L * 4L;
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
ComputeFairShares.computeShares(scheds,
|
ComputeFairShares.computeShares(scheds,
|
||||||
Resources.createResource(40 * million),
|
Resources.createResource(4 * giga),
|
||||||
ResourceInformation.MEMORY_MB.getName());
|
ResourceInformation.MEMORY_MB.getName());
|
||||||
verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
|
verifyMemoryShares(giga, giga, giga, giga);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overflow in the resources taken and upper bound.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLargeMinimums() {
|
||||||
|
long giga = 1000L * 1000L * 1000L * 4L;
|
||||||
|
scheds.add(new FakeSchedulable(Long.MAX_VALUE, Long.MAX_VALUE));
|
||||||
|
scheds.add(new FakeSchedulable(giga, giga));
|
||||||
|
ComputeFairShares.computeShares(scheds,
|
||||||
|
Resources.createResource(4 * giga),
|
||||||
|
ResourceInformation.MEMORY_MB.getName());
|
||||||
|
verifyMemoryShares(Long.MAX_VALUE, giga);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overflow in the upper bound calculation for the binary search.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testOverflowMaxShare() {
|
||||||
|
long giga = 1000L * 1000L * 1000L;
|
||||||
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
|
scheds.add(new FakeSchedulable(0L, Long.MAX_VALUE));
|
||||||
|
ComputeFairShares.computeShares(scheds,
|
||||||
|
Resources.createResource(2 * giga),
|
||||||
|
ResourceInformation.MEMORY_MB.getName());
|
||||||
|
verifyMemoryShares(giga, giga);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overflow in the fixed share calculations. The 3th schedulable should
|
||||||
|
* not get any share as all resources are taken by the handleFixedShare()
|
||||||
|
* call.
|
||||||
|
* With the overflow it looked like there were more resources available then
|
||||||
|
* there really are.
|
||||||
|
* The values in the test might not be "real" but they show the overflow.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testOverflowFixedShare() {
|
||||||
|
long giga = 1000L * 1000L * 1000L;
|
||||||
|
long minValue = Long.MAX_VALUE - 1L;
|
||||||
|
scheds.add(new FakeSchedulable(giga, giga, 0));
|
||||||
|
scheds.add(new FakeSchedulable(minValue, Long.MAX_VALUE, 0));
|
||||||
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
|
ComputeFairShares.computeShares(scheds,
|
||||||
|
Resources.createResource(1000L),
|
||||||
|
ResourceInformation.MEMORY_MB.getName());
|
||||||
|
verifyMemoryShares(giga, minValue, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that being called on an empty list doesn't confuse the algorithm.
|
* Test that being called on an empty list doesn't confuse the algorithm.
|
||||||
*/
|
*/
|
||||||
@ -176,7 +224,7 @@ public void testEmptyList() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that CPU works as well as memory
|
* Test that CPU works as well as memory.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCPU() {
|
public void testCPU() {
|
||||||
@ -192,10 +240,12 @@ public void testCPU() {
|
|||||||
/**
|
/**
|
||||||
* Check that a given list of shares have been assigned to this.scheds.
|
* Check that a given list of shares have been assigned to this.scheds.
|
||||||
*/
|
*/
|
||||||
private void verifyMemoryShares(int... shares) {
|
private void verifyMemoryShares(long... shares) {
|
||||||
Assert.assertEquals(scheds.size(), shares.length);
|
Assert.assertEquals("Number of shares and schedulables are not consistent",
|
||||||
|
scheds.size(), shares.length);
|
||||||
for (int i = 0; i < shares.length; i++) {
|
for (int i = 0; i < shares.length; i++) {
|
||||||
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemorySize());
|
Assert.assertEquals("Expected share number " + i + " in list wrong",
|
||||||
|
shares[i], scheds.get(i).getFairShare().getMemorySize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,23 +253,11 @@ private void verifyMemoryShares(int... shares) {
|
|||||||
* Check that a given list of shares have been assigned to this.scheds.
|
* Check that a given list of shares have been assigned to this.scheds.
|
||||||
*/
|
*/
|
||||||
private void verifyCPUShares(int... shares) {
|
private void verifyCPUShares(int... shares) {
|
||||||
Assert.assertEquals(scheds.size(), shares.length);
|
Assert.assertEquals("Number of shares and schedulables are not consistent",
|
||||||
|
scheds.size(), shares.length);
|
||||||
for (int i = 0; i < shares.length; i++) {
|
for (int i = 0; i < shares.length; i++) {
|
||||||
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores());
|
Assert.assertEquals("Expected share number " + i + " in list wrong",
|
||||||
|
shares[i], scheds.get(i).getFairShare().getVirtualCores());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test computeShares will not enter into infinite loop.
|
|
||||||
*/
|
|
||||||
@Test(timeout = 10000)
|
|
||||||
public void testResourceUsedWithWeightToResourceRatio() {
|
|
||||||
Collection<Schedulable> schedulables = new ArrayList<>();
|
|
||||||
schedulables.add(new FakeSchedulable(Integer.MAX_VALUE));
|
|
||||||
schedulables.add(new FakeSchedulable(Integer.MAX_VALUE));
|
|
||||||
|
|
||||||
Resource totalResource = Resource.newInstance(Integer.MAX_VALUE, 0);
|
|
||||||
ComputeFairShares.computeShares(
|
|
||||||
schedulables, totalResource, ResourceInformation.MEMORY_URI);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user