diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cb09c19568..a6a1b9b30b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -58,6 +58,9 @@ Release 2.6.0 - UNRELEASED YARN-2393. FairScheduler: Add the notion of steady fair share. (Wei Yan via kasha) + YARN-2395. FairScheduler: Preemption timeout should be configurable per + queue. (Wei Yan via kasha) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index d4ba88faf1..228a761852 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -65,13 +65,10 @@ public class AllocationConfiguration { // preempt other jobs' tasks. private final Map minSharePreemptionTimeouts; - // Default min share preemption timeout for queues where it is not set - // explicitly. - private final long defaultMinSharePreemptionTimeout; - - // Preemption timeout for jobs below fair share in seconds. If a job remains - // below half its fair share for this long, it is allowed to preempt tasks. - private final long fairSharePreemptionTimeout; + // Fair share preemption timeout for each queue in seconds. If a job in the + // queue waits this long without receiving its fair share threshold, it is + // allowed to preempt other jobs' tasks. + private final Map fairSharePreemptionTimeouts; private final Map schedulingPolicies; @@ -94,8 +91,8 @@ public AllocationConfiguration(Map minQueueResources, Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, + Map fairSharePreemptionTimeouts, Map> queueAcls, - long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, QueuePlacementPolicy placementPolicy, Map> configuredQueues) { this.minQueueResources = minQueueResources; @@ -110,9 +107,8 @@ public AllocationConfiguration(Map minQueueResources, this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; + this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; this.queueAcls = queueAcls; - this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; - this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; } @@ -129,8 +125,7 @@ public AllocationConfiguration(Configuration conf) { queueMaxAMShareDefault = -1.0f; queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); - defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - fairSharePreemptionTimeout = Long.MAX_VALUE; + fairSharePreemptionTimeouts = new HashMap(); schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; configuredQueues = new HashMap>(); @@ -159,23 +154,22 @@ public AccessControlList getQueueAcl(String queue, QueueACL operation) { } /** - * Get a queue's min share preemption timeout, in milliseconds. This is the - * time after which jobs in the queue may kill other queues' tasks if they - * are below their min share. + * Get a queue's min share preemption timeout configured in the allocation + * file, in milliseconds. Return -1 if not set. */ public long getMinSharePreemptionTimeout(String queueName) { Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName); - return (minSharePreemptionTimeout == null) ? defaultMinSharePreemptionTimeout - : minSharePreemptionTimeout; + return (minSharePreemptionTimeout == null) ? -1 : minSharePreemptionTimeout; } - + /** - * Get the fair share preemption, in milliseconds. This is the time - * after which any job may kill other jobs' tasks if it is below half - * its fair share. + * Get a queue's fair share preemption timeout configured in the allocation + * file, in milliseconds. Return -1 if not set. */ - public long getFairSharePreemptionTimeout() { - return fairSharePreemptionTimeout; + public long getFairSharePreemptionTimeout(String queueName) { + Long fairSharePreemptionTimeout = fairSharePreemptionTimeouts.get(queueName); + return (fairSharePreemptionTimeout == null) ? + -1 : fairSharePreemptionTimeout; } public ResourceWeights getQueueWeight(String queue) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 4cc88c140d..970ee9956d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -217,27 +217,28 @@ public synchronized void reloadAllocations() throws IOException, Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); + Map fairSharePreemptionTimeouts = new HashMap(); Map> queueAcls = new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; float queueMaxAMShareDefault = -1.0f; - long fairSharePreemptionTimeout = Long.MAX_VALUE; + long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; - + QueuePlacementPolicy newPlacementPolicy = null; // Remember all queue names so we can display them on web UI, etc. // configuredQueues is segregated based on whether it is a leaf queue // or a parent queue. This information is used for creating queues // and also for making queue placement decisions(QueuePlacementRule.java). - Map> configuredQueues = + Map> configuredQueues = new HashMap>(); for (FSQueueType queueType : FSQueueType.values()) { configuredQueues.put(queueType, new HashSet()); } - + // Read and parse the allocations file. DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); @@ -276,10 +277,16 @@ public synchronized void reloadAllocations() throws IOException, String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); userMaxAppsDefault = val; - } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { + } else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; - fairSharePreemptionTimeout = val; + defaultFairSharePreemptionTimeout = val; + } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { + if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + defaultFairSharePreemptionTimeout = val; + } } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; @@ -304,7 +311,7 @@ public synchronized void reloadAllocations() throws IOException, } } } - + // Load queue elements. A root queue can either be included or omitted. If // it's included, all other queues must be inside it. for (Element element : queueElements) { @@ -318,10 +325,10 @@ public synchronized void reloadAllocations() throws IOException, } loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, queueAcls, - configuredQueues); + queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, + queueAcls, configuredQueues); } - + // Load placement policy and pass it configured queues Configuration conf = getConfig(); if (placementPolicyElement != null) { @@ -331,11 +338,22 @@ public synchronized void reloadAllocations() throws IOException, newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); } - - AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, - queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, - queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, + + // Set the min/fair share preemption timeout for the root queue + if (!minSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)){ + minSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE, + defaultMinSharePreemptionTimeout); + } + if (!fairSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)) { + fairSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE, + defaultFairSharePreemptionTimeout); + } + + AllocationConfiguration info = new AllocationConfiguration(minQueueResources, + maxQueueResources, queueMaxApps, userMaxApps, queueWeights, + queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, + queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, + minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls, newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); @@ -353,6 +371,7 @@ private void loadQueue(String parentName, Element element, Map Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, + Map fairSharePreemptionTimeouts, Map> queueAcls, Map> configuredQueues) throws AllocationConfigurationException { @@ -395,6 +414,10 @@ private void loadQueue(String parentName, Element element, Map String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; minSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionTimeout".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + fairSharePreemptionTimeouts.put(queueName, val); } else if ("schedulingPolicy".equals(field.getTagName()) || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); @@ -410,8 +433,8 @@ private void loadQueue(String parentName, Element element, Map "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, queueAcls, - configuredQueues); + queuePolicies, minSharePreemptionTimeouts, + fairSharePreemptionTimeouts, queueAcls, configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 26a706c7f0..1209970ecc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -77,6 +77,15 @@ public void recomputeSteadyShares() { } } + @Override + public void updatePreemptionTimeouts() { + super.updatePreemptionTimeouts(); + // For child queues + for (FSQueue childQueue : childQueues) { + childQueue.updatePreemptionTimeouts(); + } + } + @Override public Resource getDemand() { return demand; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 00f0795e1d..b9fcc4bbd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -52,6 +52,9 @@ public abstract class FSQueue implements Queue, Schedulable { protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY; + private long fairSharePreemptionTimeout = Long.MAX_VALUE; + private long minSharePreemptionTimeout = Long.MAX_VALUE; + public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; this.scheduler = scheduler; @@ -166,13 +169,47 @@ public void setSteadyFairShare(Resource steadyFairShare) { public boolean hasAccess(QueueACL acl, UserGroupInformation user) { return scheduler.getAllocationConfiguration().hasAccess(name, acl, user); } - + + public long getFairSharePreemptionTimeout() { + return fairSharePreemptionTimeout; + } + + public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) { + this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; + } + + public long getMinSharePreemptionTimeout() { + return minSharePreemptionTimeout; + } + + public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) { + this.minSharePreemptionTimeout = minSharePreemptionTimeout; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share */ public abstract void recomputeShares(); + /** + * Update the min/fair share preemption timeouts for this queue. + */ + public void updatePreemptionTimeouts() { + // For min share + minSharePreemptionTimeout = scheduler.getAllocationConfiguration() + .getMinSharePreemptionTimeout(getName()); + if (minSharePreemptionTimeout == -1 && parent != null) { + minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout(); + } + // For fair share + fairSharePreemptionTimeout = scheduler.getAllocationConfiguration() + .getFairSharePreemptionTimeout(getName()); + if (fairSharePreemptionTimeout == -1 && parent != null) { + fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout(); + } + } + /** * Gets the children of this queue, if any. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 6932a31595..2798b8d5f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -506,9 +506,8 @@ protected void warnOrKillContainer(RMContainer container) { * identical for some reason). */ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { - String queue = sched.getName(); - long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue); - long fairShareTimeout = allocConf.getFairSharePreemptionTimeout(); + long minShareTimeout = sched.getMinSharePreemptionTimeout(); + long fairShareTimeout = sched.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 490ba68659..2444ba422d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -181,6 +181,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); + setPreemptionTimeout(leafQueue, parent, queueConf); return leafQueue; } else { FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); @@ -192,6 +193,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { } parent.addChildQueue(newParent); queues.put(newParent.getName(), newParent); + setPreemptionTimeout(newParent, parent, queueConf); parent = newParent; } } @@ -199,6 +201,29 @@ private FSQueue createQueue(String name, FSQueueType queueType) { return parent; } + /** + * Set the min/fair share preemption timeouts for the given queue. + * If the timeout is configured in the allocation file, the queue will use + * that value; otherwise, the queue inherits the value from its parent queue. + */ + private void setPreemptionTimeout(FSQueue queue, + FSParentQueue parentQueue, AllocationConfiguration queueConf) { + // For min share + long minSharePreemptionTimeout = + queueConf.getMinSharePreemptionTimeout(queue.getQueueName()); + if (minSharePreemptionTimeout == -1) { + minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout(); + } + queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout); + // For fair share + long fairSharePreemptionTimeout = + queueConf.getFairSharePreemptionTimeout(queue.getQueueName()); + if (fairSharePreemptionTimeout == -1) { + fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout(); + } + queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout); + } + /** * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. Incompatibility could be due to @@ -384,5 +409,7 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { // Update steady fair shares for all queues rootQueue.recomputeSteadyShares(); + // Update the fair share preemption timeouts for all queues recursively + rootQueue.updatePreemptionTimeouts(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 2a4992c32a..14b3111c07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -186,9 +186,14 @@ public void testAllocationFileParsing() throws Exception { //Make queue F a parent queue without configured leaf queues using the 'type' attribute out.println(""); out.println(""); - //Create hierarchical queues G,H + // Create hierarchical queues G,H, with different min/fair share preemption + // timeouts out.println(""); + out.println("120"); + out.println("50"); out.println(" "); + out.println(" 180"); + out.println(" 40"); out.println(" "); out.println(""); // Set default limit of apps per queue to 15 @@ -204,8 +209,8 @@ public void testAllocationFileParsing() throws Exception { // Set default min share preemption timeout to 2 minutes out.println("120" + ""); - // Set fair share preemption timeout to 5 minutes - out.println("300"); + // Set default fair share preemption timeout to 5 minutes + out.println("300"); // Set default scheduling policy to DRF out.println("drf"); out.println(""); @@ -270,16 +275,30 @@ public void testAllocationFileParsing() throws Exception { assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC", QueueACL.SUBMIT_APPLICATIONS).getAclString()); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." + + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD")); assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); - assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); - + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueF")); + assertEquals(50000, queueConf.getMinSharePreemptionTimeout("root.queueG")); + assertEquals(40000, queueConf.getMinSharePreemptionTimeout("root.queueG.queueH")); + + assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueF")); + assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG")); + assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH")); + assertTrue(queueConf.getConfiguredQueues() .get(FSQueueType.PARENT) .contains("root.queueF")); @@ -393,16 +412,23 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC", QueueACL.SUBMIT_APPLICATIONS).getAclString()); - - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." + + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD")); - assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC")); + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD")); assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); - assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); + + assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD")); + assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE")); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 79e3184e79..6e0127dad4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1059,7 +1059,11 @@ public void testConfigureRootQueue() throws Exception { out.println(" "); out.println(" 1024mb,4vcores"); out.println(" "); + out.println(" 100"); + out.println(" 120"); out.println(""); + out.println("300"); + out.println("200"); out.println(""); out.close(); @@ -1073,6 +1077,9 @@ public void testConfigureRootQueue() throws Exception { assertNotNull(queueManager.getLeafQueue("child1", false)); assertNotNull(queueManager.getLeafQueue("child2", false)); + + assertEquals(100000, root.getFairSharePreemptionTimeout()); + assertEquals(120000, root.getMinSharePreemptionTimeout()); } @Test (timeout = 5000) @@ -1378,7 +1385,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { out.println(""); out.println("2"); out.println(""); - out.print("10"); + out.print("10"); out.println(""); out.close(); @@ -1462,7 +1469,7 @@ public void testPreemptionDecision() throws Exception { out.println("1024mb,0vcores"); out.println(""); out.print("5"); - out.print("10"); + out.print("10"); out.println(""); out.close(); @@ -1489,7 +1496,6 @@ public void testPreemptionDecision() throws Exception { NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); scheduler.handle(nodeEvent3); - // Queue A and B each request three containers ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); @@ -1563,6 +1569,279 @@ public void testPreemptionDecision() throws Exception { 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); } + @Test + /** + * Tests the various timing of decision to preempt tasks. + */ + public void testPreemptionDecisionWithVariousTimeout() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + MockClock clock = new MockClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); + out.println(""); + out.println("1"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println("2"); + out.println("10"); + out.println("25"); + out.println(""); + out.println("1024mb,0vcores"); + out.println("5"); + out.println(""); + out.println(""); + out.println("1024mb,0vcores"); + out.println("20"); + out.println(""); + out.println(""); + out.println(""); + out.println("1"); + out.println("1024mb,0vcores"); + out.println(""); + out.print("15"); + out.print("30"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Check the min/fair share preemption timeout for each queue + QueueManager queueMgr = scheduler.getQueueManager(); + assertEquals(30000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("default") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueA") + .getFairSharePreemptionTimeout()); + assertEquals(25000, queueMgr.getQueue("queueB") + .getFairSharePreemptionTimeout()); + assertEquals(25000, queueMgr.getQueue("queueB.queueB1") + .getFairSharePreemptionTimeout()); + assertEquals(20000, queueMgr.getQueue("queueB.queueB2") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueC") + .getFairSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("root") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("default") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueA") + .getMinSharePreemptionTimeout()); + assertEquals(10000, queueMgr.getQueue("queueB") + .getMinSharePreemptionTimeout()); + assertEquals(5000, queueMgr.getQueue("queueB.queueB1") + .getMinSharePreemptionTimeout()); + assertEquals(10000, queueMgr.getQueue("queueB.queueB2") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueC") + .getMinSharePreemptionTimeout()); + + // Create one big node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue A takes all resources + for (int i = 0; i < 6; i ++) { + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + } + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 6; i++) { + scheduler.handle(nodeUpdate1); + } + + // Now new requests arrive from queues B1, B2 and C + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1); + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2); + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3); + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + + scheduler.update(); + + FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true); + FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true); + FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true); + + assertTrue(Resources.equals( + Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resToPreempt(queueC, clock.getTime()))); + + // After 5 seconds, queueB1 wants to preempt min share + scheduler.update(); + clock.tick(6); + assertEquals( + 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + assertEquals( + 0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + assertEquals( + 0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + + // After 10 seconds, queueB2 wants to preempt min share + scheduler.update(); + clock.tick(5); + assertEquals( + 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + assertEquals( + 0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + + // After 15 seconds, queueC wants to preempt min share + scheduler.update(); + clock.tick(5); + assertEquals( + 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + + // After 20 seconds, queueB2 should want to preempt fair share + scheduler.update(); + clock.tick(5); + assertEquals( + 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + + // After 25 seconds, queueB1 should want to preempt fair share + scheduler.update(); + clock.tick(5); + assertEquals( + 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + + // After 30 seconds, queueC should want to preempt fair share + scheduler.update(); + clock.tick(5); + assertEquals( + 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + } + + @Test + public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + MockClock clock = new MockClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.print("15"); + out.print("30"); + out.print("40"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Check the min/fair share preemption timeout for each queue + QueueManager queueMgr = scheduler.getQueueManager(); + assertEquals(30000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("default") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueA") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB1") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB2") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueC") + .getFairSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("root") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("default") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueA") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB") + .getMinSharePreemptionTimeout()); + assertEquals(5000, queueMgr.getQueue("queueB.queueB1") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB.queueB2") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueC") + .getMinSharePreemptionTimeout()); + + // If both exist, we take the default one + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.print("15"); + out.print("25"); + out.print("30"); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + assertEquals(25000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + } + @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { scheduler.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index a3edadeccc..bd28bfff3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -271,6 +271,11 @@ Allocation file format * minSharePreemptionTimeout: number of seconds the queue is under its minimum share before it will try to preempt containers to take resources from other queues. + If not set, the queue will inherit the value from its parent queue. + + * fairSharePreemptionTimeout: number of seconds the queue is under its fair share + threshold before it will try to preempt containers to take resources from other + queues. If not set, the queue will inherit the value from its parent queue. * <>, which represent settings governing the behavior of individual users. They can contain a single property: maxRunningApps, a limit on the @@ -279,14 +284,13 @@ Allocation file format * <>, which sets the default running app limit for any users whose limit is not otherwise specified. - * <>, number of seconds a queue is under - its fair share before it will try to preempt containers to take resources from - other queues. + * <>, which sets the fair share + preemption timeout for the root queue; overridden by fairSharePreemptionTimeout + element in root queue. - * <>, which sets the default number - of seconds the queue is under its minimum share before it will try to preempt - containers to take resources from other queues; overriden by - minSharePreemptionTimeout element in each queue if specified. + * <>, which sets the min share + preemption timeout for the root queue; overridden by minSharePreemptionTimeout + element in root queue. * <>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue.