YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1460961 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2aed48a67f
commit
e74d1f0435
@ -96,6 +96,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||
YARN-497. Yarn unmanaged-am launcher jar does not define a main class in
|
||||
its manifest (Hitesh Shah via bikas)
|
||||
|
||||
YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -92,13 +92,7 @@ public void setSchedulingMode(SchedulingMode mode) {
|
||||
|
||||
@Override
|
||||
public void recomputeFairShares() {
|
||||
if (schedulingMode == SchedulingMode.FAIR) {
|
||||
SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
|
||||
} else {
|
||||
for (AppSchedulable sched: appScheds) {
|
||||
sched.setFairShare(Resources.createResource(0));
|
||||
}
|
||||
}
|
||||
schedulingMode.computeShares(getAppSchedulables(), getFairShare());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -162,17 +156,9 @@ public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
|
||||
return Resources.none(); // We should never get here
|
||||
}
|
||||
|
||||
// Otherwise, chose app to schedule based on given policy (fair vs fifo).
|
||||
// Otherwise, chose app to schedule based on given policy.
|
||||
else {
|
||||
Comparator<Schedulable> comparator;
|
||||
if (schedulingMode == SchedulingMode.FIFO) {
|
||||
comparator = new SchedulingAlgorithms.FifoComparator();
|
||||
} else if (schedulingMode == SchedulingMode.FAIR) {
|
||||
comparator = new SchedulingAlgorithms.FairShareComparator();
|
||||
} else {
|
||||
throw new RuntimeException("Unsupported queue scheduling mode " +
|
||||
schedulingMode);
|
||||
}
|
||||
Comparator<Schedulable> comparator = schedulingMode.getComparator();
|
||||
|
||||
Collections.sort(appScheds, comparator);
|
||||
for (AppSchedulable sched: appScheds) {
|
||||
|
@ -51,7 +51,7 @@ public void addChildQueue(FSQueue child) {
|
||||
|
||||
@Override
|
||||
public void recomputeFairShares() {
|
||||
SchedulingAlgorithms.computeFairShares(childQueues, getFairShare());
|
||||
SchedulingMode.getDefault().computeShares(childQueues, getFairShare());
|
||||
for (FSQueue childQueue : childQueues) {
|
||||
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
|
||||
childQueue.recomputeFairShares();
|
||||
|
@ -803,7 +803,7 @@ private synchronized void nodeUpdate(RMNode nm) {
|
||||
// At most one task is scheduled each iteration of this loop
|
||||
List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
|
||||
queueMgr.getLeafQueues());
|
||||
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
|
||||
Collections.sort(scheds, SchedulingMode.getDefault().getComparator());
|
||||
boolean assignedContainer = false;
|
||||
for (FSLeafQueue sched : scheds) {
|
||||
Resource assigned = sched.assignContainer(node, false);
|
||||
|
@ -310,7 +310,7 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
|
||||
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||
long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
||||
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
||||
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
|
||||
SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
|
||||
|
||||
// Remember all queue names so we can display them on web UI, etc.
|
||||
List<String> queueNamesInAllocFile = new ArrayList<String>();
|
||||
@ -373,7 +373,8 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
|
||||
queueMaxAppsDefault = val;}
|
||||
else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
|
||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||
defaultSchedulingMode = parseSchedulingMode(text);
|
||||
SchedulingMode.setDefault(text);
|
||||
defaultSchedulingMode = SchedulingMode.getDefault();
|
||||
} else {
|
||||
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
||||
}
|
||||
@ -449,7 +450,7 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
|
||||
minSharePreemptionTimeouts.put(queueName, val);
|
||||
} else if ("schedulingMode".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
queueModes.put(queueName, parseSchedulingMode(text));
|
||||
queueModes.put(queueName, SchedulingMode.parse(text));
|
||||
} else if ("aclSubmitApps".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
|
||||
@ -476,19 +477,6 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
|
||||
}
|
||||
}
|
||||
|
||||
private SchedulingMode parseSchedulingMode(String text)
|
||||
throws AllocationConfigurationException {
|
||||
text = text.toLowerCase();
|
||||
if (text.equals("fair")) {
|
||||
return SchedulingMode.FAIR;
|
||||
} else if (text.equals("fifo")) {
|
||||
return SchedulingMode.FIFO;
|
||||
} else {
|
||||
throw new AllocationConfigurationException(
|
||||
"Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the minimum resource allocation for the given queue.
|
||||
* @return the cap set on this queue, or 0 if not set.
|
||||
@ -663,7 +651,7 @@ public QueueManagerInfo() {
|
||||
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
||||
fairSharePreemptionTimeout = Long.MAX_VALUE;
|
||||
defaultSchedulingMode = SchedulingMode.FAIR;
|
||||
defaultSchedulingMode = SchedulingMode.getDefault();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -55,7 +55,7 @@
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
abstract class Schedulable {
|
||||
public abstract class Schedulable {
|
||||
/** Fair share assigned to this Schedulable */
|
||||
private Resource fairShare = Resources.createResource(0);
|
||||
|
||||
|
@ -15,17 +15,104 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Internal scheduling modes for queues.
|
||||
*/
|
||||
@Private
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public enum SchedulingMode {
|
||||
FAIR, FIFO
|
||||
public abstract class SchedulingMode {
|
||||
private static final ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode> instances =
|
||||
new ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode>();
|
||||
|
||||
private static SchedulingMode DEFAULT_MODE =
|
||||
getInstance(FairSchedulingMode.class);
|
||||
|
||||
public static SchedulingMode getDefault() {
|
||||
return DEFAULT_MODE;
|
||||
}
|
||||
|
||||
public static void setDefault(String className)
|
||||
throws AllocationConfigurationException {
|
||||
DEFAULT_MODE = parse(className);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link SchedulingMode} instance corresponding to the passed clazz
|
||||
*/
|
||||
public static SchedulingMode getInstance(Class<? extends SchedulingMode> clazz) {
|
||||
SchedulingMode mode = instances.get(clazz);
|
||||
if (mode == null) {
|
||||
mode = ReflectionUtils.newInstance(clazz, null);
|
||||
instances.put(clazz, mode);
|
||||
}
|
||||
return mode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@link SchedulingMode} instance corresponding to the
|
||||
* {@link SchedulingMode} passed as a string. The mode can be "fair" for
|
||||
* FairSchedulingMode of "fifo" for FifoSchedulingMode. For custom
|
||||
* {@link SchedulingMode}s in the RM classpath, the mode should be canonical
|
||||
* class name of the {@link SchedulingMode}.
|
||||
*
|
||||
* @param mode canonical class name or "fair" or "fifo"
|
||||
* @throws AllocationConfigurationException
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static SchedulingMode parse(String mode)
|
||||
throws AllocationConfigurationException {
|
||||
@SuppressWarnings("rawtypes")
|
||||
Class clazz;
|
||||
String text = mode.toLowerCase();
|
||||
if (text.equals("fair")) {
|
||||
clazz = FairSchedulingMode.class;
|
||||
} else if (text.equals("fifo")) {
|
||||
clazz = FifoSchedulingMode.class;
|
||||
} else {
|
||||
try {
|
||||
clazz = Class.forName(mode);
|
||||
} catch (ClassNotFoundException cnfe) {
|
||||
throw new AllocationConfigurationException(mode
|
||||
+ " SchedulingMode class not found!");
|
||||
}
|
||||
}
|
||||
if (!SchedulingMode.class.isAssignableFrom(clazz)) {
|
||||
throw new AllocationConfigurationException(mode
|
||||
+ " does not extend SchedulingMode");
|
||||
}
|
||||
return getInstance(clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return returns the name of SchedulingMode
|
||||
*/
|
||||
public abstract String getName();
|
||||
|
||||
/**
|
||||
* The comparator returned by this method is to be used for sorting the
|
||||
* {@link Schedulable}s in that queue.
|
||||
*
|
||||
* @return the comparator to sort by
|
||||
*/
|
||||
public abstract Comparator<Schedulable> getComparator();
|
||||
|
||||
/**
|
||||
* Computes and updates the shares of {@link Schedulable}s as per the
|
||||
* SchedulingMode, to be used later at schedule time.
|
||||
*
|
||||
* @param schedulables {@link Schedulable}s whose shares are to be updated
|
||||
* @param totalResources Total {@link Resource}s in the cluster
|
||||
*/
|
||||
public abstract void computeShares(
|
||||
Collection<? extends Schedulable> schedulables, Resource totalResources);
|
||||
}
|
||||
|
@ -15,49 +15,27 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
|
||||
|
||||
/**
|
||||
* Utility class containing scheduling algorithms used in the fair scheduler.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
class SchedulingAlgorithms {
|
||||
public static final Log LOG = LogFactory.getLog(
|
||||
SchedulingAlgorithms.class.getName());
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Compare Schedulables in order of priority and then submission time, as in
|
||||
* the default FIFO scheduler in Hadoop.
|
||||
*/
|
||||
public static class FifoComparator implements Comparator<Schedulable>, Serializable {
|
||||
private static final long serialVersionUID = -5905036205491177060L;
|
||||
public class FairSchedulingMode extends SchedulingMode {
|
||||
@VisibleForTesting
|
||||
public static final String NAME = "FairShare";
|
||||
private FairShareComparator comparator = new FairShareComparator();
|
||||
|
||||
@Override
|
||||
public int compare(Schedulable s1, Schedulable s2) {
|
||||
int res = s1.getPriority().compareTo(s2.getPriority());
|
||||
if (res == 0) {
|
||||
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
|
||||
}
|
||||
if (res == 0) {
|
||||
// In the rare case where jobs were submitted at the exact same time,
|
||||
// compare them by name (which will be the JobID) to get a deterministic
|
||||
// ordering, so we don't alternately launch tasks from different jobs.
|
||||
res = s1.getName().compareTo(s2.getName());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -73,7 +51,8 @@ public int compare(Schedulable s1, Schedulable s2) {
|
||||
* If all weights are equal, slots are given to the job with the fewest tasks;
|
||||
* otherwise, jobs with more weight get proportionally more slots.
|
||||
*/
|
||||
public static class FairShareComparator implements Comparator<Schedulable>, Serializable {
|
||||
private static class FairShareComparator implements Comparator<Schedulable>,
|
||||
Serializable {
|
||||
private static final long serialVersionUID = 5564969375856699313L;
|
||||
|
||||
@Override
|
||||
@ -85,10 +64,10 @@ public int compare(Schedulable s1, Schedulable s2) {
|
||||
boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
|
||||
boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
|
||||
Resource one = Resources.createResource(1);
|
||||
minShareRatio1 = (double) s1.getResourceUsage().getMemory() /
|
||||
Resources.max(minShare1, one).getMemory();
|
||||
minShareRatio2 = (double) s2.getResourceUsage().getMemory() /
|
||||
Resources.max(minShare2, one).getMemory();
|
||||
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
|
||||
/ Resources.max(minShare1, one).getMemory();
|
||||
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
|
||||
/ Resources.max(minShare2, one).getMemory();
|
||||
useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
|
||||
useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
|
||||
int res = 0;
|
||||
@ -98,7 +77,8 @@ else if (s2Needy && !s1Needy)
|
||||
res = 1;
|
||||
else if (s1Needy && s2Needy)
|
||||
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
|
||||
else // Neither schedulable is needy
|
||||
else
|
||||
// Neither schedulable is needy
|
||||
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
|
||||
if (res == 0) {
|
||||
// Apps are tied in fairness ratio. Break the tie by submit time and job
|
||||
@ -111,6 +91,17 @@ else if (s1Needy && s2Needy)
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<Schedulable> getComparator() {
|
||||
return comparator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeShares(Collection<? extends Schedulable> schedulables,
|
||||
Resource totalResources) {
|
||||
computeFairShares(schedulables, totalResources);
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of iterations for the binary search in computeFairShares. This is
|
||||
* equivalent to the number of bits of precision in the output. 25 iterations
|
||||
@ -121,32 +112,31 @@ else if (s1Needy && s2Needy)
|
||||
/**
|
||||
* Given a set of Schedulables and a number of slots, compute their weighted
|
||||
* fair shares. The min shares and demands of the Schedulables are assumed to
|
||||
* be set beforehand. We compute the fairest possible allocation of shares
|
||||
* to the Schedulables that respects their min shares and demands.
|
||||
* be set beforehand. We compute the fairest possible allocation of shares to
|
||||
* the Schedulables that respects their min shares and demands.
|
||||
*
|
||||
* To understand what this method does, we must first define what weighted
|
||||
* fair sharing means in the presence of minimum shares and demands. If there
|
||||
* were no minimum shares and every Schedulable had an infinite demand (i.e.
|
||||
* could launch infinitely many tasks), then weighted fair sharing would be
|
||||
* achieved if the ratio of slotsAssigned / weight was equal for each
|
||||
* Schedulable and all slots were assigned. Minimum shares and demands add
|
||||
* two further twists:
|
||||
* - Some Schedulables may not have enough tasks to fill all their share.
|
||||
* - Some Schedulables may have a min share higher than their assigned share.
|
||||
* Schedulable and all slots were assigned. Minimum shares and demands add two
|
||||
* further twists: - Some Schedulables may not have enough tasks to fill all
|
||||
* their share. - Some Schedulables may have a min share higher than their
|
||||
* assigned share.
|
||||
*
|
||||
* To deal with these possibilities, we define an assignment of slots as
|
||||
* being fair if there exists a ratio R such that:
|
||||
* - Schedulables S where S.demand < R * S.weight are assigned share S.demand
|
||||
* - Schedulables S where S.minShare > R * S.weight are given share S.minShare
|
||||
* - All other Schedulables S are assigned share R * S.weight
|
||||
* - The sum of all the shares is totalSlots.
|
||||
* To deal with these possibilities, we define an assignment of slots as being
|
||||
* fair if there exists a ratio R such that: - Schedulables S where S.demand <
|
||||
* R * S.weight are assigned share S.demand - Schedulables S where S.minShare
|
||||
* > R * S.weight are given share S.minShare - All other Schedulables S are
|
||||
* assigned share R * S.weight - The sum of all the shares is totalSlots.
|
||||
*
|
||||
* We call R the weight-to-slots ratio because it converts a Schedulable's
|
||||
* weight to the number of slots it is assigned.
|
||||
*
|
||||
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
|
||||
* To do this, we use binary search. Given a ratio R, we compute the number
|
||||
* of slots that would be used in total with this ratio (the sum of the shares
|
||||
* To do this, we use binary search. Given a ratio R, we compute the number of
|
||||
* slots that would be used in total with this ratio (the sum of the shares
|
||||
* computed using the conditions above). If this number of slots is less than
|
||||
* totalSlots, then R is too small and more slots could be assigned. If the
|
||||
* number of slots is more than totalSlots, then R is too large.
|
||||
@ -154,9 +144,9 @@ else if (s1Needy && s2Needy)
|
||||
* We begin the binary search with a lower bound on R of 0 (which means that
|
||||
* all Schedulables are only given their minShare) and an upper bound computed
|
||||
* to be large enough that too many slots are given (by doubling R until we
|
||||
* either use more than totalSlots slots or we fulfill all jobs' demands).
|
||||
* The helper method slotsUsedWithWeightToSlotRatio computes the total number
|
||||
* of slots used with a given value of R.
|
||||
* either use more than totalSlots slots or we fulfill all jobs' demands). The
|
||||
* helper method slotsUsedWithWeightToSlotRatio computes the total number of
|
||||
* slots used with a given value of R.
|
||||
*
|
||||
* The running time of this algorithm is linear in the number of Schedulables,
|
||||
* because slotsUsedWithWeightToSlotRatio is linear-time and the number of
|
||||
@ -168,12 +158,13 @@ public static void computeFairShares(
|
||||
// at R = 1 and double it until we have either used totalSlots slots or we
|
||||
// have met all Schedulables' demands (if total demand < totalSlots).
|
||||
Resource totalDemand = Resources.createResource(0);
|
||||
for (Schedulable sched: schedulables) {
|
||||
for (Schedulable sched : schedulables) {
|
||||
Resources.addTo(totalDemand, sched.getDemand());
|
||||
}
|
||||
Resource cap = Resources.min(totalDemand, totalResources);
|
||||
double rMax = 1.0;
|
||||
while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), cap)) {
|
||||
while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables),
|
||||
cap)) {
|
||||
rMax *= 2.0;
|
||||
}
|
||||
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
|
||||
@ -181,27 +172,28 @@ public static void computeFairShares(
|
||||
double right = rMax;
|
||||
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
|
||||
double mid = (left + right) / 2.0;
|
||||
if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), cap)) {
|
||||
if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables),
|
||||
cap)) {
|
||||
left = mid;
|
||||
} else {
|
||||
right = mid;
|
||||
}
|
||||
}
|
||||
// Set the fair shares based on the value of R we've converged to
|
||||
for (Schedulable sched: schedulables) {
|
||||
for (Schedulable sched : schedulables) {
|
||||
sched.setFairShare(computeShare(sched, right));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the number of slots that would be used given a weight-to-slot
|
||||
* ratio w2sRatio, for use in the computeFairShares algorithm as described
|
||||
* in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
||||
* Compute the number of slots that would be used given a weight-to-slot ratio
|
||||
* w2sRatio, for use in the computeFairShares algorithm as described in #
|
||||
* {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
||||
*/
|
||||
private static Resource resUsedWithWeightToResRatio(double w2sRatio,
|
||||
Collection<? extends Schedulable> schedulables) {
|
||||
Resource slotsTaken = Resources.createResource(0);
|
||||
for (Schedulable sched: schedulables) {
|
||||
for (Schedulable sched : schedulables) {
|
||||
Resource share = computeShare(sched, w2sRatio);
|
||||
Resources.addTo(slotsTaken, share);
|
||||
}
|
||||
@ -210,8 +202,8 @@ private static Resource resUsedWithWeightToResRatio(double w2sRatio,
|
||||
|
||||
/**
|
||||
* Compute the resources assigned to a Schedulable given a particular
|
||||
* res-to-slot ratio r2sRatio, for use in computeFairShares as described
|
||||
* in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
||||
* res-to-slot ratio r2sRatio, for use in computeFairShares as described in #
|
||||
* {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
|
||||
*/
|
||||
private static Resource computeShare(Schedulable sched, double r2sRatio) {
|
||||
double share = sched.getWeight() * r2sRatio;
|
@ -0,0 +1,76 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
public class FifoSchedulingMode extends SchedulingMode {
|
||||
@VisibleForTesting
|
||||
public static final String NAME = "FIFO";
|
||||
private FifoComparator comparator = new FifoComparator();
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare Schedulables in order of priority and then submission time, as in
|
||||
* the default FIFO scheduler in Hadoop.
|
||||
*/
|
||||
static class FifoComparator implements Comparator<Schedulable>, Serializable {
|
||||
private static final long serialVersionUID = -5905036205491177060L;
|
||||
|
||||
@Override
|
||||
public int compare(Schedulable s1, Schedulable s2) {
|
||||
int res = s1.getPriority().compareTo(s2.getPriority());
|
||||
if (res == 0) {
|
||||
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
|
||||
}
|
||||
if (res == 0) {
|
||||
// In the rare case where jobs were submitted at the exact same time,
|
||||
// compare them by name (which will be the JobID) to get a deterministic
|
||||
// ordering, so we don't alternately launch tasks from different jobs.
|
||||
res = s1.getName().compareTo(s2.getName());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<Schedulable> getComparator() {
|
||||
return comparator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeShares(Collection<? extends Schedulable> schedulables,
|
||||
Resource totalResources) {
|
||||
for (Schedulable sched : schedulables) {
|
||||
sched.setFairShare(Resources.createResource(0));
|
||||
}
|
||||
}
|
||||
}
|
@ -24,6 +24,7 @@
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -32,10 +33,12 @@
|
||||
*/
|
||||
public class TestComputeFairShares {
|
||||
private List<Schedulable> scheds;
|
||||
private SchedulingMode schedulingMode;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
scheds = new ArrayList<Schedulable>();
|
||||
schedulingMode = new FairSchedulingMode();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -48,7 +51,8 @@ public void testEqualSharing() {
|
||||
scheds.add(new FakeSchedulable(50));
|
||||
scheds.add(new FakeSchedulable(30));
|
||||
scheds.add(new FakeSchedulable(20));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(40));
|
||||
verifyShares(10, 10, 10, 10);
|
||||
}
|
||||
|
||||
@ -65,7 +69,8 @@ public void testLowDemands() {
|
||||
scheds.add(new FakeSchedulable(50));
|
||||
scheds.add(new FakeSchedulable(11));
|
||||
scheds.add(new FakeSchedulable(3));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(40));
|
||||
verifyShares(13, 13, 11, 3);
|
||||
}
|
||||
|
||||
@ -83,7 +88,8 @@ public void testMinShares() {
|
||||
scheds.add(new FakeSchedulable(10, 20));
|
||||
scheds.add(new FakeSchedulable(10, 0));
|
||||
scheds.add(new FakeSchedulable(3, 2));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(40));
|
||||
verifyShares(20, 10, 7, 3);
|
||||
}
|
||||
|
||||
@ -97,7 +103,8 @@ public void testWeightedSharing() {
|
||||
scheds.add(new FakeSchedulable(50, 0, 1.0));
|
||||
scheds.add(new FakeSchedulable(30, 0, 1.0));
|
||||
scheds.add(new FakeSchedulable(20, 0, 0.5));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(45));
|
||||
verifyShares(20, 10, 10, 5);
|
||||
}
|
||||
|
||||
@ -114,7 +121,8 @@ public void testWeightedSharingWithLowDemands() {
|
||||
scheds.add(new FakeSchedulable(11, 0, 1.0));
|
||||
scheds.add(new FakeSchedulable(30, 0, 1.0));
|
||||
scheds.add(new FakeSchedulable(20, 0, 0.5));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(45));
|
||||
verifyShares(10, 11, 16, 8);
|
||||
}
|
||||
|
||||
@ -131,7 +139,8 @@ public void testWeightedSharingWithMinShares() {
|
||||
scheds.add(new FakeSchedulable(11, 0, 1.0));
|
||||
scheds.add(new FakeSchedulable(30, 5, 1.0));
|
||||
scheds.add(new FakeSchedulable(20, 15, 0.5));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(45));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(45));
|
||||
verifyShares(10, 10, 10, 15);
|
||||
}
|
||||
|
||||
@ -146,7 +155,9 @@ public void testLargeShares() {
|
||||
scheds.add(new FakeSchedulable(50 * million));
|
||||
scheds.add(new FakeSchedulable(30 * million));
|
||||
scheds.add(new FakeSchedulable(20 * million));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40 * million));
|
||||
schedulingMode
|
||||
.computeShares(scheds,
|
||||
Resources.createResource(40 * million));
|
||||
verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
|
||||
}
|
||||
|
||||
@ -159,7 +170,8 @@ public void testZeroDemand() {
|
||||
scheds.add(new FakeSchedulable(50));
|
||||
scheds.add(new FakeSchedulable(30));
|
||||
scheds.add(new FakeSchedulable(0));
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(30));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(30));
|
||||
verifyShares(10, 10, 10, 0);
|
||||
}
|
||||
|
||||
@ -168,7 +180,8 @@ public void testZeroDemand() {
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyList() {
|
||||
SchedulingAlgorithms.computeFairShares(scheds, Resources.createResource(40));
|
||||
schedulingMode.computeShares(scheds,
|
||||
Resources.createResource(40));
|
||||
verifyShares();
|
||||
}
|
||||
|
||||
|
@ -72,6 +72,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -1325,7 +1326,7 @@ public void testFifoWithinQueue() throws Exception {
|
||||
FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
||||
|
||||
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1");
|
||||
queue1.setSchedulingMode(SchedulingMode.FIFO);
|
||||
queue1.setSchedulingMode(new FifoSchedulingMode());
|
||||
|
||||
scheduler.update();
|
||||
|
||||
|
@ -0,0 +1,59 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSchedulingMode {
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testParseSchedulingMode() throws AllocationConfigurationException {
|
||||
|
||||
// Class name
|
||||
SchedulingMode sm = SchedulingMode
|
||||
.parse(FairSchedulingMode.class.getName());
|
||||
assertTrue("Invalid scheduler name",
|
||||
sm.getName().equals(FairSchedulingMode.NAME));
|
||||
|
||||
// Canonical name
|
||||
sm = SchedulingMode.parse(FairSchedulingMode.class
|
||||
.getCanonicalName());
|
||||
assertTrue("Invalid scheduler name",
|
||||
sm.getName().equals(FairSchedulingMode.NAME));
|
||||
|
||||
// Class
|
||||
sm = SchedulingMode.getInstance(FairSchedulingMode.class);
|
||||
assertTrue("Invalid scheduler name",
|
||||
sm.getName().equals(FairSchedulingMode.NAME));
|
||||
|
||||
// Shortname - fair
|
||||
sm = SchedulingMode.parse("fair");
|
||||
assertTrue("Invalid scheduler name",
|
||||
sm.getName().equals(FairSchedulingMode.NAME));
|
||||
|
||||
// Shortname - fifo
|
||||
sm = SchedulingMode.parse("fifo");
|
||||
assertTrue("Invalid scheduler name",
|
||||
sm.getName().equals(FifoSchedulingMode.NAME));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user