diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index 7985296fca..7f4fd91be4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -74,4 +74,8 @@ TempQueuePerPartition getQueueByPartition(String queueName, @Unstable IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy(); + + boolean getCrossQueuePreemptionConservativeDRF(); + + boolean getInQueuePreemptionConservativeDRF(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 28a74498af..d9e9091bc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -116,7 +116,9 @@ public Map> selectCandidates( .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, curCandidates, - totalPreemptionAllowed, false); + totalPreemptionAllowed, + preemptionContext.getCrossQueuePreemptionConservativeDRF() + ); if (!preempted) { continue; } @@ -193,7 +195,8 @@ private void preemptAMContainers(Resource clusterResource, boolean preempted = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, preemptMap, - curCandidates, totalPreemptionAllowed, false); + curCandidates, totalPreemptionAllowed, + preemptionContext.getCrossQueuePreemptionConservativeDRF()); if (preempted) { Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } @@ -229,7 +232,8 @@ private void preemptFrom(FiCaSchedulerApp app, CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedContainers, - curCandidates, totalPreemptionAllowed, false); + curCandidates, totalPreemptionAllowed, + preemptionContext.getCrossQueuePreemptionConservativeDRF()); if (!preemptionContext.isObserveOnly()) { preemptionContext.getRMContext().getDispatcher().getEventHandler() @@ -273,7 +277,8 @@ private void preemptFrom(FiCaSchedulerApp app, CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedContainers, - curCandidates, totalPreemptionAllowed, false); + curCandidates, totalPreemptionAllowed, + preemptionContext.getCrossQueuePreemptionConservativeDRF()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 8a1b47b5de..cea1bca773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -263,7 +263,8 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, boolean ret = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, - curCandidates, totalPreemptedResourceAllowed, true); + curCandidates, totalPreemptedResourceAllowed, + preemptionContext.getInQueuePreemptionConservativeDRF()); // Subtract from respective user's resource usage once a container is // selected for preemption. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index b343678fa6..b6e7c65cce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -113,6 +113,9 @@ public enum IntraQueuePreemptionOrderPolicy { private float minimumThresholdForIntraQueuePreemption; private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy; + private boolean crossQueuePreemptionConservativeDRF; + private boolean inQueuePreemptionConservativeDRF; + // Current configuration private CapacitySchedulerConfiguration csConfig; @@ -225,6 +228,18 @@ private void updateConfigIfNeeded() { CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY) .toUpperCase()); + crossQueuePreemptionConservativeDRF = config.getBoolean( + CapacitySchedulerConfiguration. + CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF, + CapacitySchedulerConfiguration. + DEFAULT_CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF); + + inQueuePreemptionConservativeDRF = config.getBoolean( + CapacitySchedulerConfiguration. + IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF, + CapacitySchedulerConfiguration. + DEFAULT_IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF); + candidatesSelectionPolicies = new ArrayList<>(); // Do we need white queue-priority preemption policy? @@ -300,7 +315,12 @@ private void updateConfigIfNeeded() { selectCandidatesForResevedContainers + "\n" + "additional_res_balance_based_on_reserved_containers = " + additionalPreemptionBasedOnReservedResource + "\n" + - "Preemption-to-balance-queue-enabled = " + isPreemptionToBalanceRequired); + "Preemption-to-balance-queue-enabled = " + + isPreemptionToBalanceRequired + "\n" + + "cross-queue-preemption.conservative-drf = " + + crossQueuePreemptionConservativeDRF + "\n" + + "in-queue-preemption.conservative-drf = " + + inQueuePreemptionConservativeDRF); csConfig = config; } @@ -425,7 +445,7 @@ private Set getLeafQueueNames(TempQueuePerPartition q) { return leafQueueNames; } - + /** * This method selects and tracks containers to be preemptionCandidates. If a container * is in the target list for more than maxWaitTime it is killed. @@ -784,6 +804,16 @@ public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() { return intraQueuePreemptionOrderPolicy; } + @Override + public boolean getCrossQueuePreemptionConservativeDRF() { + return crossQueuePreemptionConservativeDRF; + } + + @Override + public boolean getInQueuePreemptionConservativeDRF() { + return inQueuePreemptionConservativeDRF; + } + @Override public long getDefaultMaximumKillWaitTimeout() { return maxWaitTime; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 08b38a1707..8e605964e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1742,6 +1742,21 @@ public void setAllowZeroCapacitySum(String queue, boolean value) { + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy"; public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first"; + /** + * Flag to determine whether or not to preempt containers from apps where some + * used resources are less than the user's user limit. + */ + public static final String CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF = + PREEMPTION_CONFIG_PREFIX + "conservative-drf"; + public static final Boolean DEFAULT_CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF = + false; + + public static final String IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF = + PREEMPTION_CONFIG_PREFIX + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + + "conservative-drf"; + public static final Boolean DEFAULT_IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF = + true; + /** * Should we allow queues continue grow after all queue reaches their * guaranteed capacity. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java index c42867a0a0..2720802018 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Before; @@ -29,8 +31,10 @@ import java.io.IOException; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -174,4 +178,72 @@ public void test3ResourceTypesInterQueuePreemption() throws IOException { new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(2)))); } + + @SuppressWarnings("unchecked") + @Test + public void testInterQueuePreemptionWithStrictAndRelaxedDRF() + throws IOException { + + /* + * root + * / \ \ + * a b c + * + * A / B / C have 33.3 / 33.3 / 33.4 resources + * Total cluster resource have mem=61440, cpu=600 + * + * +=================+========================+ + * | used in queue a | user limit for queue a | + * +=================+========================+ + * | 61440:60 | 20480:200 | + * +=================+========================+ + * In this case, the used memory is over the user limit but the used vCores + * is not. If conservative DRF is true, preemptions will not occur. + * If conservative DRF is false (default) preemptions will occur. + */ + String labelsConfig = "=61440:600,true;"; + String nodesConfig = "n1= res=61440:600"; // n1 is default partition + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[61440:600 61440:600 61440:600 20480:20 0]);" + // root + "-a(=[20480:200 61440:600 61440:60 0:0 0]);" + // b + "-b(=[20480:200 61440:600 0:0 20480:20 0]);" + // a + "-c(=[20480:200 61440:600 0:0 0:0 0])"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in a + "b\t" + "(1,0:0,n1,,0,false,20480:20,user2);"; // app2 in b + + conf.setBoolean( + CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF, + true); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + Resource ul = Resource.newInstance(20480, 20); + when(((LeafQueue)(cs.getQueue("root.a"))) + .getResourceLimitForAllUsers(any(), any(), any(), any()) + ).thenReturn(ul); + policy.editSchedule(); + + verify(eventHandler, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + + reset(eventHandler); + + conf.setBoolean( + CapacitySchedulerConfiguration.CROSS_QUEUE_PREEMPTION_CONSERVATIVE_DRF, + false); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + ul = Resource.newInstance(20480, 20); + when(((LeafQueue)(cs.getQueue("root.a"))) + .getResourceLimitForAllUsers(any(), any(), any(), any()) + ).thenReturn(ul); + policy.editSchedule(); + + verify(eventHandler, times(20)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java index 940d11ceda..ee88d3161b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java @@ -18,16 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.mockframework.ProportionalCapacityPreemptionPolicyMockFramework; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -111,6 +115,80 @@ public void testSimpleIntraQueuePreemptionWithVCoreResource() getAppAttemptId(3)))); } + @SuppressWarnings("unchecked") + @Test + public void testIntraQueuePreemptionFairOrderingWithStrictAndRelaxedDRF() + throws IOException { + /** + * Continue to allow intra-queue preemption when only one of the user's + * resources is above the user limit. + * Queue structure is: + * + *
+     *       root
+     *     /  |
+     *    a   b
+     * 
+ * + * Guaranteed resource of a and b are 30720:300 and 30720:300 Total cluster + * resource = 61440:600. + * Scenario: Queue B has one running app using 61720:60 resources with no + * pending resources, and one app with no used resources and 30720:30 + * pending resources. + * + * The first part of the test is to show what happens when the conservative + * DRF property is set. Since the memory is above and the vcores is below + * the user limit, only the minimum number of containers is allowed. + * In the second part, since conservative DRF is relaxed, all containers + * needed are allowed to be preempted (minus the AM size). + */ + + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "userlimit_first"); + conf.set(CapacitySchedulerConfiguration.PREFIX + + "root.b." + CapacitySchedulerConfiguration.ORDERING_POLICY, "fair"); + conf.setBoolean( + CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF, + true); + + String labelsConfig = "=61440:600,true;"; + String nodesConfig = // n1 has no label + "n1= res=61440:600"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[61440:600 61440:600 61440:600 30720:30 0]);" + // root + "-a(=[30720:300 61440:600 0:0 0:0 0]);" + // a + "-b(=[30720:300 61440:600 61440:60 30720:30 0]);"; // b + + String appsConfig = + "b\t" + "(1,1024:1,n1,,60,false,0:0,user1);" + // app1 in b + "b\t" + "(1,0:0,n1,,0,false,30720:30,user3);"; // app2 in b + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + Resource ul = Resource.newInstance(30720, 300); + when(((LeafQueue)(cs.getQueue("root.b"))) + .getResourceLimitForAllUsers(any(), any(), any(), any()) + ).thenReturn(ul); + policy.editSchedule(); + + verify(eventHandler, times(6)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + reset(eventHandler); + + conf.setBoolean( + CapacitySchedulerConfiguration.IN_QUEUE_PREEMPTION_CONSERVATIVE_DRF, + false); + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + when(((LeafQueue)(cs.getQueue("root.b"))) + .getResourceLimitForAllUsers(any(), any(), any(), any()) + ).thenReturn(ul); + policy.editSchedule(); + verify(eventHandler, times(29)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } + @Test public void testIntraQueuePreemptionWithDominantVCoreResource() throws IOException {