diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
index 3fe0c68986..97bb4c5617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -24,6 +24,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+import static java.lang.Math.addExact;
+
/**
* Contains logic for computing the fair shares. A {@link Schedulable}'s fair
* share is {@link Resource} it is entitled to, independent of the current
@@ -31,10 +33,13 @@
* consumption lies at or below its fair share will never have its containers
* preempted.
*/
-public class ComputeFairShares {
+public final class ComputeFairShares {
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+ private ComputeFairShares() {
+ }
+
/**
* Compute fair share of the given schedulables.Fair share is an allocation of
* shares considering only active schedulables ie schedulables which have
@@ -100,19 +105,20 @@ public static void computeSteadyShares(
* all Schedulables are only given their minShare) and an upper bound computed
* to be large enough that too many slots are given (by doubling R until we
* use more than totalResources resources). The helper method
- * resourceUsedWithWeightToResourceRatio computes the total resources used with a
- * given value of R.
+ * resourceUsedWithWeightToResourceRatio computes the total resources used
+ * with a given value of R.
*
* The running time of this algorithm is linear in the number of Schedulables,
- * because resourceUsedWithWeightToResourceRatio is linear-time and the number of
- * iterations of binary search is a constant (dependent on desired precision).
+ * because resourceUsedWithWeightToResourceRatio is linear-time and the
+ * number of iterations of binary search is a constant (dependent on desired
+ * precision).
*/
private static void computeSharesInternal(
Collection extends Schedulable> allSchedulables,
Resource totalResources, String type, boolean isSteadyShare) {
Collection schedulables = new ArrayList<>();
- int takenResources = handleFixedFairShares(
+ long takenResources = handleFixedFairShares(
allSchedulables, schedulables, isSteadyShare, type);
if (schedulables.isEmpty()) {
@@ -121,12 +127,11 @@ private static void computeSharesInternal(
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used all the resources or we
// have met all Schedulables' max shares.
- int totalMaxShare = 0;
+ long totalMaxShare = 0;
for (Schedulable sched : schedulables) {
long maxShare = sched.getMaxShare().getResourceValue(type);
- totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
- Integer.MAX_VALUE);
- if (totalMaxShare == Integer.MAX_VALUE) {
+ totalMaxShare = safeAdd(maxShare, totalMaxShare);
+ if (totalMaxShare == Long.MAX_VALUE) {
break;
}
}
@@ -166,23 +171,24 @@ private static void computeSharesInternal(
target = sched.getFairShare();
}
- target.setResourceValue(type, (long)computeShare(sched, right, type));
+ target.setResourceValue(type, computeShare(sched, right, type));
}
}
/**
* Compute the resources that would be used given a weight-to-resource ratio
- * w2rRatio, for use in the computeFairShares algorithm as described in #
+ * w2rRatio, for use in the computeFairShares algorithm as described in
+ * {@link #computeSharesInternal}.
*/
private static long resourceUsedWithWeightToResourceRatio(double w2rRatio,
Collection extends Schedulable> schedulables, String type) {
long resourcesTaken = 0;
for (Schedulable sched : schedulables) {
long share = computeShare(sched, w2rRatio, type);
- if (Long.MAX_VALUE - resourcesTaken < share) {
- return Long.MAX_VALUE;
+ resourcesTaken = safeAdd(resourcesTaken, share);
+ if (resourcesTaken == Long.MAX_VALUE) {
+ break;
}
- resourcesTaken += share;
}
return resourcesTaken;
}
@@ -204,11 +210,11 @@ private static long computeShare(Schedulable sched, double w2rRatio,
* Returns the resources taken by fixed fairshare schedulables,
* and adds the remaining to the passed nonFixedSchedulables.
*/
- private static int handleFixedFairShares(
+ private static long handleFixedFairShares(
Collection extends Schedulable> schedulables,
Collection nonFixedSchedulables,
boolean isSteadyShare, String type) {
- int totalResource = 0;
+ long totalResource = 0;
for (Schedulable sched : schedulables) {
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
@@ -224,15 +230,15 @@ private static int handleFixedFairShares(
}
target.setResourceValue(type, fixedShare);
- totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
- Integer.MAX_VALUE);
+ totalResource = safeAdd(totalResource, fixedShare);
}
}
return totalResource;
}
/**
- * Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
+ * Get the fairshare for the {@link Schedulable} if it is fixed,
+ * -1 otherwise.
*
* The fairshare is fixed if either the maxShare is 0, weight is 0,
* or the Schedulable is not active for instantaneous fairshare.
@@ -259,4 +265,20 @@ private static long getFairShareIfFixed(Schedulable sched,
return -1;
}
+
+ /**
+ * Safely add two long values. The result will always be a valid long value.
+ * If the addition caused an overflow the return value will be set to
+ * Long.MAX_VALUE
.
+ * @param a first long to add
+ * @param b second long to add
+ * @return result of the addition
+ */
+ private static long safeAdd(long a, long b) {
+ try {
+ return addExact(a, b);
+ } catch (ArithmeticException ae) {
+ return Long.MAX_VALUE;
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index b1fc2d0bd9..c42084e519 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -70,7 +70,18 @@ public FakeSchedulable(Resource minShare, float weights) {
weights, Resources.createResource(0, 0),
Resources.createResource(0, 0), 0);
}
-
+
+ public FakeSchedulable(long minShare, long maxShare) {
+ this(minShare, maxShare, 1L);
+ }
+
+ public FakeSchedulable(long minShare, long maxShare, float weights) {
+ this(Resources.createResource(minShare, 0),
+ Resources.createResource(maxShare, 0),
+ weights, Resources.createResource(0, 0),
+ Resources.createResource(0, 0), 0);
+ }
+
public FakeSchedulable(Resource minShare, Resource maxShare,
float weight, Resource fairShare, Resource usage, long startTime) {
this.minShare = minShare;
@@ -146,8 +157,8 @@ public boolean isPreemptable() {
return true;
}
- public void setResourceUsage(Resource usage) {
- this.usage = usage;
+ public void setResourceUsage(Resource resourceUsage) {
+ this.usage = resourceUsage;
}
public final void start(long time) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
index b1666d61a5..5d3d49ab82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
@@ -19,10 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.junit.Assert;
@@ -40,7 +38,7 @@ public class TestComputeFairShares {
@Before
public void setUp() throws Exception {
- scheds = new ArrayList();
+ scheds = new ArrayList<>();
}
/**
@@ -149,22 +147,72 @@ public void testWeightedSharingWithMinShares() {
}
/**
- * Test that shares are computed accurately even when the number of slots is
- * very large.
+ * Test that shares are computed accurately even when the number of
+ * resources is very large.
+ * Test adapted to accommodate long values for resources.
*/
@Test
public void testLargeShares() {
- int million = 1000 * 1000;
- scheds.add(new FakeSchedulable());
- scheds.add(new FakeSchedulable());
- scheds.add(new FakeSchedulable());
- scheds.add(new FakeSchedulable());
+ long giga = 1000L * 1000L * 1000L * 4L;
+ scheds.add(new FakeSchedulable(0L, giga));
+ scheds.add(new FakeSchedulable(0L, giga));
+ scheds.add(new FakeSchedulable(0L, giga));
+ scheds.add(new FakeSchedulable(0L, giga));
ComputeFairShares.computeShares(scheds,
- Resources.createResource(40 * million),
+ Resources.createResource(4 * giga),
ResourceInformation.MEMORY_MB.getName());
- verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
+ verifyMemoryShares(giga, giga, giga, giga);
}
-
+
+ /**
+ * Test overflow in the resources taken and upper bound.
+ */
+ @Test
+ public void testLargeMinimums() {
+ long giga = 1000L * 1000L * 1000L * 4L;
+ scheds.add(new FakeSchedulable(Long.MAX_VALUE, Long.MAX_VALUE));
+ scheds.add(new FakeSchedulable(giga, giga));
+ ComputeFairShares.computeShares(scheds,
+ Resources.createResource(4 * giga),
+ ResourceInformation.MEMORY_MB.getName());
+ verifyMemoryShares(Long.MAX_VALUE, giga);
+ }
+
+ /**
+ * Test overflow in the upper bound calculation for the binary search.
+ */
+ @Test
+ public void testOverflowMaxShare() {
+ long giga = 1000L * 1000L * 1000L;
+ scheds.add(new FakeSchedulable(0L, giga));
+ scheds.add(new FakeSchedulable(0L, Long.MAX_VALUE));
+ ComputeFairShares.computeShares(scheds,
+ Resources.createResource(2 * giga),
+ ResourceInformation.MEMORY_MB.getName());
+ verifyMemoryShares(giga, giga);
+ }
+
+ /**
+ * Test overflow in the fixed share calculations. The 3th schedulable should
+ * not get any share as all resources are taken by the handleFixedShare()
+ * call.
+ * With the overflow it looked like there were more resources available then
+ * there really are.
+ * The values in the test might not be "real" but they show the overflow.
+ */
+ @Test
+ public void testOverflowFixedShare() {
+ long giga = 1000L * 1000L * 1000L;
+ long minValue = Long.MAX_VALUE - 1L;
+ scheds.add(new FakeSchedulable(giga, giga, 0));
+ scheds.add(new FakeSchedulable(minValue, Long.MAX_VALUE, 0));
+ scheds.add(new FakeSchedulable(0L, giga));
+ ComputeFairShares.computeShares(scheds,
+ Resources.createResource(1000L),
+ ResourceInformation.MEMORY_MB.getName());
+ verifyMemoryShares(giga, minValue, 0);
+ }
+
/**
* Test that being called on an empty list doesn't confuse the algorithm.
*/
@@ -176,7 +224,7 @@ public void testEmptyList() {
}
/**
- * Test that CPU works as well as memory
+ * Test that CPU works as well as memory.
*/
@Test
public void testCPU() {
@@ -192,10 +240,12 @@ public void testCPU() {
/**
* Check that a given list of shares have been assigned to this.scheds.
*/
- private void verifyMemoryShares(int... shares) {
- Assert.assertEquals(scheds.size(), shares.length);
+ private void verifyMemoryShares(long... shares) {
+ Assert.assertEquals("Number of shares and schedulables are not consistent",
+ scheds.size(), shares.length);
for (int i = 0; i < shares.length; i++) {
- Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemorySize());
+ Assert.assertEquals("Expected share number " + i + " in list wrong",
+ shares[i], scheds.get(i).getFairShare().getMemorySize());
}
}
@@ -203,23 +253,11 @@ private void verifyMemoryShares(int... shares) {
* Check that a given list of shares have been assigned to this.scheds.
*/
private void verifyCPUShares(int... shares) {
- Assert.assertEquals(scheds.size(), shares.length);
+ Assert.assertEquals("Number of shares and schedulables are not consistent",
+ scheds.size(), shares.length);
for (int i = 0; i < shares.length; i++) {
- Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores());
+ Assert.assertEquals("Expected share number " + i + " in list wrong",
+ shares[i], scheds.get(i).getFairShare().getVirtualCores());
}
}
-
- /**
- * Test computeShares will not enter into infinite loop.
- */
- @Test(timeout = 10000)
- public void testResourceUsedWithWeightToResourceRatio() {
- Collection schedulables = new ArrayList<>();
- schedulables.add(new FakeSchedulable(Integer.MAX_VALUE));
- schedulables.add(new FakeSchedulable(Integer.MAX_VALUE));
-
- Resource totalResource = Resource.newInstance(Integer.MAX_VALUE, 0);
- ComputeFairShares.computeShares(
- schedulables, totalResource, ResourceInformation.MEMORY_URI);
- }
}