diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f8a4937922..c6d4b36d89 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -96,6 +96,8 @@ Release 2.0.5-beta - UNRELEASED YARN-497. Yarn unmanaged-am launcher jar does not define a main class in its manifest (Hitesh Shah via bikas) + YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index ff51e9506a..2c8555005b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -92,13 +92,7 @@ public void setSchedulingMode(SchedulingMode mode) { @Override public void recomputeFairShares() { - if (schedulingMode == SchedulingMode.FAIR) { - SchedulingAlgorithms.computeFairShares(appScheds, getFairShare()); - } else { - for (AppSchedulable sched: appScheds) { - sched.setFairShare(Resources.createResource(0)); - } - } + schedulingMode.computeShares(getAppSchedulables(), getFairShare()); } @Override @@ -162,17 +156,9 @@ public Resource assignContainer(FSSchedulerNode node, boolean reserved) { return Resources.none(); // We should never get here } - // Otherwise, chose app to schedule based on given policy (fair vs fifo). + // Otherwise, chose app to schedule based on given policy. else { - Comparator comparator; - if (schedulingMode == SchedulingMode.FIFO) { - comparator = new SchedulingAlgorithms.FifoComparator(); - } else if (schedulingMode == SchedulingMode.FAIR) { - comparator = new SchedulingAlgorithms.FairShareComparator(); - } else { - throw new RuntimeException("Unsupported queue scheduling mode " + - schedulingMode); - } + Comparator comparator = schedulingMode.getComparator(); Collections.sort(appScheds, comparator); for (AppSchedulable sched: appScheds) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index e6b5a8f494..dec5d888bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -51,7 +51,7 @@ public void addChildQueue(FSQueue child) { @Override public void recomputeFairShares() { - SchedulingAlgorithms.computeFairShares(childQueues, getFairShare()); + SchedulingMode.getDefault().computeShares(childQueues, getFairShare()); for (FSQueue childQueue : childQueues) { childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare()); childQueue.recomputeFairShares(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 45cad65738..53cc45a382 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -803,7 +803,7 @@ private synchronized void nodeUpdate(RMNode nm) { // At most one task is scheduled each iteration of this loop List scheds = new ArrayList( queueMgr.getLeafQueues()); - Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator()); + Collections.sort(scheds, SchedulingMode.getDefault().getComparator()); boolean assignedContainer = false; for (FSLeafQueue sched : scheds) { Resource assigned = sched.assignContainer(node, false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index b10bfa0569..fc76d02c5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -310,7 +310,7 @@ public void reloadAllocs() throws IOException, ParserConfigurationException, int queueMaxAppsDefault = Integer.MAX_VALUE; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR; + SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault(); // Remember all queue names so we can display them on web UI, etc. List queueNamesInAllocFile = new ArrayList(); @@ -373,7 +373,8 @@ public void reloadAllocs() throws IOException, ParserConfigurationException, queueMaxAppsDefault = val;} else if ("defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); - defaultSchedulingMode = parseSchedulingMode(text); + SchedulingMode.setDefault(text); + defaultSchedulingMode = SchedulingMode.getDefault(); } else { LOG.warn("Bad element in allocations file: " + element.getTagName()); } @@ -449,7 +450,7 @@ private void loadQueue(String parentName, Element element, Map minSharePreemptionTimeouts.put(queueName, val); } else if ("schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - queueModes.put(queueName, parseSchedulingMode(text)); + queueModes.put(queueName, SchedulingMode.parse(text)); } else if ("aclSubmitApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); @@ -476,19 +477,6 @@ private void loadQueue(String parentName, Element element, Map } } - private SchedulingMode parseSchedulingMode(String text) - throws AllocationConfigurationException { - text = text.toLowerCase(); - if (text.equals("fair")) { - return SchedulingMode.FAIR; - } else if (text.equals("fifo")) { - return SchedulingMode.FIFO; - } else { - throw new AllocationConfigurationException( - "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'"); - } - } - /** * Get the minimum resource allocation for the given queue. * @return the cap set on this queue, or 0 if not set. @@ -663,7 +651,7 @@ public QueueManagerInfo() { minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeout = Long.MAX_VALUE; - defaultSchedulingMode = SchedulingMode.FAIR; + defaultSchedulingMode = SchedulingMode.getDefault(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index cbcbd46f5f..1dc0630dd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -55,7 +55,7 @@ */ @Private @Unstable -abstract class Schedulable { +public abstract class Schedulable { /** Fair share assigned to this Schedulable */ private Resource fairShare = Resources.createResource(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java index b84d824f6c..96948524a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java @@ -15,17 +15,104 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.ConcurrentHashMap; -/** - * Internal scheduling modes for queues. - */ -@Private +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode; + +@Public @Unstable -public enum SchedulingMode { - FAIR, FIFO +public abstract class SchedulingMode { + private static final ConcurrentHashMap, SchedulingMode> instances = + new ConcurrentHashMap, SchedulingMode>(); + + private static SchedulingMode DEFAULT_MODE = + getInstance(FairSchedulingMode.class); + + public static SchedulingMode getDefault() { + return DEFAULT_MODE; + } + + public static void setDefault(String className) + throws AllocationConfigurationException { + DEFAULT_MODE = parse(className); + } + + /** + * Returns a {@link SchedulingMode} instance corresponding to the passed clazz + */ + public static SchedulingMode getInstance(Class clazz) { + SchedulingMode mode = instances.get(clazz); + if (mode == null) { + mode = ReflectionUtils.newInstance(clazz, null); + instances.put(clazz, mode); + } + return mode; + } + + /** + * Returns {@link SchedulingMode} instance corresponding to the + * {@link SchedulingMode} passed as a string. The mode can be "fair" for + * FairSchedulingMode of "fifo" for FifoSchedulingMode. For custom + * {@link SchedulingMode}s in the RM classpath, the mode should be canonical + * class name of the {@link SchedulingMode}. + * + * @param mode canonical class name or "fair" or "fifo" + * @throws AllocationConfigurationException + */ + @SuppressWarnings("unchecked") + public static SchedulingMode parse(String mode) + throws AllocationConfigurationException { + @SuppressWarnings("rawtypes") + Class clazz; + String text = mode.toLowerCase(); + if (text.equals("fair")) { + clazz = FairSchedulingMode.class; + } else if (text.equals("fifo")) { + clazz = FifoSchedulingMode.class; + } else { + try { + clazz = Class.forName(mode); + } catch (ClassNotFoundException cnfe) { + throw new AllocationConfigurationException(mode + + " SchedulingMode class not found!"); + } + } + if (!SchedulingMode.class.isAssignableFrom(clazz)) { + throw new AllocationConfigurationException(mode + + " does not extend SchedulingMode"); + } + return getInstance(clazz); + } + + /** + * @return returns the name of SchedulingMode + */ + public abstract String getName(); + + /** + * The comparator returned by this method is to be used for sorting the + * {@link Schedulable}s in that queue. + * + * @return the comparator to sort by + */ + public abstract Comparator getComparator(); + + /** + * Computes and updates the shares of {@link Schedulable}s as per the + * SchedulingMode, to be used later at schedule time. + * + * @param schedulables {@link Schedulable}s whose shares are to be updated + * @param totalResources Total {@link Resource}s in the cluster + */ + public abstract void computeShares( + Collection schedulables, Resource totalResources); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java similarity index 71% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java index a7a321c5f9..56a37f4889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java @@ -15,65 +15,44 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes; import java.io.Serializable; import java.util.Collection; import java.util.Comparator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode; -/** - * Utility class containing scheduling algorithms used in the fair scheduler. - */ -@Private -@Unstable -class SchedulingAlgorithms { - public static final Log LOG = LogFactory.getLog( - SchedulingAlgorithms.class.getName()); +import com.google.common.annotations.VisibleForTesting; - /** - * Compare Schedulables in order of priority and then submission time, as in - * the default FIFO scheduler in Hadoop. - */ - public static class FifoComparator implements Comparator, Serializable { - private static final long serialVersionUID = -5905036205491177060L; +public class FairSchedulingMode extends SchedulingMode { + @VisibleForTesting + public static final String NAME = "FairShare"; + private FairShareComparator comparator = new FairShareComparator(); - @Override - public int compare(Schedulable s1, Schedulable s2) { - int res = s1.getPriority().compareTo(s2.getPriority()); - if (res == 0) { - res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); - } - if (res == 0) { - // In the rare case where jobs were submitted at the exact same time, - // compare them by name (which will be the JobID) to get a deterministic - // ordering, so we don't alternately launch tasks from different jobs. - res = s1.getName().compareTo(s2.getName()); - } - return res; - } + @Override + public String getName() { + return NAME; } /** * Compare Schedulables via weighted fair sharing. In addition, Schedulables * below their min share get priority over those whose min share is met. - * + * * Schedulables below their min share are compared by how far below it they * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks * and job B has 50 out of a min share of 100, then job B is scheduled next, * because B is at 50% of its min share and A is at 80% of its min share. - * + * * Schedulables above their min share are compared by (runningTasks / weight). * If all weights are equal, slots are given to the job with the fewest tasks; * otherwise, jobs with more weight get proportionally more slots. */ - public static class FairShareComparator implements Comparator, Serializable { + private static class FairShareComparator implements Comparator, + Serializable { private static final long serialVersionUID = 5564969375856699313L; @Override @@ -85,10 +64,10 @@ public int compare(Schedulable s1, Schedulable s2) { boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1); boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2); Resource one = Resources.createResource(1); - minShareRatio1 = (double) s1.getResourceUsage().getMemory() / - Resources.max(minShare1, one).getMemory(); - minShareRatio2 = (double) s2.getResourceUsage().getMemory() / - Resources.max(minShare2, one).getMemory(); + minShareRatio1 = (double) s1.getResourceUsage().getMemory() + / Resources.max(minShare1, one).getMemory(); + minShareRatio2 = (double) s2.getResourceUsage().getMemory() + / Resources.max(minShare2, one).getMemory(); useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight(); useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight(); int res = 0; @@ -98,7 +77,8 @@ else if (s2Needy && !s1Needy) res = 1; else if (s1Needy && s2Needy) res = (int) Math.signum(minShareRatio1 - minShareRatio2); - else // Neither schedulable is needy + else + // Neither schedulable is needy res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2); if (res == 0) { // Apps are tied in fairness ratio. Break the tie by submit time and job @@ -111,6 +91,17 @@ else if (s1Needy && s2Needy) } } + @Override + public Comparator getComparator() { + return comparator; + } + + @Override + public void computeShares(Collection schedulables, + Resource totalResources) { + computeFairShares(schedulables, totalResources); + } + /** * Number of iterations for the binary search in computeFairShares. This is * equivalent to the number of bits of precision in the output. 25 iterations @@ -121,43 +112,42 @@ else if (s1Needy && s2Needy) /** * Given a set of Schedulables and a number of slots, compute their weighted * fair shares. The min shares and demands of the Schedulables are assumed to - * be set beforehand. We compute the fairest possible allocation of shares - * to the Schedulables that respects their min shares and demands. - * + * be set beforehand. We compute the fairest possible allocation of shares to + * the Schedulables that respects their min shares and demands. + * * To understand what this method does, we must first define what weighted * fair sharing means in the presence of minimum shares and demands. If there * were no minimum shares and every Schedulable had an infinite demand (i.e. * could launch infinitely many tasks), then weighted fair sharing would be * achieved if the ratio of slotsAssigned / weight was equal for each - * Schedulable and all slots were assigned. Minimum shares and demands add - * two further twists: - * - Some Schedulables may not have enough tasks to fill all their share. - * - Some Schedulables may have a min share higher than their assigned share. - * - * To deal with these possibilities, we define an assignment of slots as - * being fair if there exists a ratio R such that: - * - Schedulables S where S.demand < R * S.weight are assigned share S.demand - * - Schedulables S where S.minShare > R * S.weight are given share S.minShare - * - All other Schedulables S are assigned share R * S.weight - * - The sum of all the shares is totalSlots. - * + * Schedulable and all slots were assigned. Minimum shares and demands add two + * further twists: - Some Schedulables may not have enough tasks to fill all + * their share. - Some Schedulables may have a min share higher than their + * assigned share. + * + * To deal with these possibilities, we define an assignment of slots as being + * fair if there exists a ratio R such that: - Schedulables S where S.demand < + * R * S.weight are assigned share S.demand - Schedulables S where S.minShare + * > R * S.weight are given share S.minShare - All other Schedulables S are + * assigned share R * S.weight - The sum of all the shares is totalSlots. + * * We call R the weight-to-slots ratio because it converts a Schedulable's * weight to the number of slots it is assigned. - * + * * We compute a fair allocation by finding a suitable weight-to-slot ratio R. - * To do this, we use binary search. Given a ratio R, we compute the number - * of slots that would be used in total with this ratio (the sum of the shares + * To do this, we use binary search. Given a ratio R, we compute the number of + * slots that would be used in total with this ratio (the sum of the shares * computed using the conditions above). If this number of slots is less than * totalSlots, then R is too small and more slots could be assigned. If the * number of slots is more than totalSlots, then R is too large. - * + * * We begin the binary search with a lower bound on R of 0 (which means that * all Schedulables are only given their minShare) and an upper bound computed * to be large enough that too many slots are given (by doubling R until we - * either use more than totalSlots slots or we fulfill all jobs' demands). - * The helper method slotsUsedWithWeightToSlotRatio computes the total number - * of slots used with a given value of R. - * + * either use more than totalSlots slots or we fulfill all jobs' demands). The + * helper method slotsUsedWithWeightToSlotRatio computes the total number of + * slots used with a given value of R. + * * The running time of this algorithm is linear in the number of Schedulables, * because slotsUsedWithWeightToSlotRatio is linear-time and the number of * iterations of binary search is a constant (dependent on desired precision). @@ -168,12 +158,13 @@ public static void computeFairShares( // at R = 1 and double it until we have either used totalSlots slots or we // have met all Schedulables' demands (if total demand < totalSlots). Resource totalDemand = Resources.createResource(0); - for (Schedulable sched: schedulables) { + for (Schedulable sched : schedulables) { Resources.addTo(totalDemand, sched.getDemand()); } Resource cap = Resources.min(totalDemand, totalResources); double rMax = 1.0; - while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), cap)) { + while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), + cap)) { rMax *= 2.0; } // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps @@ -181,27 +172,28 @@ public static void computeFairShares( double right = rMax; for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) { double mid = (left + right) / 2.0; - if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), cap)) { + if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), + cap)) { left = mid; } else { right = mid; } } // Set the fair shares based on the value of R we've converged to - for (Schedulable sched: schedulables) { + for (Schedulable sched : schedulables) { sched.setFairShare(computeShare(sched, right)); } } /** - * Compute the number of slots that would be used given a weight-to-slot - * ratio w2sRatio, for use in the computeFairShares algorithm as described - * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}. + * Compute the number of slots that would be used given a weight-to-slot ratio + * w2sRatio, for use in the computeFairShares algorithm as described in # + * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}. */ private static Resource resUsedWithWeightToResRatio(double w2sRatio, Collection schedulables) { Resource slotsTaken = Resources.createResource(0); - for (Schedulable sched: schedulables) { + for (Schedulable sched : schedulables) { Resource share = computeShare(sched, w2sRatio); Resources.addTo(slotsTaken, share); } @@ -210,8 +202,8 @@ private static Resource resUsedWithWeightToResRatio(double w2sRatio, /** * Compute the resources assigned to a Schedulable given a particular - * res-to-slot ratio r2sRatio, for use in computeFairShares as described - * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}. + * res-to-slot ratio r2sRatio, for use in computeFairShares as described in # + * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}. */ private static Resource computeShare(Schedulable sched, double r2sRatio) { double share = sched.getWeight() * r2sRatio; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java new file mode 100644 index 0000000000..f3c5e368dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Comparator; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode; + +import com.google.common.annotations.VisibleForTesting; + +public class FifoSchedulingMode extends SchedulingMode { + @VisibleForTesting + public static final String NAME = "FIFO"; + private FifoComparator comparator = new FifoComparator(); + + @Override + public String getName() { + return NAME; + } + + /** + * Compare Schedulables in order of priority and then submission time, as in + * the default FIFO scheduler in Hadoop. + */ + static class FifoComparator implements Comparator, Serializable { + private static final long serialVersionUID = -5905036205491177060L; + + @Override + public int compare(Schedulable s1, Schedulable s2) { + int res = s1.getPriority().compareTo(s2.getPriority()); + if (res == 0) { + res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + } + if (res == 0) { + // In the rare case where jobs were submitted at the exact same time, + // compare them by name (which will be the JobID) to get a deterministic + // ordering, so we don't alternately launch tasks from different jobs. + res = s1.getName().compareTo(s2.getName()); + } + return res; + } + } + + @Override + public Comparator getComparator() { + return comparator; + } + + @Override + public void computeShares(Collection schedulables, + Resource totalResources) { + for (Schedulable sched : schedulables) { + sched.setFairShare(Resources.createResource(0)); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java index 05054b332e..3a36fc15ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java @@ -24,6 +24,7 @@ import junit.framework.Assert; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode; import org.junit.Before; import org.junit.Test; @@ -32,10 +33,12 @@ */ public class TestComputeFairShares { private List scheds; + private SchedulingMode schedulingMode; @Before public void setUp() throws Exception { scheds = new ArrayList(); + schedulingMode = new FairSchedulingMode(); } /** @@ -48,7 +51,8 @@ public void testEqualSharing() { scheds.add(new FakeSchedulable(50)); scheds.add(new FakeSchedulable(30)); scheds.add(new FakeSchedulable(20)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40)); + schedulingMode.computeShares(scheds, + Resources.createResource(40)); verifyShares(10, 10, 10, 10); } @@ -65,7 +69,8 @@ public void testLowDemands() { scheds.add(new FakeSchedulable(50)); scheds.add(new FakeSchedulable(11)); scheds.add(new FakeSchedulable(3)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40)); + schedulingMode.computeShares(scheds, + Resources.createResource(40)); verifyShares(13, 13, 11, 3); } @@ -83,7 +88,8 @@ public void testMinShares() { scheds.add(new FakeSchedulable(10, 20)); scheds.add(new FakeSchedulable(10, 0)); scheds.add(new FakeSchedulable(3, 2)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40)); + schedulingMode.computeShares(scheds, + Resources.createResource(40)); verifyShares(20, 10, 7, 3); } @@ -97,7 +103,8 @@ public void testWeightedSharing() { scheds.add(new FakeSchedulable(50, 0, 1.0)); scheds.add(new FakeSchedulable(30, 0, 1.0)); scheds.add(new FakeSchedulable(20, 0, 0.5)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45)); + schedulingMode.computeShares(scheds, + Resources.createResource(45)); verifyShares(20, 10, 10, 5); } @@ -114,7 +121,8 @@ public void testWeightedSharingWithLowDemands() { scheds.add(new FakeSchedulable(11, 0, 1.0)); scheds.add(new FakeSchedulable(30, 0, 1.0)); scheds.add(new FakeSchedulable(20, 0, 0.5)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45)); + schedulingMode.computeShares(scheds, + Resources.createResource(45)); verifyShares(10, 11, 16, 8); } @@ -131,7 +139,8 @@ public void testWeightedSharingWithMinShares() { scheds.add(new FakeSchedulable(11, 0, 1.0)); scheds.add(new FakeSchedulable(30, 5, 1.0)); scheds.add(new FakeSchedulable(20, 15, 0.5)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45)); + schedulingMode.computeShares(scheds, + Resources.createResource(45)); verifyShares(10, 10, 10, 15); } @@ -146,7 +155,9 @@ public void testLargeShares() { scheds.add(new FakeSchedulable(50 * million)); scheds.add(new FakeSchedulable(30 * million)); scheds.add(new FakeSchedulable(20 * million)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40 * million)); + schedulingMode + .computeShares(scheds, + Resources.createResource(40 * million)); verifyShares(10 * million, 10 * million, 10 * million, 10 * million); } @@ -159,7 +170,8 @@ public void testZeroDemand() { scheds.add(new FakeSchedulable(50)); scheds.add(new FakeSchedulable(30)); scheds.add(new FakeSchedulable(0)); - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(30)); + schedulingMode.computeShares(scheds, + Resources.createResource(30)); verifyShares(10, 10, 10, 0); } @@ -168,7 +180,8 @@ public void testZeroDemand() { */ @Test public void testEmptyList() { - SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40)); + schedulingMode.computeShares(scheds, + Resources.createResource(40)); verifyShares(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a5cfadd255..d7b32ec532 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -1325,7 +1326,7 @@ public void testFifoWithinQueue() throws Exception { FSSchedulerApp app2 = scheduler.applications.get(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1"); - queue1.setSchedulingMode(SchedulingMode.FIFO); + queue1.setSchedulingMode(new FifoSchedulingMode()); scheduler.update(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java new file mode 100644 index 0000000000..71d43a317d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode; +import org.junit.Test; + +public class TestSchedulingMode { + + @Test(timeout = 1000) + public void testParseSchedulingMode() throws AllocationConfigurationException { + + // Class name + SchedulingMode sm = SchedulingMode + .parse(FairSchedulingMode.class.getName()); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSchedulingMode.NAME)); + + // Canonical name + sm = SchedulingMode.parse(FairSchedulingMode.class + .getCanonicalName()); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSchedulingMode.NAME)); + + // Class + sm = SchedulingMode.getInstance(FairSchedulingMode.class); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSchedulingMode.NAME)); + + // Shortname - fair + sm = SchedulingMode.parse("fair"); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSchedulingMode.NAME)); + + // Shortname - fifo + sm = SchedulingMode.parse("fifo"); + assertTrue("Invalid scheduler name", + sm.getName().equals(FifoSchedulingMode.NAME)); + } +}