YARN-2395. FairScheduler: Preemption timeout should be configurable per queue. (Wei Yan via kasha)

This commit is contained in:
Karthik Kambatla 2014-08-30 01:17:13 -07:00
parent 5c14bc426b
commit 0f34e6f387
10 changed files with 474 additions and 73 deletions

View File

@ -58,6 +58,9 @@ Release 2.6.0 - UNRELEASED
YARN-2393. FairScheduler: Add the notion of steady fair share.
(Wei Yan via kasha)
YARN-2395. FairScheduler: Preemption timeout should be configurable per
queue. (Wei Yan via kasha)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -65,13 +65,10 @@ public class AllocationConfiguration {
// preempt other jobs' tasks.
private final Map<String, Long> minSharePreemptionTimeouts;
// Default min share preemption timeout for queues where it is not set
// explicitly.
private final long defaultMinSharePreemptionTimeout;
// Preemption timeout for jobs below fair share in seconds. If a job remains
// below half its fair share for this long, it is allowed to preempt tasks.
private final long fairSharePreemptionTimeout;
// Fair share preemption timeout for each queue in seconds. If a job in the
// queue waits this long without receiving its fair share threshold, it is
// allowed to preempt other jobs' tasks.
private final Map<String, Long> fairSharePreemptionTimeouts;
private final Map<String, SchedulingPolicy> schedulingPolicies;
@ -94,8 +91,8 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues) {
this.minQueueResources = minQueueResources;
@ -110,9 +107,8 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
this.queueAcls = queueAcls;
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues;
}
@ -129,8 +125,7 @@ public AllocationConfiguration(Configuration conf) {
queueMaxAMShareDefault = -1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeouts = new HashMap<String, Long>();
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
configuredQueues = new HashMap<FSQueueType, Set<String>>();
@ -159,23 +154,22 @@ public AccessControlList getQueueAcl(String queue, QueueACL operation) {
}
/**
* Get a queue's min share preemption timeout, in milliseconds. This is the
* time after which jobs in the queue may kill other queues' tasks if they
* are below their min share.
* Get a queue's min share preemption timeout configured in the allocation
* file, in milliseconds. Return -1 if not set.
*/
public long getMinSharePreemptionTimeout(String queueName) {
Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName);
return (minSharePreemptionTimeout == null) ? defaultMinSharePreemptionTimeout
: minSharePreemptionTimeout;
return (minSharePreemptionTimeout == null) ? -1 : minSharePreemptionTimeout;
}
/**
* Get the fair share preemption, in milliseconds. This is the time
* after which any job may kill other jobs' tasks if it is below half
* its fair share.
* Get a queue's fair share preemption timeout configured in the allocation
* file, in milliseconds. Return -1 if not set.
*/
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
public long getFairSharePreemptionTimeout(String queueName) {
Long fairSharePreemptionTimeout = fairSharePreemptionTimeouts.get(queueName);
return (fairSharePreemptionTimeout == null) ?
-1 : fairSharePreemptionTimeout;
}
public ResourceWeights getQueueWeight(String queue) {

View File

@ -217,12 +217,13 @@ public synchronized void reloadAllocations() throws IOException,
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
float queueMaxAMShareDefault = -1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
@ -276,10 +277,16 @@ public synchronized void reloadAllocations() throws IOException,
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxAppsDefault = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
} else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeout = val;
defaultFairSharePreemptionTimeout = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultFairSharePreemptionTimeout = val;
}
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
@ -318,8 +325,8 @@ public synchronized void reloadAllocations() throws IOException,
}
loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
queueAcls, configuredQueues);
}
// Load placement policy and pass it configured queues
@ -332,10 +339,21 @@ public synchronized void reloadAllocations() throws IOException,
configuredQueues);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
// Set the min/fair share preemption timeout for the root queue
if (!minSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)){
minSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
defaultMinSharePreemptionTimeout);
}
if (!fairSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)) {
fairSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
defaultFairSharePreemptionTimeout);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls,
newPlacementPolicy, configuredQueues);
lastSuccessfulReload = clock.getTime();
@ -353,6 +371,7 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<FSQueueType, Set<String>> configuredQueues)
throws AllocationConfigurationException {
@ -395,6 +414,10 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val);
} else if ("fairSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingPolicy".equals(field.getTagName())
|| "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
@ -410,8 +433,8 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, queueAcls, configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}

View File

