+ *   <queue-name>(<partition>=[guaranteed max used pending], \
+ *                <partition>=[guaranteed max used pending])
+ *               {key1=value1,key2=value2};  // Additional configs
+ */
 @SuppressWarnings({ "unchecked", "rawtypes" })
 private ParentQueue mockQueueHierarchy(String queueExprs) {
@@ -491,6 +501,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework
     queue = parentQueue;
     List
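For readers unfamiliar with the mock framework's config DSL described in the javadoc above, here is a minimal illustration (not part of the patch) of how a queuesConfig string encodes a hierarchy. The four bracketed numbers are guaranteed, max, used and pending resource for the default ("") partition, leading dashes encode queue depth, and the `{key=value}` block carries per-queue settings such as priority and disable_preemption; the values mirror the first test of the new file that follows.

```java
// Illustrative only: a queuesConfig string in the DSL documented above.
String queuesConfig =
    "root(=[100 100 100 100]);" +                  // root: guaranteed/max/used/pending
    "-a(=[30 100 40 50]){priority=1};" +           // child a, priority 1
    "-b(=[30 100 59 50]){priority=2};" +           // child b, priority 2
    "-c(=[40 100 1 25]){priority=2,disable_preemption=true}"; // child c
```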
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestPreemptionForQueueWithPriorities
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+  @Before
+  public void setup() {
+    super.setup();
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  @Test
+  public void testPreemptionForHighestPriorityUnderutilizedQueue()
+      throws IOException {
+    /**
+     * The simplest test of queues with priorities. Queue structure is:
+     *
+     * <pre>
+     *   root
+     *  /  |  \
+     * a   b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b/c=2
+     *
+     * So c will preempt resource from a until a drops back to its
+     * guaranteed resource.
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + // root
+        "-a(=[30 100 40 50]){priority=1};" + // a
+        "-b(=[30 100 59 50]){priority=2};" + // b
+        "-c(=[40 100 1 25]){priority=2}"; // c
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,40,false);" + // app1 in a
+        "b\t(1,1,n1,,59,false);" + // app2 in b
+        "c\t(1,1,n1,,1,false);"; // app3 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 10 preempted from app1, 15 preempted from app2, and nothing preempted
+    // from app3
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
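As a sanity check on the expected counts in the test above, the arithmetic works out as follows (an informal back-of-envelope calculation, not the policy's actual algorithm): c is under its guarantee and asks for 25 more; the lower-priority queue a is drained to its guarantee first, and the remainder comes from b.

```java
// Informal arithmetic behind the expected preemption counts above.
public class PreemptionArithmeticSketch {
  public static void main(String[] args) {
    int guarA = 30, usedA = 40;   // a is 10 over its guarantee
    int guarB = 30, usedB = 59;   // b is 29 over its guarantee
    int pendingC = 25;            // c is under-satisfied and wants 25 more

    // Lower-priority a gives up its over-guarantee share first,
    // then the remainder is taken from b:
    int fromA = Math.min(usedA - guarA, pendingC);          // = 10
    int fromB = Math.min(usedB - guarB, pendingC - fromA);  // = 15
    System.out.println(fromA + " / " + fromB);              // prints 10 / 15
  }
}
```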
+  @Test
+  public void testPreemptionForLowestPriorityUnderutilizedQueue()
+      throws IOException {
+    /**
+     * Similar to above; make sure the less-utilized queue can still get
+     * resources first, regardless of priority.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     *   root
+     *  /  |  \
+     * a   b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b=2
+     * - c=0
+     *
+     * So c will preempt resource from a until a drops back to its
+     * guaranteed resource.
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + // root
+        "-a(=[30 100 40 50]){priority=1};" + // a
+        "-b(=[30 100 59 50]){priority=2};" + // b
+        "-c(=[40 100 1 25]){priority=0}"; // c
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,40,false);" + // app1 in a
+        "b\t(1,1,n1,,59,false);" + // app2 in b
+        "c\t(1,1,n1,,1,false);"; // app3 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 10 preempted from app1, 15 preempted from app2, and nothing preempted
+    // from app3
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testPreemptionWontHappenBetweenSatisfiedQueues()
+      throws IOException {
+    /**
+     * No preemption happens if a queue is already satisfied, regardless of
+     * priority.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     *   root
+     *  /  |  \
+     * a   b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b=1
+     * - c=2
+     *
+     * When c is satisfied, it will not preempt any resource from other
+     * queues.
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + // root
+        "-a(=[30 100 0 0]){priority=1};" + // a
+        "-b(=[30 100 40 50]){priority=1};" + // b
+        "-c(=[40 100 60 25]){priority=2}"; // c
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t(1,1,n1,,40,false);" + // app1 in b
+        "c\t(1,1,n1,,60,false)"; // app2 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Nothing preempted
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testPreemptionForMultipleQueuesInTheSamePriorityBuckets()
+      throws IOException {
+    /**
+     * When a cluster has multiple priorities and each priority contains
+     * multiple queues, the preemption policy should balance resources
+     * between queues of the same priority in proportion to their capacities.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     * root
+     * - a (capacity=10), p=1
+     * - b (capacity=15), p=1
+     * - c (capacity=20), p=2
+     * - d (capacity=25), p=2
+     * - e (capacity=30), p=2
+     * </pre>
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + // root
+        "-a(=[10 100 35 50]){priority=1};" + // a
+        "-b(=[15 100 25 50]){priority=1};" + // b
+        "-c(=[20 100 39 50]){priority=2};" + // c
+        "-d(=[25 100 0 0]){priority=2};" + // d
+        "-e(=[30 100 1 99]){priority=2}"; // e
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,35,false);" + // app1 in a
+        "b\t(1,1,n1,,25,false);" + // app2 in b
+        "c\t(1,1,n1,,39,false);" + // app3 in c
+        "e\t(1,1,n1,,1,false)"; // app4 in e
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 23 preempted from app1, 6 preempted from app2, and nothing preempted
+    // from app3/app4
+    // (After preemption, a has 35 - 23 = 12 and b has 25 - 6 = 19, so b:a
+    // after preemption is 1.58, close to the capacity ratio of 1.50)
+    verify(mDisp, times(23)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(6)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+  @Test
+  public void testPreemptionForPriorityAndDisablePreemption()
+      throws IOException {
+    /**
+     * When a cluster has multiple priorities and each priority contains
+     * multiple queues, the preemption policy should balance resources
+     * between queues of the same priority in proportion to their capacities.
+     *
+     * We also need to make sure that disabled preemption is honored
+     * regardless of priority.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     * root
+     * - a (capacity=10), p=1
+     * - b (capacity=15), p=1
+     * - c (capacity=20), p=2
+     * - d (capacity=25), p=2
+     * - e (capacity=30), p=2
+     * </pre>
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + // root
+        "-a(=[10 100 35 50]){priority=1,disable_preemption=true};" + // a
+        "-b(=[15 100 25 50]){priority=1};" + // b
+        "-c(=[20 100 39 50]){priority=2};" + // c
+        "-d(=[25 100 0 0]){priority=2};" + // d
+        "-e(=[30 100 1 99]){priority=2}"; // e
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,35,false);" + // app1 in a
+        "b\t(1,1,n1,,25,false);" + // app2 in b
+        "c\t(1,1,n1,,39,false);" + // app3 in c
+        "e\t(1,1,n1,,1,false)"; // app4 in e
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // We would normally preempt some resource from a, but since queue a
+    // disables preemption, we preempt some resource from b and some from c
+    // instead, even though c has a higher priority than a
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(19)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+  @Test
+  public void testPriorityPreemptionForHierarchicalOfQueues()
+      throws IOException {
+    /**
+     * When queues form a multi-level hierarchy with different priorities:
+     *
+     * <pre>
+     * root
+     * - a (capacity=30), p=1
+     * -- a1 (capacity=40), p=1
+     * -- a2 (capacity=60), p=1
+     * - b (capacity=30), p=1
+     * -- b1 (capacity=50), p=1
+     * -- b2 (capacity=50), p=2
+     * - c (capacity=40), p=2
+     * </pre>
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + // root
+        "-a(=[30 100 40 50]){priority=1};" + // a
+        "--a1(=[12 100 20 50]){priority=1};" + // a1
+        "--a2(=[18 100 20 50]){priority=1};" + // a2
+        "-b(=[30 100 59 50]){priority=1};" + // b
+        "--b1(=[15 100 30 50]){priority=1};" + // b1
+        "--b2(=[15 100 29 50]){priority=2};" + // b2
+        "-c(=[40 100 1 30]){priority=1}"; // c
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t(1,1,n1,,20,false);" + // app1 in a1
+        "a2\t(1,1,n1,,20,false);" + // app2 in a2
+        "b1\t(1,1,n1,,30,false);" + // app3 in b1
+        "b2\t(1,1,n1,,29,false);" + // app4 in b2
+        "c\t(1,1,n1,,29,false)"; // app5 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Preemption should first divide capacities between a / b, and b2 should
+    // get less preemption than b1 (because b2 has higher priority)
+    verify(mDisp, times(5)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+}
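All of the verifications above rely on the IsPreemptionRequestFor matcher from TestProportionalCapacityPreemptionPolicy, whose diff appears next. As a rough sketch of what such a Mockito matcher typically looks like (an assumed shape — the exact event type and accessor names here are assumptions, not the verbatim class):

```java
// Hypothetical sketch: match scheduler events that request preemption of a
// container belonging to a given application attempt.
static class IsPreemptionRequestFor
    extends ArgumentMatcher<ContainerPreemptEvent> {
  private final ApplicationAttemptId appAttId;

  IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
    this.appAttId = appAttId;
  }

  @Override
  public boolean matches(Object argument) {
    // True when the preemption event targets the expected attempt.
    return appAttId.equals(((ContainerPreemptEvent) argument).getAppId());
  }
}
```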
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 7eca34fcbf..a14a2b1368 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -220,7 +221,9 @@ public class TestProportionalCapacityPreemptionPolicy {
     };
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
-    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+
+    // A will preempt guaranteed-allocated.
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
 
   @Test
@@ -588,8 +591,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     };
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
-    // correct imbalance between over-capacity queues
-    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    // Will not preempt for over capacity queues
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
 
   @Test
@@ -702,7 +705,7 @@ public class TestProportionalCapacityPreemptionPolicy {
   public void testZeroGuarOverCap() {
     int[][] qData = new int[][] {
         //  /    A    B    C    D    E    F
-        { 200, 100,   0,  99,   0, 100, 100 },  // abs
+        { 200, 100,   0, 100,   0, 100, 100 },  // abs
         { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
         { 170, 170,  60,  20,  90,   0,   0 },  // used
         {  85,  50,  30,  10,  10,  20,  20 },  // pending
@@ -713,14 +716,14 @@ public class TestProportionalCapacityPreemptionPolicy {
         { 0, 0, 0, 0, 0, 0, 0 },       // reserved
         { 2, 1, 1, 1, 1, 1, 1 },       // apps
         { -1, 1, 1, 1, 1, -1, -1 },    // req granularity
         { 2, 0, 0, 0, 0, 2, 0 },       // subqueues
     };
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
-    // we verify both that C has priority on B and D (has it has >0 guarantees)
-    // and that B and D are force to share their over capacity fairly (as they
-    // are both zero-guarantees) hence D sees some of its containers preempted
-    verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    // No preemption should happen because zero guaranteed queues should be
+    // treated as always satisfied, they should not preempt from each other.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD)));
   }
-
-
   @Test
   public void testHierarchicalLarge() {
@@ -1232,6 +1235,13 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(pq.getChildQueues()).thenReturn(cqs);
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     when(pq.getReadLock()).thenReturn(lock.readLock());
+
+    // Ordering policy
+    QueueOrderingPolicy policy = mock(QueueOrderingPolicy.class);
+    when(policy.getConfigName()).thenReturn(
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+    when(pq.getQueueOrderingPolicy()).thenReturn(policy);
+    when(pq.getPriority()).thenReturn(Priority.newInstance(0));
     for (int i = 0; i < subqueues; ++i) {
       pqs.add(pq);
     }
@@ -1302,6 +1312,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     }
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     when(lq.getReadLock()).thenReturn(lock.readLock());
+    when(lq.getPriority()).thenReturn(Priority.newInstance(0));
     p.getChildQueues().add(lq);
     return lq;
   }
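The hunk above stubs a QueueOrderingPolicy that reports the priority-utilization policy name. Roughly, that policy orders sibling queues by priority first and by relative utilization second; a simplified comparator sketch of that idea (assumed logic, not the real implementation, which also conditions on whether queues are under their guarantees) could look like:

```java
import java.util.Comparator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;

// Simplified sketch: offer resources to higher-priority, less-utilized
// siblings first (assumed logic for illustration only).
final class PriorityUtilizationOrderSketch {
  static final Comparator<CSQueue> ORDER = (q1, q2) -> {
    int p1 = q1.getPriority().getPriority();
    int p2 = q2.getPriority().getPriority();
    if (p1 != p2) {
      return p2 - p1; // higher priority first
    }
    // Same priority: lower used capacity (relative to guarantee) first
    return Float.compare(q1.getUsedCapacity(), q2.getUsedCapacity());
  };
}
```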
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index e31a889c34..1fd455a607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -95,7 +95,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
   }
 
   @Test
-  public void testNodePartitionPreemptionRespectMaximumCapacity()
+  public void testNodePartitionPreemptionNotHappenBetweenSatisfiedQueues()
       throws IOException {
     /**
      * Queue structure is:
@@ -114,8 +114,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
      * 2 apps in cluster.
      * app1 in b and app2 in c.
      *
-     * app1 uses 90x, and app2 use 10x. After preemption, app2 will preempt 10x
-     * from app1 because of max capacity.
+     * app1 uses 90x, and app2 uses 10x. We don't expect preemption to happen
+     * between them because both of them are satisfied.
      */
     String labelsConfig =
         "=100,true;" + // default partition
@@ -139,9 +139,8 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
-    // 30 preempted from app1, 30 preempted from app4, and nothing preempted
-    // from app2/app3
-    verify(mDisp, times(20)).handle(
+    // No preemption happens
+    verify(mDisp, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
     verify(mDisp, never()).handle(
         argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
index 07d1eefa2f..964a23085d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -46,8 +46,8 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
       "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + // root
       "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
       "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
-      "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
-      "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+      "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]){priority=2};" + // a2
+      "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0]){priority=1,disable_preemption=true}";
     String appsConfig =
       // queueName\t(priority,resource,host,expression,#repeat,reserved)
       // app1 in a1, 50 in n2 (reserved), 50 in n2 (allocated)
@@ -75,6 +75,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("root"), "red", 100);
     checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
     checkPendingResource(cs.getQueue("root"), "blue", 200);
+    checkPriority(cs.getQueue("root"), 0); // default
 
     // a
     checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
@@ -83,6 +84,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("a"), "red", 0);
     checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
     checkPendingResource(cs.getQueue("a"), "blue", 200);
+    checkPriority(cs.getQueue("a"), 0); // default
 
     // a1
     checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
@@ -91,6 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("a1"), "red", 0);
     checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
     checkPendingResource(cs.getQueue("a1"), "blue", 0);
+    checkPriority(cs.getQueue("a1"), 0); // default
 
     // a2
     checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
@@ -99,14 +102,18 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     checkPendingResource(cs.getQueue("a2"), "red", 0);
     checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
     checkPendingResource(cs.getQueue("a2"), "blue", 200);
+    checkPriority(cs.getQueue("a2"), 2);
+    Assert.assertFalse(cs.getQueue("a2").getPreemptionDisabled());
 
-    // b1
+    // b
     checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
     checkPendingResource(cs.getQueue("b"), "", 0);
     checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
     checkPendingResource(cs.getQueue("b"), "red", 100);
     checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
     checkPendingResource(cs.getQueue("b"), "blue", 0);
+    checkPriority(cs.getQueue("b"), 1);
+    Assert.assertTrue(cs.getQueue("b").getPreemptionDisabled());
 
     // Check ignored partitioned containers in queue
     Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
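The assertions above call a checkPriority helper alongside the existing checkAbsCapacities/checkPendingResource checks. A plausible shape for it (a hypothetical sketch — the real helper lives in the mock framework base class) is:

```java
// Hypothetical sketch of the checkPriority helper used above: assert that a
// mocked queue reports the expected priority value.
private void checkPriority(CSQueue queue, int expectedPriority) {
  Assert.assertEquals(expectedPriority, queue.getPriority().getPriority());
}
```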
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
index bd9f6155b9..943b7d2107 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -46,7 +46,7 @@ public class CapacitySchedulerPreemptionTestBase {
 
   final int GB = 1024;
 
-  Configuration conf;
+  CapacitySchedulerConfiguration conf;
 
   RMNodeLabelsManager mgr;
 
@@ -54,13 +54,15 @@ public class CapacitySchedulerPreemptionTestBase {
   @Before
   void setUp() throws Exception {
-    conf = new YarnConfiguration();
+    conf = new CapacitySchedulerConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
     conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
         ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
-    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+    conf = (CapacitySchedulerConfiguration) TestUtils
+        .getConfigurationWithMultipleQueues(this.conf);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100 * GB);
 
     // Set preemption related configurations
     conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
@@ -146,4 +148,18 @@ public class CapacitySchedulerPreemptionTestBase {
     Assert.fail();
   }
+
+  public void checkNumberOfPreemptionCandidateFromApp(
+      ProportionalCapacityPreemptionPolicy policy, int expected,
+      ApplicationAttemptId attemptId) {
+    int total = 0;
+
+    for (RMContainer rmContainer : policy.getToPreemptContainers().keySet()) {
+      if (rmContainer.getApplicationAttemptId().equals(attemptId)) {
+        ++total;
+      }
+    }
+
+    Assert.assertEquals(expected, total);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 7382f3dffb..046ea4a47b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -110,9 +110,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).
         thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
@@ -276,9 +273,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB, 16));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
     when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -581,9 +575,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB));
-    when(csContext.getNonPartitionedQueueComparator()).
-        thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     when(csContext.getRMContext()).thenReturn(rmContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index 1f87c533ff..2fa06e8478 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -594,9 +594,6 @@ public class TestApplicationLimitsByPartition {
         .thenReturn(Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability())
         .thenReturn(Resources.createResource(16 * GB));
-    when(csContext.getNonPartitionedQueueComparator())
-        .thenReturn(
-            CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
     RMContext rmContext = TestUtils.getMockRMContext();
     RMContext spyRMContext = spy(rmContext);
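The checkNumberOfPreemptionCandidateFromApp helper added to the test base above is exercised by the surgical-preemption tests in the next file. Typical usage (a fragment lifted from those tests, relying on their local variables) looks like:

```java
// After letting the policy run one editSchedule() pass to select candidates,
// assert how many of the selected containers belong to a given app attempt.
editPolicy.editSchedule();
checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
    am1.getApplicationAttemptId());
```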
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index db6115c4ab..5989da005a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -22,22 +22,26 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Set;
 
 public class TestCapacitySchedulerSurgicalPreemption
     extends CapacitySchedulerPreemptionTestBase {
@@ -167,8 +171,7 @@ public class TestCapacitySchedulerSurgicalPreemption
      *
      * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
      *
-     * 2) app1 submit to queue-a first, it asked 38 * 1G containers
-     * We will allocate 20 on n1 and 19 on n2.
+     * 2) app1 is submitted to queue-b and asks for 5 * 1G containers
      *
      * 3) app2 submit to queue-c, ask for one 4G container (for AM)
      *
@@ -243,4 +246,569 @@ public class TestCapacitySchedulerSurgicalPreemption
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities()
+      throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to different queues;
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 is submitted to queue-b first; it asks for 6 * 1G containers.
+     *    We will allocate 4 on n1 (including AM) and 3 on n2.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 18G container (for AM).
+     *
+     * After preemption, we should expect:
+     * Preempt 2 containers from app1, and the AM of app2 is successfully
+     * allocated.
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue-b; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, so the abs-used-cap of b is
+    // 7 / 40 = 17.5% < 20% (guaranteed)
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    // 4 from n1 and 3 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 4);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 3);
+
+    // Submit app2 to queue-c and ask for an 18G container for the AM
+    RMApp app2 = rm1.submitApp(18 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    // Call editSchedule immediately: containers are not selected
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Sleep the timeout interval; we should see containers selected
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed, and the new
+    // AM container is launched
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(10);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
+
+    rm1.close();
+  }
+
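The test above leans on wait helpers from CapacitySchedulerPreemptionTestBase. As a rough sketch of the assumed implementation (the real helper lives in the base class; names and polling interval are assumptions), such a helper typically polls the scheduler node until the expected container count appears or a bounded retry count trips:

```java
// Hypothetical sketch of a polling wait helper like the one used above.
private void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
    ApplicationAttemptId appAttemptId, int expected) throws Exception {
  for (int retry = 0; retry < 500; retry++) {
    int total = 0;
    for (RMContainer c : node.getCopiedListOfRunningContainers()) {
      if (c.getApplicationAttemptId().equals(appAttemptId)) {
        total++;
      }
    }
    if (total == expected) {
      return; // expected number of containers observed on this node
    }
    Thread.sleep(10);
  }
  Assert.fail();
}
```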
+  @Test(timeout = 300000)
+  public void testPriorityPreemptionRequiresMoveReservation()
+      throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to different queues;
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) 3 nodes in the cluster, 10G for each.
+     *
+     * 2) app1 is submitted to queue-b first; it asks for 2G containers and
+     *    gets 2G on n1 (AM) plus 2 * 2G on n2.
+     *
+     * 3) app2 is submitted to queue-c with a 2G AM container (allocated on
+     *    n3). app2 then requires 9G of resource, which will be reserved
+     *    on n3.
+     *
+     * We should expect the container to be unreserved from n3 and allocated
+     * on n1/n2.
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
+    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId());
+
+    // launch an app to queue-b; the AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 2 * GB, 2, new ArrayList<>());
+
+    // Do allocation for node2 twice
+    for (int i = 0; i < 2; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+
+    // 1 from n1 and 2 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 1);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 2);
+
+    // Submit app2 to queue-c and ask for a 2G container for the AM, on n3
+    RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask for 1 * 9G container
+    am2.allocate("*", 9 * GB, 1, new ArrayList<>());
+
+    // Do allocation for node3 once
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode3));
+
+    // Make sure container reserved on node3
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Call editSchedule immediately: nothing happens
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    Assert.assertNotNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+
+    // Sleep the timeout interval; we should see the reserved container
+    // moved to n2 (n1 is occupied by the AM)
+    Thread.sleep(1000);
+    editPolicy.editSchedule();
+    Assert.assertNull(
+        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
+    Assert.assertNotNull(
+        cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+    Assert.assertEquals(am2.getApplicationAttemptId(), cs.getNode(
+        rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());
+
+    // Do it again; we should see containers marked to be preempted
+    editPolicy.editSchedule();
+    Assert.assertEquals(2, editPolicy.getToPreemptContainers().size());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+
+    // Do allocation till reserved container allocated
+    while (schedulerApp2.getLiveContainers().size() < 2) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      Thread.sleep(200);
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
+
+    rm1.close();
+  }
+
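Every test in this file repeats the same configuration preamble. For reference, here are the knobs grouped in one place (the same calls the tests above and below use; the move-reservation flag applies only where reservations must migrate between nodes):

```java
// Enable the priority-utilization ordering policy and allow preemption
// toward under-utilized but higher-priority queues, with a 1s selection
// delay before candidates are actually chosen.
conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
    CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
// Optional: also allow moving an existing reservation during preemption
conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);
```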
+  @Test(timeout = 60000)
+  public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied()
+      throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to different queues;
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) 10 nodes (n0-n9) in the cluster, each of them has 10G.
+     *
+     * 2) app1 is submitted to queue-b first; it asks for 8 * 1G containers.
+     *    We will allocate one container on each of n0-n8 (the AM runs on n0).
+     *
+     * 3) app2 is submitted to queue-c and asks for 10 * 10G containers
+     *    (including AM).
+     *
+     * After preemption, we should expect:
+     * Preempt 6 containers from app1, so the usage of app2 reaches 70%.
+     */
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // Queue c has higher priority than a/b
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM[] mockNMs = new MockNM[10];
+    for (int i = 0; i < 10; i++) {
+      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB);
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode[] rmNodes = new RMNode[10];
+    for (int i = 0; i < 10; i++) {
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+    }
+
+    // launch an app to queue-b; the AM container is launched on n0
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]);
+
+    am1.allocate("*", 1 * GB, 8, new ArrayList<>());
+
+    // Do allocation for nm1-nm8
+    for (int i = 1; i < 9; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // App1 should have 9 containers now, so the abs-used-cap of b is 9%
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(9, schedulerApp1.getLiveContainers().size());
+    for (int i = 0; i < 9; i++) {
+      waitNumberOfLiveContainersOnNodeFromApp(
+          cs.getNode(rmNodes[i].getNodeID()),
+          am1.getApplicationAttemptId(), 1);
+    }
+
+    // Submit app2 to queue-c and ask for a 10G container for the AM.
+    // Launch the AM on nm9.
+    RMApp app2 = rm1.submitApp(10 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask for 10 * 10GB containers
+    am2.allocate("*", 10 * GB, 10, new ArrayList<>());
+
+    // Do allocation for all NMs
+    for (int i = 1; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check am2 reserved resource on nm1-nm8
+    for (int i = 1; i < 9; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+    }
+
+    // Sleep the timeout interval; we should see 6 containers selected:
+    // 6 (selected) + 1 (already allocated) brings queue-c to its 70% target
+    Thread.sleep(1000);
+
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
+        am1.getApplicationAttemptId());
+
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    // Do allocation for all NMs
+    for (int i = 1; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    waitNumberOfLiveContainersFromApp(schedulerApp2, 7);
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
+
+    rm1.close();
+  }
+
+  @Test(timeout = 600000)
+  public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
+      throws Exception {
+    /**
+     * Test case: submit three applications (app1/app2/app3) to different
+     * queues; queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          45   45  10
+     * </pre>
+     *
+     * Priority of queue_a = 1
+     * Priority of queue_b = 2
+     *
+     * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G.
+     *
+     * 2) app1 is submitted to queue-c first (AM=1G); it asks for 4 * 1G
+     *    containers. We will allocate 1 container on each of n0-n4 (AM on
+     *    n4).
+     *
+     * 3) app2 is submitted to queue-a; its AM container (0.5G) is allocated
+     *    on n0. It asks for 2 * 3.5G containers (reserved on n0/n1).
+     *
+     * 4) app3 is submitted to queue-b; its AM container (0.5G) is allocated
+     *    on n2. It asks for 2 * 3.5G containers (reserved on n2/n3).
+     *
+     * We will preempt the container on n2 first, since it is the oldest
+     * container of the highest-priority queue (b).
+     */
+
+    // Total preemption = 1G per round, which is 5% of cluster resource (20G)
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        0.05f);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
+    conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
+    conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT,
+        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
+
+    // a/b have higher priority
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1);
+    conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f);
+    conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM[] mockNMs = new MockNM[5];
+    for (int i = 0; i < 5; i++) {
+      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB);
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    RMNode[] rmNodes = new RMNode[5];
+    for (int i = 0; i < 5; i++) {
+      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
+    }
+
+    // launch an app to queue-c; the AM container is launched on n4
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
+
+    am1.allocate("*", 1 * GB, 4, new ArrayList<>());
+
+    // Do allocation for nm0-nm3
+    for (int i = 0; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // App1 should have 5 containers now, one for each node
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+    for (int i = 0; i < 5; i++) {
+      waitNumberOfLiveContainersOnNodeFromApp(
+          cs.getNode(rmNodes[i].getNodeID()),
+          am1.getApplicationAttemptId(), 1);
+    }
+
+    // Submit app2 to queue-a and ask for a 0.5G container for the AM (on n0)
+    RMApp app2 = rm1.submitApp(512, "app", "user", null, "a");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Ask for 2 * 3.5GB containers
+    am2.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
+
+    // Do allocation for n0-n1
+    for (int i = 0; i < 2; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check that am2 reserved resource on nm0-nm1
+    for (int i = 0; i < 2; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+      Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+          .getReservedContainer().getQueueName(), "a");
+    }
+
+    // Submit app3 to queue-b and ask for a 0.5G container for the AM (on n2)
+    RMApp app3 = rm1.submitApp(512, "app", "user", null, "b");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
+    FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
+
+    // Ask for 2 * 3.5GB containers
+    am3.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
+
+    // Do allocation for n2-n3
+    for (int i = 2; i < 4; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
+    }
+
+    // Check that am3 reserved resource on nm2-nm3
+    for (int i = 2; i < 4; i++) {
+      Assert.assertNotNull("Should reserve on nm-" + i,
+          cs.getNode(rmNodes[i].getNodeID()).getReservedContainer());
+      Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID())
+          .getReservedContainer().getQueueName(), "b");
+    }
+
+    // Sleep the timeout interval; we should see 1 container selected
+    Thread.sleep(1000);
+
+    /* The first container preempted is on n2 */
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+
+    // We should have one to-preempt container, on node[2]
+    Set