From cf0d0844d6ae25d537391edb9b65fca05d1848e6 Mon Sep 17 00:00:00 2001
From: Sunil G
Date: Thu, 13 Jul 2017 16:48:29 +0530
Subject: [PATCH] YARN-5731. Preemption calculation is not accurate when
 reserved containers are present in queue. Contributed by Wangda Tan.
---
 .../capacity/FifoCandidatesSelector.java      |  6 +-
 .../ProportionalCapacityPreemptionPolicy.java | 22 ++++-
 .../CapacitySchedulerPreemptionTestBase.java  |  7 +-
 ...stCapacitySchedulerSurgicalPreemption.java | 97 ++++++++++++++++++-
 4 files changed, 125 insertions(+), 7 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index f4d7e92a11..f843db402c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -43,12 +43,12 @@ public class FifoCandidatesSelector
       LogFactory.getLog(FifoCandidatesSelector.class);
   private PreemptableResourceCalculator preemptableAmountCalculator;
 
-  FifoCandidatesSelector(
-      CapacitySchedulerPreemptionContext preemptionContext) {
+  FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
+      boolean includeReservedResource) {
     super(preemptionContext);
 
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, false);
+        preemptionContext, includeReservedResource);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 76d6637a3f..719d2ebcd3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -232,7 +232,27 @@ public void init(Configuration config, RMContext context,
     }
 
     // initialize candidates preemption selection policies
-    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
+    // When select-candidates-for-reserved-containers is enabled, exclude
+    // reserved resources in the FIFO policy (less aggressive). Otherwise,
+    // include reserved resources.
+    //
+    // Why? In YARN-4390 we added preemption based on reserved containers
+    // to reduce unnecessary preemption of large containers: reserved
+    // resources are not included while calculating ideal allocation in
+    // FifoCandidatesSelector.
+    //
+    // The YARN-4390 changes significantly reduce the number of containers
+    // preempted when the cluster has heterogeneous container requests; see
+    // https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf
+    //
+    // However, in some corner cases, especially on a fragmented cluster,
+    // this can prevent preemption from kicking in at all; see YARN-5731.
+    //
+    // To solve that problem, we include reserved resources when surgical
+    // preemption for reserved containers is disabled, restoring the
+    // behavior from before YARN-4390.
+    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
+        !selectCandidatesForResevedContainers));
 
     // Do we need to specially consider intra queue
     boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
index 943b7d2107..55ccb8afca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -131,9 +131,10 @@ public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app,
   public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
       ApplicationAttemptId appId, int expected) throws InterruptedException {
     int waitNum = 0;
+    int total = 0;
 
     while (waitNum < 500) {
-      int total = 0;
+      total = 0;
       for (RMContainer c : node.getCopiedListOfRunningContainers()) {
         if (c.getApplicationAttemptId().equals(appId)) {
           total++;
@@ -146,7 +147,9 @@ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
       waitNum++;
     }
 
-    Assert.fail();
+    Assert.fail(
+        "Check #live-container-on-node-from-app, actual=" + total + " expected="
+            + expected);
   }
 
   public void checkNumberOfPreemptionCandidateFromApp(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 4a37bef46b..afd2f829b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -36,11 +36,11 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 
 public class TestCapacitySchedulerSurgicalPreemption
@@ -811,4 +811,99 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
     rm1.close();
   }
 
+  @Test(timeout = 60000)
+  public void testPreemptionForFragmentatedCluster() throws Exception {
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+        false);
+
+    /**
+     * Two queues, a and b, each with 50% capacity.
+     * 5 nodes in the cluster, each with 30G memory.
+     *
+     * Submit the first app: AM = 3G, plus 4 * 21G containers.
+     * Submit the second app: AM = 3G, plus 4 * 21G containers.
+     *
+     * One container should get preempted from the first app.
+     */
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
+        this.conf);
+    conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        1024 * 21);
+    conf.setQueues("root", new String[] { "a", "b" });
+    conf.setCapacity("root.a", 50);
+    conf.setUserLimitFactor("root.a", 100);
+    conf.setCapacity("root.b", 50);
+    conf.setUserLimitFactor("root.b", 100);
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // Launch an app in queue a; the AM container is launched on the first node
+    RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
+
+    am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App1 should have 5 containers now (AM + 4 * 21G)
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+
+    // Launch an app in queue b; the AM container is launched on nms.get(2)
+    RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
+
+    am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App2 should have 2 containers now (AM + one 21G)
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+    // Call editSchedule twice to preempt, then drive allocations below
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    int tick = 0;
+    while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
+      // Do allocation for all nodes
+      for (int i = 0; i < 10; i++) {
+        MockNM mockNM = nms.get(i % nms.size());
+        RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+      }
+      tick++;
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
 }
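
Note on the core change: the patch wires FifoCandidatesSelector's
includeReservedResource flag as the inverse of the
select-candidates-for-reserved-containers setting. Below is a minimal,
standalone sketch of that decision; the class and method names here are
illustrative only and are not part of the patch or of Hadoop.

/**
 * Illustrative sketch (not Hadoop code) of the decision this patch makes
 * in ProportionalCapacityPreemptionPolicy.init().
 */
public class ReservedResourceSelectionSketch {

  // Mirrors "new FifoCandidatesSelector(this,
  //     !selectCandidatesForResevedContainers)" from the patch.
  static boolean includeReservedResource(
      boolean selectCandidatesForReservedContainers) {
    // Selector enabled (YARN-4390): a dedicated selector handles reserved
    // containers, so the FIFO ideal-allocation calculation stays less
    // aggressive and excludes reservations.
    // Selector disabled: count reservations, restoring pre-YARN-4390
    // behavior so preemption can kick in on fragmented clusters (YARN-5731).
    return !selectCandidatesForReservedContainers;
  }

  public static void main(String[] args) {
    System.out.println(includeReservedResource(true));   // false
    System.out.println(includeReservedResource(false));  // true
  }
}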
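
For reference, the arithmetic behind the new test's fragmented-cluster
scenario; the numbers come from the test above, while the placement
narrative in the comments is a simplification.

/**
 * Back-of-the-envelope sketch (illustrative only) of why preemption must
 * kick in for testPreemptionForFragmentatedCluster.
 */
public class FragmentedClusterMathSketch {
  public static void main(String[] args) {
    int nodes = 5, nodeGB = 30;
    int clusterGB = nodes * nodeGB;            // 150G total
    int guaranteedPerQueueGB = clusterGB / 2;  // 75G each for queues a and b

    int app1GB = 3 + 4 * 21;                   // 87G: queue a runs AM + 4 x 21G
    int app2GB = 3 + 1 * 21;                   // 24G: queue b fits AM + one 21G

    // Queue b sits far below its 75G guarantee, but every node's remaining
    // free space is smaller than 21G, so b's next container can only be
    // reserved, never allocated. If reserved resources are excluded from the
    // ideal-allocation calculation, the FIFO selector sees nothing to
    // preempt and queue b stays starved -- the YARN-5731 bug.
    System.out.println("queue a over guarantee by "
        + (app1GB - guaranteedPerQueueGB) + "G");  // 12G
    System.out.println("queue b under guarantee by "
        + (guaranteedPerQueueGB - app2GB) + "G");  // 51G
  }
}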