diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 9a5bc79ae0..796b6662dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -22,6 +22,7 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Evolving; @@ -66,8 +67,10 @@ public abstract class Resource implements Comparable { // copy array, etc. protected static final int NUM_MANDATORY_RESOURCES = 2; - protected static final int MEMORY_INDEX = 0; - protected static final int VCORES_INDEX = 1; + @Private + public static final int MEMORY_INDEX = 0; + @Private + public static final int VCORES_INDEX = 1; @Public @Stable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index dad62fbd14..59908efafd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.api.records; import com.google.common.collect.ImmutableMap; -import org.apache.curator.shaded.com.google.common.reflect.ClassPath; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.util.UnitsConversionUtil; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index e58b357296..59635d987a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -47,8 +47,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { public static final String NAME = "DRF"; - private static final DominantResourceFairnessComparator COMPARATOR = - new DominantResourceFairnessComparator(); + private static final int NUM_RESOURCES = + ResourceUtils.getNumberOfKnownResourceTypes(); + private static final DominantResourceFairnessComparator COMPARATORN = + new DominantResourceFairnessComparatorN(); + private static final DominantResourceFairnessComparator COMPARATOR2 = + new DominantResourceFairnessComparator2(); private static final DominantResourceCalculator CALCULATOR = new DominantResourceCalculator(); @@ -59,7 +63,15 @@ public String getName() { @Override public Comparator getComparator() { - return COMPARATOR; + if (NUM_RESOURCES == 2) { + // To improve performance, if we know we're dealing with the common + // case of only CPU and memory, then handle CPU and memory explicitly. + return COMPARATOR2; + } else { + // Otherwise, do it the generic way. + return COMPARATORN; + } + } @Override @@ -107,25 +119,56 @@ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, @Override public void initialize(FSContext fsContext) { - COMPARATOR.setFSContext(fsContext); + COMPARATORN.setFSContext(fsContext); + COMPARATOR2.setFSContext(fsContext); } /** * This class compares two {@link Schedulable} instances according to the * DRF policy. If neither instance is below min share, approximate fair share - * ratios are compared. + * ratios are compared. Subclasses of this class will do the actual work of + * the comparison, specialized for the number of configured resource types. */ - public static class DominantResourceFairnessComparator + public abstract static class DominantResourceFairnessComparator implements Comparator { - private FSContext fsContext; + protected FSContext fsContext; public void setFSContext(FSContext fsContext) { this.fsContext = fsContext; } + /** + * This method is used when apps are tied in fairness ratio. It breaks + * the tie by submit time and job name to get a deterministic ordering, + * which is useful for unit tests. + * + * @param s1 the first item to compare + * @param s2 the second item to compare + * @return < 0, 0, or > 0 if the first item is less than, equal to, + * or greater than the second item, respectively + */ + protected int compareAttribrutes(Schedulable s1, Schedulable s2) { + int res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + + if (res == 0) { + res = s1.getName().compareTo(s2.getName()); + } + + return res; + } + } + + /** + * This class compares two {@link Schedulable} instances according to the + * DRF policy. If neither instance is below min share, approximate fair share + * ratios are compared. This class makes no assumptions about the number of + * resource types. + */ + @VisibleForTesting + static class DominantResourceFairnessComparatorN + extends DominantResourceFairnessComparator { @Override public int compare(Schedulable s1, Schedulable s2) { - ResourceInformation[] info = ResourceUtils.getResourceTypesArray(); Resource usage1 = s1.getResourceUsage(); Resource usage2 = s2.getResourceUsage(); Resource minShare1 = s1.getMinShare(); @@ -135,8 +178,8 @@ public int compare(Schedulable s1, Schedulable s2) { // These arrays hold the usage, fair, and min share ratios for each // resource type. ratios[0][x] are the usage ratios, ratios[1][x] are // the fair share ratios, and ratios[2][x] are the min share ratios. - float[][] ratios1 = new float[info.length][3]; - float[][] ratios2 = new float[info.length][3]; + float[][] ratios1 = new float[NUM_RESOURCES][3]; + float[][] ratios2 = new float[NUM_RESOURCES][3]; // Calculate cluster shares and approximate fair shares for each // resource type of both schedulables. @@ -155,7 +198,7 @@ public int compare(Schedulable s1, Schedulable s2) { usage2.getResources()[dominant2].getValue() < minShare2.getResources()[dominant2].getValue(); - int res = 0; + int res; if (!s2Needy && !s1Needy) { // Sort shares by usage ratio and compare them by approximate fair share @@ -176,13 +219,7 @@ public int compare(Schedulable s1, Schedulable s2) { } if (res == 0) { - // Apps are tied in fairness ratio. Break the tie by submit time and job - // name to get a deterministic ordering, which is useful for unit tests. - res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); - - if (res == 0) { - res = s1.getName().compareTo(s2.getName()); - } + res = compareAttribrutes(s1, s2); } return res; @@ -206,7 +243,7 @@ void sortRatios(float[][] ratios1, float[][]ratios2) { /** * Calculate a resource's usage ratio and approximate fair share ratio. - * The {@code shares} array will be populated with both the usage ratio + * The {@code ratios} array will be populated with both the usage ratio * and the approximate fair share ratio for each resource type. The usage * ratio is calculated as {@code resource} divided by {@code cluster}. * The approximate fair share ratio is calculated as the usage ratio @@ -221,18 +258,18 @@ void sortRatios(float[][] ratios1, float[][]ratios2) { * because when comparing resources, the resource with the higher weight * will be assigned by the scheduler a proportionally higher fair share. * - * The {@code shares} array must be at least n x 2, where n + * The {@code ratios} array must be at least n x 2, where n * is the number of resource types. Only the first and second indices of - * the inner arrays in the {@code shares} array will be used, e.g. - * {@code shares[x][0]} and {@code shares[x][1]}. + * the inner arrays in the {@code ratios} array will be used, e.g. + * {@code ratios[x][0]} and {@code ratios[x][1]}. * * The return value will be the index of the dominant resource type in the - * {@code shares} array. The dominant resource is the resource type for + * {@code ratios} array. The dominant resource is the resource type for * which {@code resource} has the largest usage ratio. * * @param resource the resource for which to calculate ratios * @param cluster the total cluster resources - * @param ratios the shares array to populate + * @param ratios the share ratios array to populate * @param weight the resource weight * @return the index of the resource type with the largest cluster share */ @@ -275,7 +312,7 @@ int calculateClusterAndFairRatios(Resource resource, Resource cluster, * * @param resource the resource for which to calculate min shares * @param minShare the min share - * @param ratios the shares array to populate + * @param ratios the share ratios array to populate */ @VisibleForTesting void calculateMinShareRatios(Resource resource, Resource minShare, @@ -320,4 +357,155 @@ int compareRatios(float[][] ratios1, float[][] ratios2, int index) { return ret; } } + + /** + * This class compares two {@link Schedulable} instances according to the + * DRF policy in the special case that only CPU and memory are configured. + * If neither instance is below min share, approximate fair share + * ratios are compared. + */ + @VisibleForTesting + static class DominantResourceFairnessComparator2 + extends DominantResourceFairnessComparator { + @Override + public int compare(Schedulable s1, Schedulable s2) { + ResourceInformation[] resourceInfo1 = + s1.getResourceUsage().getResources(); + ResourceInformation[] resourceInfo2 = + s2.getResourceUsage().getResources(); + ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources(); + ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources(); + ResourceInformation[] clusterInfo = + fsContext.getClusterResource().getResources(); + double[] shares1 = new double[2]; + double[] shares2 = new double[2]; + + int dominant1 = calculateClusterAndFairRatios(resourceInfo1, + s1.getWeight(), clusterInfo, shares1); + int dominant2 = calculateClusterAndFairRatios(resourceInfo2, + s2.getWeight(), clusterInfo, shares2); + + // A queue is needy for its min share if its dominant resource + // (with respect to the cluster capacity) is below its configured min + // share for that resource + boolean s1Needy = resourceInfo1[dominant1].getValue() < + minShareInfo1[dominant1].getValue(); + boolean s2Needy = resourceInfo1[dominant2].getValue() < + minShareInfo2[dominant2].getValue(); + + int res; + + if (!s2Needy && !s1Needy) { + res = (int) Math.signum(shares1[dominant1] - shares2[dominant2]); + + if (res == 0) { + // Because memory and CPU are indices 0 and 1, we can find the + // non-dominant index by subtracting the dominant index from 1. + res = (int) Math.signum(shares1[1 - dominant1] - + shares2[1 - dominant2]); + } + } else if (s1Needy && !s2Needy) { + res = -1; + } else if (s2Needy && !s1Needy) { + res = 1; + } else { + double[] minShares1 = + calculateMinShareRatios(resourceInfo1, minShareInfo1); + double[] minShares2 = + calculateMinShareRatios(resourceInfo2, minShareInfo2); + + res = (int) Math.signum(minShares1[dominant1] - minShares2[dominant2]); + + if (res == 0) { + res = (int) Math.signum(minShares1[1 - dominant1] - + minShares2[1 - dominant2]); + } + } + + if (res == 0) { + res = compareAttribrutes(s1, s2); + } + + return res; + } + + /** + * Calculate a resource's usage ratio and approximate fair share ratio + * assuming that CPU and memory are the only configured resource types. + * The {@code shares} array will be populated with the approximate fair + * share ratio for each resource type. The approximate fair share ratio + * is calculated as {@code resourceInfo} divided by {@code cluster} and + * the {@code weight}. If the cluster's resources are 100MB and + * 10 vcores, the usage ({@code resourceInfo}) is 10 MB and 5 CPU, and the + * weights are 2, the fair share ratios will be 0.05 and 0.25. + * + * The approximate fair share ratio is the usage divided by the + * approximate fair share, i.e. the cluster resources times the weight. + * The approximate fair share is an acceptable proxy for the fair share + * because when comparing resources, the resource with the higher weight + * will be assigned by the scheduler a proportionally higher fair share. + * + * The length of the {@code shares} array must be at least 2. + * + * The return value will be the index of the dominant resource type in the + * {@code shares} array. The dominant resource is the resource type for + * which {@code resourceInfo} has the largest usage ratio. + * + * @param resourceInfo the resource for which to calculate ratios + * @param weight the resource weight + * @param clusterInfo the total cluster resources + * @param shares the share ratios array to populate + * @return the index of the resource type with the largest cluster share + */ + @VisibleForTesting + int calculateClusterAndFairRatios(ResourceInformation[] resourceInfo, + float weight, ResourceInformation[] clusterInfo, double[] shares) { + int dominant; + + shares[Resource.MEMORY_INDEX] = + ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) / + clusterInfo[Resource.MEMORY_INDEX].getValue(); + shares[Resource.VCORES_INDEX] = + ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) / + clusterInfo[Resource.VCORES_INDEX].getValue(); + dominant = + shares[Resource.VCORES_INDEX] > shares[Resource.MEMORY_INDEX] ? + Resource.VCORES_INDEX : Resource.MEMORY_INDEX; + + shares[Resource.MEMORY_INDEX] /= weight; + shares[Resource.VCORES_INDEX] /= weight; + + return dominant; + } + + /** + * Calculate a resource's min share ratios assuming that CPU and memory + * are the only configured resource types. The return array will be + * populated with the {@code resourceInfo} divided by {@code minShareInfo} + * for each resource type. If the min shares are 5 MB and 10 vcores, and + * the usage ({@code resourceInfo}) is 10 MB and 5 CPU, the ratios will + * be 2 and 0.5. + * + * The length of the {@code ratios} array must be 2. + * + * @param resourceInfo the resource for which to calculate min shares + * @param minShareInfo the min share + * @return the share ratios + */ + @VisibleForTesting + double[] calculateMinShareRatios(ResourceInformation[] resourceInfo, + ResourceInformation[] minShareInfo) { + double[] minShares1 = new double[2]; + + // both are needy below min share + minShares1[Resource.MEMORY_INDEX] = + ((double) resourceInfo[Resource.MEMORY_INDEX].getValue()) / + minShareInfo[Resource.MEMORY_INDEX].getValue(); + minShares1[Resource.VCORES_INDEX] = + ((double) resourceInfo[Resource.VCORES_INDEX].getValue()) / + minShareInfo[Resource.VCORES_INDEX].getValue(); + + return minShares1; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java index 097558feb1..03fd1efbce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java @@ -24,21 +24,22 @@ import java.util.Comparator; import java.util.Map; + import org.apache.curator.shaded.com.google.common.base.Joiner; import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FakeSchedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy.DominantResourceFairnessComparator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy.DominantResourceFairnessComparatorN; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy.DominantResourceFairnessComparator2; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; /** @@ -46,8 +47,8 @@ * container before sched2 */ public class TestDominantResourceFairnessPolicy { - @BeforeClass - public static void setup() { + @Before + public void setup() { addResources("test"); } @@ -77,7 +78,6 @@ private Schedulable createSchedulable(int memUsage, int cpuUsage, return createSchedulable(memUsage, cpuUsage, weights, 0, 0); } - private Schedulable createSchedulable(int memUsage, int cpuUsage, float weights, int minMemShare, int minCpuShare) { Resource usage = BuilderUtils.newResource(memUsage, cpuUsage); @@ -97,6 +97,12 @@ public void testSameDominantResource() { c.compare(s1, s2) < 0); } + @Test + public void testSameDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testSameDominantResource(); + } + @Test public void testDifferentDominantResource() { Comparator c = createComparator(8000, 8); @@ -107,6 +113,12 @@ public void testDifferentDominantResource() { c.compare(s1, s2) < 0); } + @Test + public void testDifferentDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testDifferentDominantResource(); + } + @Test public void testOneIsNeedy() { Comparator c = createComparator(8000, 8); @@ -117,6 +129,12 @@ public void testOneIsNeedy() { c.compare(s1, s2) < 0); } + @Test + public void testOneIsNeedy2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testOneIsNeedy(); + } + @Test public void testBothAreNeedy() { Comparator c = createComparator(8000, 100); @@ -137,6 +155,12 @@ public void testBothAreNeedy() { c.compare(s1, s2) < 0); } + @Test + public void testBothAreNeedy2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testBothAreNeedy(); + } + @Test public void testEvenWeightsSameDominantResource() { assertTrue(createComparator(8000, 8).compare( @@ -147,6 +171,12 @@ public void testEvenWeightsSameDominantResource() { createSchedulable(1000, 2)) < 0); } + @Test + public void testEvenWeightsSameDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testEvenWeightsSameDominantResource(); + } + @Test public void testEvenWeightsDifferentDominantResource() { assertTrue(createComparator(8000, 8).compare( @@ -157,14 +187,20 @@ public void testEvenWeightsDifferentDominantResource() { createSchedulable(1000, 2)) < 0); } + @Test + public void testEvenWeightsDifferentDominantResource2() { + ResourceUtils.resetResourceTypes(new Configuration()); + testEvenWeightsDifferentDominantResource(); + } + @Test public void testSortShares() { float[][] ratios1 = {{0.3f, 2.0f}, {0.2f, 1.0f}, {0.4f, 0.1f}}; float[][] ratios2 = {{0.2f, 9.0f}, {0.3f, 2.0f}, {0.25f, 0.1f}}; float[][] expected1 = {{0.4f, 0.1f}, {0.3f, 2.0f}, {0.2f, 1.0f}}; float[][] expected2 = {{0.3f, 2.0f}, {0.25f, 0.1f}, {0.2f, 9.0f}}; - DominantResourceFairnessComparator comparator = - new DominantResourceFairnessComparator(); + DominantResourceFairnessComparatorN comparator = + new DominantResourceFairnessComparatorN(); comparator.sortRatios(ratios1, ratios2); @@ -184,8 +220,8 @@ public void testCalculateClusterAndFairRatios() { Resource used = Resources.createResource(10, 5); Resource capacity = Resources.createResource(100, 10); float[][] shares = new float[3][2]; - DominantResourceFairnessComparator comparator = - new DominantResourceFairnessComparator(); + DominantResourceFairnessComparatorN comparator = + new DominantResourceFairnessComparatorN(); used.setResourceValue("test", 2L); capacity.setResourceValue("test", 5L); @@ -206,14 +242,34 @@ public void testCalculateClusterAndFairRatios() { dominant); } + @Test + public void testCalculateClusterAndFairRatios2() { + ResourceUtils.resetResourceTypes(new Configuration()); + Resource used = Resources.createResource(10, 5); + Resource capacity = Resources.createResource(100, 10); + double[] shares = new double[2]; + DominantResourceFairnessComparator2 comparator = + new DominantResourceFairnessComparator2(); + int dominant = + comparator.calculateClusterAndFairRatios(used.getResources(), 1.0f, + capacity.getResources(), shares); + + assertEquals("Calculated usage ratio for memory (10MB out of 100MB) is " + + "incorrect", 0.1, shares[Resource.MEMORY_INDEX], .00001); + assertEquals("Calculated usage ratio for vcores (5 out of 10) is " + + "incorrect", 0.5, shares[Resource.VCORES_INDEX], .00001); + assertEquals("The wrong dominant resource index was returned", + Resource.VCORES_INDEX, dominant); + } + @Test public void testCalculateMinShareRatios() { Map index = ResourceUtils.getResourceTypeIndex(); Resource used = Resources.createResource(10, 5); Resource minShares = Resources.createResource(5, 10); float[][] ratios = new float[3][3]; - DominantResourceFairnessComparator comparator = - new DominantResourceFairnessComparator(); + DominantResourceFairnessComparatorN comparator = + new DominantResourceFairnessComparatorN(); used.setResourceValue("test", 2L); minShares.setResourceValue("test", 0L); @@ -231,6 +287,24 @@ public void testCalculateMinShareRatios() { 0.00001f); } + @Test + public void testCalculateMinShareRatios2() { + ResourceUtils.resetResourceTypes(new Configuration()); + Resource used = Resources.createResource(10, 5); + Resource minShares = Resources.createResource(5, 10); + DominantResourceFairnessComparator2 comparator = + new DominantResourceFairnessComparator2(); + + double[] ratios = + comparator.calculateMinShareRatios(used.getResources(), + minShares.getResources()); + + assertEquals("Calculated min share ratio for memory (10MB out of 5MB) is " + + "incorrect", 2.0, ratios[Resource.MEMORY_INDEX], .00001f); + assertEquals("Calculated min share ratio for vcores (5 out of 10) is " + + "incorrect", 0.5, ratios[Resource.VCORES_INDEX], .00001f); + } + @Test public void testCompareShares() { float[][] ratios1 = { @@ -248,8 +322,8 @@ public void testCompareShares() { {0.2f, 0.1f, 2.0f}, {0.1f, 2.0f, 1.0f} }; - DominantResourceFairnessComparator comparator = - new DominantResourceFairnessComparator(); + DominantResourceFairnessComparatorN comparator = + new DominantResourceFairnessComparatorN(); int ret = comparator.compareRatios(ratios1, ratios2, 0);