From 11be3f70e029c2324b167563168c8a254d234aef Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 15 Feb 2017 23:51:22 -0800 Subject: [PATCH] YARN-4212. FairScheduler: Can't create a DRF queue under a FAIR policy queue. (Yufei Gu via kasha) --- .../fair/AllocationConfiguration.java | 11 +- .../scheduler/fair/FSLeafQueue.java | 9 - .../scheduler/fair/FSParentQueue.java | 13 - .../scheduler/fair/FSQueue.java | 50 ++- .../scheduler/fair/QueueManager.java | 28 +- .../scheduler/fair/SchedulingPolicy.java | 36 +-- .../DominantResourceFairnessPolicy.java | 5 - .../fair/policies/FairSharePolicy.java | 15 +- .../scheduler/fair/policies/FifoPolicy.java | 14 +- .../scheduler/fair/TestFSAppStarvation.java | 10 +- .../scheduler/fair/TestFairScheduler.java | 1 - .../scheduler/fair/TestSchedulingPolicy.java | 302 ++++++++++++++---- 12 files changed, 340 insertions(+), 154 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 7bd2616813..f143aa6504 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -408,9 +408,8 @@ public void setAverageCapacity(int avgCapacity) { * Initialize a {@link FSQueue} with queue-specific properties and its * metrics. * @param queue the FSQueue needed to be initialized - * @param scheduler the scheduler which the queue belonged to */ - public void initFSQueue(FSQueue queue, FairScheduler scheduler){ + public void initFSQueue(FSQueue queue){ // Set queue-specific properties. String name = queue.getName(); queue.setWeights(getQueueWeight(name)); @@ -419,14 +418,6 @@ public void initFSQueue(FSQueue queue, FairScheduler scheduler){ queue.setMaxRunningApps(getQueueMaxApps(name)); queue.setMaxAMShare(getQueueMaxAMShare(name)); queue.setMaxChildQueueResource(getMaxChildResources(name)); - try { - SchedulingPolicy policy = getSchedulingPolicy(name); - policy.initialize(scheduler.getClusterResource()); - queue.setPolicy(policy); - } catch (AllocationConfigurationException ex) { - LOG.warn("Failed to set the scheduling policy " - + getDefaultSchedulingPolicy(), ex); - } // Set queue metrics. queue.getMetrics().setMinShare(getMinResources(name)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index c4b2de6b24..59bde5b011 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -197,15 +197,6 @@ public void collectSchedulerApplications( } } - @Override - public void setPolicy(SchedulingPolicy policy) - throws AllocationConfigurationException { - if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) { - throwPolicyDoesnotApplyException(policy); - } - super.policy = policy; - } - @Override public void updateInternal(boolean checkStarvation) { readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 2528f3daea..1c8e9ced59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -239,19 +239,6 @@ public List getChildQueues() { } } - @Override - public void setPolicy(SchedulingPolicy policy) - throws AllocationConfigurationException { - boolean allowed = - SchedulingPolicy.isApplicableTo(policy, (parent == null) - ? SchedulingPolicy.DEPTH_ROOT - : SchedulingPolicy.DEPTH_INTERMEDIATE); - if (!allowed) { - throwPolicyDoesnotApplyException(policy); - } - super.policy = policy; - } - void incrementRunnableApps() { writeLock.lock(); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index ee4c35aa24..7e8b85872a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -91,20 +91,23 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, name); this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf()); this.parent = parent; + setPolicy(scheduler.getAllocationConfiguration().getSchedulingPolicy(name)); reinit(false); } /** * Initialize a queue by setting its queue-specific properties and its - * metrics. - * This function is invoked when a new queue is created or reloading the - * allocation configuration. + * metrics. This method is invoked when creating a new queue or reloading + * the allocation file. + * This method does not set policies for queues when reloading the allocation + * file since we need to either set all new policies or nothing, which is + * handled by method {@link #verifyAndSetPolicyFromConf}. * * @param recursive whether child queues should be reinitialized recursively */ public void reinit(boolean recursive) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); - allocConf.initFSQueue(this, scheduler); + allocConf.initFSQueue(this); updatePreemptionVariables(); if (recursive) { @@ -131,15 +134,11 @@ public FSParentQueue getParent() { return parent; } - protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy) - throws AllocationConfigurationException { - throw new AllocationConfigurationException("SchedulingPolicy " + policy - + " does not apply to queue " + getName()); + public void setPolicy(SchedulingPolicy policy) { + policy.initialize(scheduler.getClusterResource()); + this.policy = policy; } - public abstract void setPolicy(SchedulingPolicy policy) - throws AllocationConfigurationException; - public void setWeights(ResourceWeights weights){ this.weights = weights; } @@ -463,4 +462,33 @@ boolean fitsInMaxShare(Resource additionalResource) { } return true; } + + /** + * Recursively check policies for queues in pre-order. Get queue policies + * from the allocation file instead of properties of {@link FSQueue} objects. + * Set the policy for current queue if there is no policy violation for its + * children. This method is invoked while reloading the allocation file. + * + * @param queueConf allocation configuration + * @return true if no policy violation and successfully set polices + * for queues; false otherwise + */ + public boolean verifyAndSetPolicyFromConf(AllocationConfiguration queueConf) { + SchedulingPolicy queuePolicy = queueConf.getSchedulingPolicy(getName()); + + for (FSQueue child : getChildQueues()) { + if (!queuePolicy.isChildPolicyAllowed( + queueConf.getSchedulingPolicy(child.getName()))) { + return false; + } + boolean success = child.verifyAndSetPolicyFromConf(queueConf); + if (!success) { + return false; + } + } + + // Set the policy if no policy violation for all children + setPolicy(queuePolicy); + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 934bcfd4f2..3c601fa1a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.xml.sax.SAXException; import com.google.common.base.CharMatcher; @@ -42,10 +43,10 @@ import java.util.Iterator; import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; + /** * Maintains a list of queues as well as scheduling parameters for each queue, * such as guaranteed share allocations, from the fair scheduler config file. - * */ @Private @Unstable @@ -72,6 +73,9 @@ public FSParentQueue getRootQueue() { public void initialize(Configuration conf) throws IOException, SAXException, AllocationConfigurationException, ParserConfigurationException { + // Policies of root and default queue are set to + // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been + // loaded yet. rootQueue = new FSParentQueue("root", scheduler, null); queues.put(rootQueue.getName(), rootQueue); @@ -80,7 +84,7 @@ public void initialize(Configuration conf) throws IOException, // Recursively reinitialize to propagate queue properties rootQueue.reinit(true); } - + /** * Get a leaf queue by name, creating it if the create param is true and is necessary. * If the queue is not or can not be a leaf queue, i.e. it already exists as a @@ -272,12 +276,25 @@ private FSQueue createNewQueues(FSQueueType queueType, FSParentQueue newParent = null; String queueName = i.next(); + // Check if child policy is allowed + SchedulingPolicy childPolicy = scheduler.getAllocationConfiguration(). + getSchedulingPolicy(queueName); + if (!parent.getPolicy().isChildPolicyAllowed(childPolicy)) { + LOG.error("Can't create queue '" + queueName + "'."); + return null; + } + // Only create a leaf queue at the very end if (!i.hasNext() && (queueType != FSQueueType.PARENT)) { FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent); leafQueues.add(leafQueue); queue = leafQueue; } else { + if (childPolicy instanceof FifoPolicy) { + LOG.error("Can't create queue '" + queueName + "', since " + + FifoPolicy.NAME + " is only for leaf queues."); + return null; + } newParent = new FSParentQueue(queueName, scheduler, parent); queue = newParent; } @@ -479,6 +496,13 @@ private String ensureRootPrefix(String name) { public void updateAllocationConfiguration(AllocationConfiguration queueConf) { // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist synchronized (queues) { + // Verify and set scheduling policies for existing queues before creating + // any queue, since we need parent policies to determine if we can create + // its children. + if (!rootQueue.verifyAndSetPolicyFromConf(queueConf)) { + LOG.error("Setting scheduling policies for existing queues failed!"); + } + for (String name : queueConf.getConfiguredQueues().get( FSQueueType.LEAF)) { if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 9eda46cd8c..3fe36f3813 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -42,12 +42,6 @@ public abstract class SchedulingPolicy { public static final SchedulingPolicy DEFAULT_POLICY = getInstance(FairSharePolicy.class); - public static final byte DEPTH_LEAF = (byte) 1; - public static final byte DEPTH_INTERMEDIATE = (byte) 2; - public static final byte DEPTH_ROOT = (byte) 4; - public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate - public static final byte DEPTH_ANY = (byte) 7; - /** * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz */ @@ -113,27 +107,6 @@ public void initialize(Resource clusterCapacity) {} */ public abstract String getName(); - /** - * Specifies the depths in the hierarchy, this {@link SchedulingPolicy} - * applies to - * - * @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_* - */ - public abstract byte getApplicableDepth(); - - /** - * Checks if the specified {@link SchedulingPolicy} can be used for a queue at - * the specified depth in the hierarchy - * - * @param policy {@link SchedulingPolicy} we are checking the - * depth-applicability for - * @param depth queue's depth in the hierarchy - * @return true if policy is applicable to passed depth, false otherwise - */ - public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) { - return ((policy.getApplicableDepth() & depth) == depth) ? true : false; - } - /** * The comparator returned by this method is to be used for sorting the * {@link Schedulable}s in that queue. @@ -191,4 +164,13 @@ public abstract boolean checkIfUsageOverFairShare( public abstract Resource getHeadroom(Resource queueFairShare, Resource queueUsage, Resource maxAvailable); + /** + * Check whether the policy of a child queue is allowed. + * + * @param childPolicy the policy of child queue + * @return true if the child policy is allowed; false otherwise + */ + public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) { + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index ad41b11f80..6f04cb78ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -57,11 +57,6 @@ public String getName() { return NAME; } - @Override - public byte getApplicableDepth() { - return SchedulingPolicy.DEPTH_ANY; - } - @Override public Comparator getComparator() { return COMPARATOR; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index d47ea07c65..9036a03c09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -21,6 +21,8 @@ import java.util.Collection; import java.util.Comparator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; @@ -40,6 +42,7 @@ @Private @Unstable public class FairSharePolicy extends SchedulingPolicy { + private static final Log LOG = LogFactory.getLog(FifoPolicy.class); @VisibleForTesting public static final String NAME = "fair"; private static final DefaultResourceCalculator RESOURCE_CALCULATOR = @@ -175,7 +178,15 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { } @Override - public byte getApplicableDepth() { - return SchedulingPolicy.DEPTH_ANY; + public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) { + if (childPolicy instanceof DominantResourceFairnessPolicy) { + LOG.error("Queue policy can't be " + DominantResourceFairnessPolicy.NAME + + " if the parent policy is " + getName() + ". Choose " + + getName() + " or " + FifoPolicy.NAME + " for child queues instead." + + " Please note that " + FifoPolicy.NAME + + " is only for leaf queues."); + return false; + } + return true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 3e2cb9f0ae..7dd45cb9f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -21,14 +21,14 @@ import java.util.Collection; import java.util.Comparator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; - - import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -38,6 +38,8 @@ @Private @Unstable public class FifoPolicy extends SchedulingPolicy { + private static final Log LOG = LogFactory.getLog(FifoPolicy.class); + @VisibleForTesting public static final String NAME = "FIFO"; private static final FifoComparator COMPARATOR = new FifoComparator(); @@ -127,9 +129,11 @@ public Resource getHeadroom(Resource queueFairShare, return headroom; } - @Override - public byte getApplicableDepth() { - return SchedulingPolicy.DEPTH_LEAF; + public boolean isChildPolicyAllowed(SchedulingPolicy childPolicy) { + LOG.error(getName() + " policy is only for leaf queues. Please choose " + + DominantResourceFairnessPolicy.NAME + " or " + FairSharePolicy.NAME + + " for parent queues."); + return false; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java index 3a79ac0891..2eacc9ee3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -197,7 +197,7 @@ private void setupStarvedCluster() throws IOException { out.println("0" + ""); out.println("fair"); - addChildQueue(out); + addChildQueue(out, "fair"); out.println(""); // DRF queue with fairshare preemption enabled @@ -207,9 +207,10 @@ private void setupStarvedCluster() throws IOException { out.println("0" + ""); out.println("drf"); - addChildQueue(out); + addChildQueue(out, "drf"); out.println(""); - + out.println("drf" + + ""); out.println(""); out.close(); @@ -237,13 +238,14 @@ private void setupStarvedCluster() throws IOException { assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size()); } - private void addChildQueue(PrintWriter out) { + private void addChildQueue(PrintWriter out, String policy) { // Child queue under fairshare with same settings out.println(""); out.println("1" + ""); out.println("0" + ""); + out.println("" + policy + ""); out.println(""); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index da5d3ad601..0c3a614175 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -5164,5 +5164,4 @@ public void testUpdateDemand() throws IOException { Resources.equals(aQueue.getDemand(), maxResource) && Resources.equals(bQueue.getDemand(), maxResource)); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index bd49ccaa2e..8dccf6e613 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -18,9 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; import java.util.Collection; import java.util.Comparator; import java.util.Stack; @@ -30,16 +31,29 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; public class TestSchedulingPolicy { private static final Log LOG = LogFactory.getLog(TestSchedulingPolicy.class); + private final static String ALLOC_FILE = + new File(FairSchedulerTestBase.TEST_DIR, "test-queues").getAbsolutePath(); + private FairSchedulerConfiguration conf; + private FairScheduler scheduler; + + @Before + public void setUp() throws Exception { + scheduler = new FairScheduler(); + conf = new FairSchedulerConfiguration(); + } @Test(timeout = 1000) public void testParseSchedulingPolicy() @@ -78,66 +92,6 @@ public void testParseSchedulingPolicy() sm.getName().equals(FifoPolicy.NAME)); } - /** - * Trivial tests that make sure - * {@link SchedulingPolicy#isApplicableTo(SchedulingPolicy, byte)} works as - * expected for the possible values of depth - * - * @throws AllocationConfigurationException - */ - @Test(timeout = 1000) - public void testIsApplicableTo() throws AllocationConfigurationException { - final String ERR = "Broken SchedulingPolicy#isApplicableTo"; - - // fifo - SchedulingPolicy policy = SchedulingPolicy.parse("fifo"); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); - assertFalse(ERR, SchedulingPolicy.isApplicableTo( - SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_INTERMEDIATE)); - assertFalse(ERR, SchedulingPolicy.isApplicableTo( - SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_ROOT)); - - - // fair - policy = SchedulingPolicy.parse("fair"); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); - assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, - SchedulingPolicy.DEPTH_INTERMEDIATE)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); - - // drf - policy = SchedulingPolicy.parse("drf"); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); - assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, - SchedulingPolicy.DEPTH_INTERMEDIATE)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); - - policy = Mockito.mock(SchedulingPolicy.class); - Mockito.when(policy.getApplicableDepth()).thenReturn( - SchedulingPolicy.DEPTH_PARENT); - assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, - SchedulingPolicy.DEPTH_INTERMEDIATE)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); - assertTrue(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); - assertFalse(ERR, - SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); - } - /** * Test whether {@link FairSharePolicy.FairShareComparator} is transitive. */ @@ -353,4 +307,222 @@ public boolean isPreemptable() { } } + @Test + public void testSchedulingPolicyViolation() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.init(conf); + + FSQueue child1 = scheduler.getQueueManager().getQueue("child1"); + assertNull("Queue 'child1' should be null since its policy isn't allowed to" + + " be 'drf' if its parent policy is 'fair'.", child1); + + // dynamic queue + FSQueue dynamicQueue = scheduler.getQueueManager(). + getLeafQueue("dynamicQueue", true); + assertNull("Dynamic queue should be null since it isn't allowed to be 'drf'" + + " policy if its parent policy is 'fair'.", dynamicQueue); + + // Set child1 to 'fair' and child2 to 'drf', the reload the allocation file. + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, null); + child1 = scheduler.getQueueManager().getQueue("child1"); + assertNotNull("Queue 'child1' should be not null since its policy is " + + "allowed to be 'fair' if its parent policy is 'fair'.", child1); + + // Detect the policy violation of Child2, keep the original policy instead + // of setting the new policy. + FSQueue child2 = scheduler.getQueueManager().getQueue("child2"); + assertTrue("Queue 'child2' should be 'fair' since its new policy 'drf' " + + "is not allowed.", child2.getPolicy() instanceof FairSharePolicy); + } + + @Test + public void testSchedulingPolicyViolationInTheMiddleLevel() + throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + + FSQueue level2 = scheduler.getQueueManager().getQueue("level2"); + assertNotNull("Queue 'level2' shouldn't be null since its policy is allowed" + + " to be 'fair' if its parent policy is 'fair'.", level2); + FSQueue level3 = scheduler.getQueueManager().getQueue("level2.level3"); + assertNull("Queue 'level3' should be null since its policy isn't allowed" + + " to be 'drf' if its parent policy is 'fair'.", level3); + FSQueue leaf = scheduler.getQueueManager().getQueue("level2.level3.leaf"); + assertNull("Queue 'leaf' should be null since its parent failed to create.", + leaf); + } + + @Test + public void testFIFOPolicyOnlyForLeafQueues() + throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" fifo"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + + FSQueue intermediate = scheduler.getQueueManager().getQueue("intermediate"); + assertNull("Queue 'intermediate' should be null since 'fifo' is only for " + + "leaf queue.", intermediate); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" fifo"); + out.println(" "); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, null); + + assertNotNull(scheduler.getQueueManager().getQueue("intermediate")); + + FSQueue leaf = scheduler.getQueueManager().getQueue("intermediate.leaf"); + assertNotNull("Queue 'leaf' should be null since 'fifo' is only for " + + "leaf queue.", leaf); + } + + @Test + public void testPolicyReinitilization() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(" "); + out.println(" fair"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + + // Set child1 to 'drf' which is not allowed, then reload the allocation file + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("fair"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" "); + out.println(" fifo"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, null); + + FSQueue child1 = scheduler.getQueueManager().getQueue("child1"); + assertTrue("Queue 'child1' should still be 'fair' since 'drf' isn't allowed" + + " if its parent policy is 'fair'.", + child1.getPolicy() instanceof FairSharePolicy); + FSQueue child2 = scheduler.getQueueManager().getQueue("child2"); + assertTrue("Queue 'child2' should still be 'fair' there is a policy" + + " violation while reinitialization.", + child2.getPolicy() instanceof FairSharePolicy); + + // Set both child1 and root to 'drf', then reload the allocation file + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("drf"); + out.println(" "); + out.println(" drf"); + out.println(" "); + out.println(" "); + out.println(" fifo"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, null); + + child1 = scheduler.getQueueManager().getQueue("child1"); + assertTrue("Queue 'child1' should be 'drf' since both 'root' and 'child1'" + + " are 'drf'.", + child1.getPolicy() instanceof DominantResourceFairnessPolicy); + child2 = scheduler.getQueueManager().getQueue("child2"); + assertTrue("Queue 'child2' should still be 'fifo' there is no policy" + + " violation while reinitialization.", + child2.getPolicy() instanceof FifoPolicy); + } }