diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 036fd2f314..2156b0952d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -467,6 +467,9 @@ private void containerBasedPreemptOrKill(CSQueue root, Resource totalPreemptionAllowed = Resources.multiply(clusterResources, percentageClusterPreemptionAllowed); + //clear under served queues for every run + partitionToUnderServedQueues.clear(); + // based on ideal allocation select containers to be preemptionCandidates from each // queue and each application Map> toPreempt = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 64b56fba3b..fa66cbc163 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -206,6 +206,11 @@ public void buildEnv(String labelsConfig, String nodesConfig, mClock); } + public void updateQueueConfig(String queuesConfig) { + ParentQueue root = mockQueueHierarchy(queuesConfig); + when(cs.getRootQueue()).thenReturn(root); + } + private void mockContainers(String containersConfig, FiCaSchedulerApp app, ApplicationAttemptId attemptId, String queueName, List reservedContainers, List liveContainers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java index 6c5aa670d4..750054c3cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java @@ -71,9 +71,9 @@ public void testSimpleIntraQueuePreemption() throws IOException { "n1= res=100"; String queuesConfig = // guaranteed,max,used,pending,reserved - "root(=[100 100 79 120 0]);" + // root + "root(=[100 100 79 110 0]);" + // root "-a(=[11 100 11 50 0]);" + // a - "-b(=[40 100 38 60 0]);" + // b + "-b(=[40 100 38 50 0]);" + // b "-c(=[20 100 10 10 0]);" + // c "-d(=[29 100 20 0 0])"; // d @@ -128,9 +128,9 @@ public void testNoIntraQueuePreemptionWithPreemptionDisabledOnQueues() "n1= res=100"; String queuesConfig = // guaranteed,max,used,pending,reserved - "root(=[100 100 80 120 0]);" + // root + "root(=[100 100 80 110 0]);" + // root "-a(=[11 100 11 50 0]);" + // a - "-b(=[40 100 38 60 0]);" + // b + "-b(=[40 100 38 50 0]);" + // b "-c(=[20 100 10 10 0]);" + // c "-d(=[29 100 20 0 0])"; // d @@ -942,4 +942,86 @@ public void testComplexNodePartitionIntraQueuePreemption() verify(mDisp, times(22)) .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6)))); } + + @Test + public void testIntraQueuePreemptionAfterQueueDropped() + throws IOException { + /** + * Test intra queue preemption after under-served queue dropped, + * At first, Queue structure is: + * + *
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * 
+ * + * After dropped under-served queue "c", Queue structure is: + * + *
+     *       root
+     *     /  |  \
+     *    a   b  d
+     * 
+ * + * Verify no exception is thrown and preemption results is correct + */ + conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, + "priority_first"); + + String labelsConfig = "=100,true;"; + String nodesConfig = // n1 has no label + "n1= res=100"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 79 110 0]);" + // root + "-a(=[11 100 11 50 0]);" + // a + "-b(=[40 100 38 50 0]);" + // b + "-c(=[20 100 10 10 0]);" + // c + "-d(=[29 100 20 0 0])"; // d + + String appsConfig = + // queueName\t(priority,resource,host,expression,#repeat,reserved, + // pending) + "a\t" // app1 in a + + "(1,1,n1,,6,false,25);" + // app1 a + "a\t" // app2 in a + + "(1,1,n1,,5,false,25);" + // app2 a + "b\t" // app3 in b + + "(4,1,n1,,34,false,20);" + // app3 b + "b\t" // app4 in b + + "(4,1,n1,,2,false,10);" + // app4 b + "b\t" // app4 in b + + "(5,1,n1,,1,false,10);" + // app5 b + "b\t" // app4 in b + + "(6,1,n1,,1,false,10);" + // app6 in b + "c\t" // app1 in a + + "(1,1,n1,,10,false,10);" + "d\t" // app7 in c + + "(1,1,n1,,20,false,0)"; + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 69 100 0]);" + // root + "-a(=[11 100 11 50 0]);" + // a + "-b(=[40 100 38 50 0]);" + // b + "-d(=[49 100 20 0 0])"; // d + + updateQueueConfig(queuesConfig); + + // will throw YarnRuntimeException(This shouldn't happen, cannot find + // TempQueuePerPartition for queueName=c) without patch in YARN-8709 + policy.editSchedule(); + + // For queue B, app3 and app4 were of lower priority. Hence take 8 + // containers from them by hitting the intraQueuePreemptionDemand of 20%. + verify(mDisp, times(1)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(4)))); + verify(mDisp, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + } }