diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index c8f68a26a0..c3c594512b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -31,12 +31,16 @@ import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Calculate how much resources need to be preempted for each queue, * will be used by {@link PreemptionCandidatesSelector}. */ public class AbstractPreemptableResourceCalculator { + private static final Logger LOG = LoggerFactory.getLogger( + AbstractPreemptableResourceCalculator.class); protected final CapacitySchedulerPreemptionContext context; protected final ResourceCalculator rc; @@ -76,6 +80,34 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { } } + private static class NormalizationTuple { + private Resource numerator; + private Resource denominator; + + NormalizationTuple(Resource numer, Resource denom) { + this.numerator = numer; + this.denominator = denom; + } + + long getNumeratorValue(int i) { + return numerator.getResourceInformation(i).getValue(); + } + + long getDenominatorValue(int i) { + String nUnits = numerator.getResourceInformation(i).getUnits(); + ResourceInformation dResourceInformation = denominator + .getResourceInformation(i); + return UnitsConversionUtil.convert( + dResourceInformation.getUnits(), nUnits, dResourceInformation.getValue()); + } + + float getNormalizedValue(int i) { + long nValue = getNumeratorValue(i); + long dValue = getDenominatorValue(i); + return dValue == 0 ? 0.0f : (float) nValue / dValue; + } + } + /** * PreemptableResourceCalculator constructor. * @@ -175,7 +207,7 @@ protected void computeFixpointAllocation(Resource totGuarant, unassigned, Resources.none())) { // we compute normalizedGuarantees capacity based on currently active // queues - resetCapacity(unassigned, orderedByNeed, ignoreGuarantee); + resetCapacity(orderedByNeed, ignoreGuarantee); // For each underserved queue (or set of queues if multiple are equally // underserved), offer its share of the unassigned resources based on its @@ -252,47 +284,146 @@ protected void initIdealAssignment(Resource totGuarant, /** * Computes a normalizedGuaranteed capacity based on active queues. * - * @param clusterResource - * the total amount of resources in the cluster * @param queues * the list of queues to consider * @param ignoreGuar * ignore guarantee. */ - private void resetCapacity(Resource clusterResource, - Collection queues, boolean ignoreGuar) { + private void resetCapacity(Collection queues, + boolean ignoreGuar) { Resource activeCap = Resource.newInstance(0, 0); + float activeTotalAbsCap = 0.0f; int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); if (ignoreGuar) { - for (TempQueuePerPartition q : queues) { - for (int i = 0; i < maxLength; i++) { - q.normalizedGuarantee[i] = 1.0f / queues.size(); + for (int i = 0; i < maxLength; i++) { + for (TempQueuePerPartition q : queues) { + computeNormGuarEvenly(q, queues.size(), i); } } } else { for (TempQueuePerPartition q : queues) { Resources.addTo(activeCap, q.getGuaranteed()); + activeTotalAbsCap += q.getAbsCapacity(); } - for (TempQueuePerPartition q : queues) { - for (int i = 0; i < maxLength; i++) { - ResourceInformation nResourceInformation = q.getGuaranteed() - .getResourceInformation(i); - ResourceInformation dResourceInformation = activeCap - .getResourceInformation(i); - long nValue = nResourceInformation.getValue(); - long dValue = UnitsConversionUtil.convert( - dResourceInformation.getUnits(), nResourceInformation.getUnits(), - dResourceInformation.getValue()); - if (dValue != 0) { - q.normalizedGuarantee[i] = (float) nValue / dValue; + // loop through all resource types and normalize guaranteed capacity for all queues + for (int i = 0; i < maxLength; i++) { + boolean useAbsCapBasedNorm = false; + // if the sum of absolute capacity of all queues involved is 0, + // we should normalize evenly + boolean useEvenlyDistNorm = activeTotalAbsCap == 0; + + // loop through all the queues once to determine the + // right normalization strategy for current processing resource type + for (TempQueuePerPartition q : queues) { + NormalizationTuple normTuple = new NormalizationTuple( + q.getGuaranteed(), activeCap); + long queueGuaranValue = normTuple.getNumeratorValue(i); + long totalActiveGuaranValue = normTuple.getDenominatorValue(i); + + if (queueGuaranValue == 0 && q.getAbsCapacity() != 0 && totalActiveGuaranValue != 0) { + // when the rounded value of a resource type is 0 but its absolute capacity is not 0, + // we should consider taking the normalized guarantee based on absolute capacity + useAbsCapBasedNorm = true; + break; + } + + if (totalActiveGuaranValue == 0) { + // If totalActiveGuaranValue from activeCap is zero, that means the guaranteed capacity + // of this resource dimension for all active queues is tiny (close to 0). + // For example, if a queue has 1% of minCapacity on a cluster with a totalVcores of 48, + // then the idealAssigned Vcores for this queue is (48 * 0.01)=0.48 which then + // get rounded/casted into 0 (double -> long) + // In this scenario where the denominator is 0, we can just spread resources across + // all tiny queues evenly since their absoluteCapacity are roughly the same + useEvenlyDistNorm = true; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Queue normalization strategy: " + + "absoluteCapacityBasedNormalization(" + useAbsCapBasedNorm + + "), evenlyDistributedNormalization(" + useEvenlyDistNorm + + "), defaultNormalization(" + !(useAbsCapBasedNorm || useEvenlyDistNorm) + ")"); + } + + // loop through all the queues again to apply normalization strategy + for (TempQueuePerPartition q : queues) { + if (useAbsCapBasedNorm) { + computeNormGuarFromAbsCapacity(q, activeTotalAbsCap, i); + } else if (useEvenlyDistNorm) { + computeNormGuarEvenly(q, queues.size(), i); + } else { + computeDefaultNormGuar(q, activeCap, i); } } } } } + /** + * Computes the normalized guaranteed capacity based on the weight of a queue's abs capacity. + * + * Example: + * There are two active queues: queueA & queueB, and + * their configured absolute minimum capacity is 1% and 3% respectively. + * + * Then their normalized guaranteed capacity are: + * normalized_guar_queueA = 0.01 / (0.01 + 0.03) = 0.25 + * normalized_guar_queueB = 0.03 / (0.01 + 0.03) = 0.75 + * + * @param q + * the queue to consider + * @param activeTotalAbsCap + * the sum of absolute capacity of all active queues + * @param resourceTypeIdx + * index of the processing resource type + */ + private static void computeNormGuarFromAbsCapacity(TempQueuePerPartition q, + float activeTotalAbsCap, + int resourceTypeIdx) { + if (activeTotalAbsCap != 0) { + q.normalizedGuarantee[resourceTypeIdx] = q.getAbsCapacity() / activeTotalAbsCap; + } + } + + /** + * Computes the normalized guaranteed capacity evenly based on num of active queues. + * + * @param q + * the queue to consider + * @param numOfActiveQueues + * number of active queues + * @param resourceTypeIdx + * index of the processing resource type + */ + private static void computeNormGuarEvenly(TempQueuePerPartition q, + int numOfActiveQueues, + int resourceTypeIdx) { + q.normalizedGuarantee[resourceTypeIdx] = 1.0f / numOfActiveQueues; + } + + /** + * The default way to compute a queue's normalized guaranteed capacity. + * + * For each resource type, divide a queue's configured guaranteed amount (MBs/Vcores) by + * the total amount of guaranteed resource of all active queues + * + * @param q + * the queue to consider + * @param activeCap + * total guaranteed resources of all active queues + * @param resourceTypeIdx + * index of the processing resource type + */ + private static void computeDefaultNormGuar(TempQueuePerPartition q, + Resource activeCap, + int resourceTypeIdx) { + NormalizationTuple normTuple = new NormalizationTuple(q.getGuaranteed(), activeCap); + q.normalizedGuarantee[resourceTypeIdx] = normTuple.getNormalizedValue(resourceTypeIdx); + } + // Take the most underserved TempQueue (the one on the head). Collect and // return the list of all queues that have the same idealAssigned // percentage of guaranteed. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 958c08e803..78075bb5c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -201,6 +201,10 @@ Resource offer(Resource avail, ResourceCalculator rc, return remain; } + public float getAbsCapacity() { + return absCapacity; + } + public Resource getGuaranteed() { if(!effMinRes.equals(Resources.none())) { return Resources.clone(effMinRes);