@ -77,6 +77,15 @@ public void recomputeSteadyShares() {
}
}
@Override
public void updatePreemptionTimeouts() {
super.updatePreemptionTimeouts();
// For child queues
for (FSQueue childQueue : childQueues) {
childQueue.updatePreemptionTimeouts();
}
}
@Override
public Resource getDemand() {
return demand;

View File

@ -52,6 +52,9 @@ public abstract class FSQueue implements Queue, Schedulable {
protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
private long minSharePreemptionTimeout = Long.MAX_VALUE;
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
this.scheduler = scheduler;
@ -167,12 +170,46 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
}
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
}
public long getMinSharePreemptionTimeout() {
return minSharePreemptionTimeout;
}
public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
this.minSharePreemptionTimeout = minSharePreemptionTimeout;
}
/**
* Recomputes the shares for all child queues and applications based on this
* queue's current share
*/
public abstract void recomputeShares();
/**
* Update the min/fair share preemption timeouts for this queue.
*/
public void updatePreemptionTimeouts() {
// For min share
minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getMinSharePreemptionTimeout(getName());
if (minSharePreemptionTimeout == -1 && parent != null) {
minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
}
// For fair share
fairSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getFairSharePreemptionTimeout(getName());
if (fairSharePreemptionTimeout == -1 && parent != null) {
fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
}
}
/**
* Gets the children of this queue, if any.
*/

View File

@ -506,9 +506,8 @@ protected void warnOrKillContainer(RMContainer container) {
* identical for some reason).
*/
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName();
long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
long minShareTimeout = sched.getMinSharePreemptionTimeout();
long fairShareTimeout = sched.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {

View File

@ -181,6 +181,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
setPreemptionTimeout(leafQueue, parent, queueConf);
return leafQueue;
} else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
@ -192,6 +193,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
}
parent.addChildQueue(newParent);
queues.put(newParent.getName(), newParent);
setPreemptionTimeout(newParent, parent, queueConf);
parent = newParent;
}
}
@ -199,6 +201,29 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
return parent;
}
/**
* Set the min/fair share preemption timeouts for the given queue.
* If the timeout is configured in the allocation file, the queue will use
* that value; otherwise, the queue inherits the value from its parent queue.
*/
private void setPreemptionTimeout(FSQueue queue,
FSParentQueue parentQueue, AllocationConfiguration queueConf) {
// For min share
long minSharePreemptionTimeout =
queueConf.getMinSharePreemptionTimeout(queue.getQueueName());
if (minSharePreemptionTimeout == -1) {
minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout();
}
queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout);
// For fair share
long fairSharePreemptionTimeout =
queueConf.getFairSharePreemptionTimeout(queue.getQueueName());
if (fairSharePreemptionTimeout == -1) {
fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout();
}
queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout);
}
/**
* Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to
@ -384,5 +409,7 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
// Update steady fair shares for all queues
rootQueue.recomputeSteadyShares();
// Update the fair share preemption timeouts for all queues recursively
rootQueue.updatePreemptionTimeouts();
}
}

View File

@ -186,9 +186,14 @@ public void testAllocationFileParsing() throws Exception {
//Make queue F a parent queue without configured leaf queues using the 'type' attribute
out.println("<queue name=\"queueF\" type=\"parent\" >");
out.println("</queue>");
//Create hierarchical queues G,H
// Create hierarchical queues G,H, with different min/fair share preemption
// timeouts
out.println("<queue name=\"queueG\">");
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
out.println(" <queue name=\"queueH\">");
out.println(" <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>40</minSharePreemptionTimeout>");
out.println(" </queue>");
out.println("</queue>");
// Set default limit of apps per queue to 15
@ -204,8 +209,8 @@ public void testAllocationFileParsing() throws Exception {
// Set default min share preemption timeout to 2 minutes
out.println("<defaultMinSharePreemptionTimeout>120"
+ "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
// Set default fair share preemption timeout to 5 minutes
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
// Set default scheduling policy to DRF
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
@ -270,15 +275,29 @@ public void testAllocationFileParsing() throws Exception {
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." +
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueF"));
assertEquals(50000, queueConf.getMinSharePreemptionTimeout("root.queueG"));
assertEquals(40000, queueConf.getMinSharePreemptionTimeout("root.queueG.queueH"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueF"));
assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG"));
assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));
assertTrue(queueConf.getConfiguredQueues()
.get(FSQueueType.PARENT)
@ -393,16 +412,23 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." +
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
}
@Test

View File

@ -1059,7 +1059,11 @@ public void testConfigureRootQueue() throws Exception {
out.println(" <queue name=\"child2\">");
out.println(" <minResources>1024mb,4vcores</minResources>");
out.println(" </queue>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
out.println("<defaultMinSharePreemptionTimeout>200</defaultMinSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
@ -1073,6 +1077,9 @@ public void testConfigureRootQueue() throws Exception {
assertNotNull(queueManager.getLeafQueue("child1", false));
assertNotNull(queueManager.getLeafQueue("child2", false));
assertEquals(100000, root.getFairSharePreemptionTimeout());
assertEquals(120000, root.getMinSharePreemptionTimeout());
}
@Test (timeout = 5000)
@ -1378,7 +1385,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
out.println("<queue name=\"queueB\">");
out.println("<weight>2</weight>");
out.println("</queue>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
@ -1462,7 +1469,7 @@ public void testPreemptionDecision() throws Exception {
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
@ -1489,7 +1496,6 @@ public void testPreemptionDecision() throws Exception {
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
// Queue A and B each request three containers
ApplicationAttemptId app1 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
@ -1563,6 +1569,279 @@ public void testPreemptionDecision() throws Exception {
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
}
@Test
/**
* Tests the various timing of decision to preempt tasks.
*/
public void testPreemptionDecisionWithVariousTimeout() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("<maxResources>0mb,0vcores</maxResources>");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("<weight>1</weight>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>2</weight>");
out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>");
out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>");
out.println("<queue name=\"queueB1\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>1</weight>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Check the min/fair share preemption timeout for each queue
QueueManager queueMgr = scheduler.getQueueManager();
assertEquals(30000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("default")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueA")
.getFairSharePreemptionTimeout());
assertEquals(25000, queueMgr.getQueue("queueB")
.getFairSharePreemptionTimeout());
assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
.getFairSharePreemptionTimeout());
assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueC")
.getFairSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("root")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("default")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueA")
.getMinSharePreemptionTimeout());
assertEquals(10000, queueMgr.getQueue("queueB")
.getMinSharePreemptionTimeout());
assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
.getMinSharePreemptionTimeout());
assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueC")
.getMinSharePreemptionTimeout());
// Create one big node
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A takes all resources
for (int i = 0; i < 6; i ++) {
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
}
scheduler.update();
// Sufficient node check-ins to fully schedule containers
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
for (int i = 0; i < 6; i++) {
scheduler.handle(nodeUpdate1);
}
// Now new requests arrive from queues B1, B2 and C
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
scheduler.update();
FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime())));
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime())));
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueC, clock.getTime())));
// After 5 seconds, queueB1 wants to preempt min share
scheduler.update();
clock.tick(6);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 10 seconds, queueB2 wants to preempt min share
scheduler.update();
clock.tick(5);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 15 seconds, queueC wants to preempt min share
scheduler.update();
clock.tick(5);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 20 seconds, queueB2 should want to preempt fair share
scheduler.update();
clock.tick(5);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 25 seconds, queueB1 should want to preempt fair share
scheduler.update();
clock.tick(5);
assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 30 seconds, queueC should want to preempt fair share
scheduler.update();
clock.tick(5);
assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
}
@Test
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<queue name=\"queueB1\">");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>40</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Check the min/fair share preemption timeout for each queue
QueueManager queueMgr = scheduler.getQueueManager();
assertEquals(30000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("default")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueA")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueC")
.getFairSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("root")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("default")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueA")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueB")
.getMinSharePreemptionTimeout());
assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueC")
.getMinSharePreemptionTimeout());
// If both exist, we take the default one
out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<queue name=\"queueB1\">");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, resourceManager.getRMContext());
assertEquals(25000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
}
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException {
scheduler.init(conf);

View File

@ -271,6 +271,11 @@ Allocation file format
* minSharePreemptionTimeout: number of seconds the queue is under its minimum share
before it will try to preempt containers to take resources from other queues.
If not set, the queue will inherit the value from its parent queue.
* fairSharePreemptionTimeout: number of seconds the queue is under its fair share
threshold before it will try to preempt containers to take resources from other
queues. If not set, the queue will inherit the value from its parent queue.
* <<User elements>>, which represent settings governing the behavior of individual
users. They can contain a single property: maxRunningApps, a limit on the
@ -279,14 +284,13 @@ Allocation file format
* <<A userMaxAppsDefault element>>, which sets the default running app limit
for any users whose limit is not otherwise specified.
* <<A fairSharePreemptionTimeout element>>, number of seconds a queue is under
its fair share before it will try to preempt containers to take resources from
other queues.
* <<A defaultFairSharePreemptionTimeout element>>, which sets the fair share
preemption timeout for the root queue; overridden by fairSharePreemptionTimeout
element in root queue.
* <<A defaultMinSharePreemptionTimeout element>>, which sets the default number
of seconds the queue is under its minimum share before it will try to preempt
containers to take resources from other queues; overriden by
minSharePreemptionTimeout element in each queue if specified.
* <<A defaultMinSharePreemptionTimeout element>>, which sets the min share
preemption timeout for the root queue; overridden by minSharePreemptionTimeout
element in root queue.
* <<A queueMaxAppsDefault element>>, which sets the default running app limit
for queues; overriden by maxRunningApps element in each queue.