YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. Contributed by Wangda Tan.
This commit is contained in:
parent
e15e2713e1
commit
cf0d0844d6
@ -43,12 +43,12 @@ public class FifoCandidatesSelector
|
|||||||
LogFactory.getLog(FifoCandidatesSelector.class);
|
LogFactory.getLog(FifoCandidatesSelector.class);
|
||||||
private PreemptableResourceCalculator preemptableAmountCalculator;
|
private PreemptableResourceCalculator preemptableAmountCalculator;
|
||||||
|
|
||||||
FifoCandidatesSelector(
|
FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
|
||||||
CapacitySchedulerPreemptionContext preemptionContext) {
|
boolean includeReservedResource) {
|
||||||
super(preemptionContext);
|
super(preemptionContext);
|
||||||
|
|
||||||
preemptableAmountCalculator = new PreemptableResourceCalculator(
|
preemptableAmountCalculator = new PreemptableResourceCalculator(
|
||||||
preemptionContext, false);
|
preemptionContext, includeReservedResource);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -232,7 +232,27 @@ public void init(Configuration config, RMContext context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// initialize candidates preemption selection policies
|
// initialize candidates preemption selection policies
|
||||||
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
|
// When select candidates for reserved containers is enabled, exclude reserved
|
||||||
|
// resource in fifo policy (less aggressive). Otherwise include reserved
|
||||||
|
// resource.
|
||||||
|
//
|
||||||
|
// Why doing this? In YARN-4390, we added preemption-based-on-reserved-container
|
||||||
|
// Support. To reduce unnecessary preemption for large containers. We will
|
||||||
|
// not include reserved resources while calculating ideal-allocation in
|
||||||
|
// FifoCandidatesSelector.
|
||||||
|
//
|
||||||
|
// Changes in YARN-4390 will significantly reduce number of containers preempted
|
||||||
|
// When cluster has heterogeneous container requests. (Please check test
|
||||||
|
// report: https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf
|
||||||
|
//
|
||||||
|
// However, on the other hand, in some corner cases, especially for
|
||||||
|
// fragmented cluster. It could lead to preemption cannot kick in in some
|
||||||
|
// cases. Please see YARN-5731.
|
||||||
|
//
|
||||||
|
// So to solve the problem, we will include reserved when surgical preemption
|
||||||
|
// for reserved container, which reverts behavior when YARN-4390 is disabled.
|
||||||
|
candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
|
||||||
|
!selectCandidatesForResevedContainers));
|
||||||
|
|
||||||
// Do we need to specially consider intra queue
|
// Do we need to specially consider intra queue
|
||||||
boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
|
boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
|
||||||
|
@ -131,9 +131,10 @@ public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app,
|
|||||||
public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
|
public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
|
||||||
ApplicationAttemptId appId, int expected) throws InterruptedException {
|
ApplicationAttemptId appId, int expected) throws InterruptedException {
|
||||||
int waitNum = 0;
|
int waitNum = 0;
|
||||||
|
int total = 0;
|
||||||
|
|
||||||
while (waitNum < 500) {
|
while (waitNum < 500) {
|
||||||
int total = 0;
|
total = 0;
|
||||||
for (RMContainer c : node.getCopiedListOfRunningContainers()) {
|
for (RMContainer c : node.getCopiedListOfRunningContainers()) {
|
||||||
if (c.getApplicationAttemptId().equals(appId)) {
|
if (c.getApplicationAttemptId().equals(appId)) {
|
||||||
total++;
|
total++;
|
||||||
@ -146,7 +147,9 @@ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
|
|||||||
waitNum++;
|
waitNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.fail();
|
Assert.fail(
|
||||||
|
"Check #live-container-on-node-from-app, actual=" + total + " expected="
|
||||||
|
+ expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void checkNumberOfPreemptionCandidateFromApp(
|
public void checkNumberOfPreemptionCandidateFromApp(
|
||||||
|
@ -36,11 +36,11 @@
|
|||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
public class TestCapacitySchedulerSurgicalPreemption
|
public class TestCapacitySchedulerSurgicalPreemption
|
||||||
@ -811,4 +811,99 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
|
|||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testPreemptionForFragmentatedCluster() throws Exception {
|
||||||
|
conf.setBoolean(
|
||||||
|
CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
|
||||||
|
false);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Two queues, a/b, each of them are 50/50
|
||||||
|
* 5 nodes in the cluster, each of them is 30G.
|
||||||
|
*
|
||||||
|
* Submit first app, AM = 3G, and 4 * 21G containers.
|
||||||
|
* Submit second app, AM = 3G, and 4 * 21G containers,
|
||||||
|
*
|
||||||
|
* We can get one container preempted from 1st app.
|
||||||
|
*/
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
|
||||||
|
this.conf);
|
||||||
|
conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||||
|
1024 * 21);
|
||||||
|
conf.setQueues("root", new String[] { "a", "b" });
|
||||||
|
conf.setCapacity("root.a", 50);
|
||||||
|
conf.setUserLimitFactor("root.a", 100);
|
||||||
|
conf.setCapacity("root.b", 50);
|
||||||
|
conf.setUserLimitFactor("root.b", 100);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
List<MockNM> nms = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
|
||||||
|
}
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
|
||||||
|
// launch an app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
|
||||||
|
|
||||||
|
am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
// Do allocation for all nodes
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
MockNM mockNM = nms.get(i % nms.size());
|
||||||
|
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
|
||||||
|
}
|
||||||
|
|
||||||
|
// App1 should have 5 containers now
|
||||||
|
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
|
Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
|
||||||
|
|
||||||
|
// launch an app to queue, AM container should be launched in nm1
|
||||||
|
RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
|
||||||
|
|
||||||
|
am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
|
||||||
|
|
||||||
|
// Do allocation for all nodes
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
MockNM mockNM = nms.get(i % nms.size());
|
||||||
|
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
|
||||||
|
}
|
||||||
|
|
||||||
|
// App2 should have 2 containers now
|
||||||
|
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
||||||
|
am2.getApplicationAttemptId());
|
||||||
|
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
||||||
|
|
||||||
|
waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
|
||||||
|
|
||||||
|
// Call editSchedule twice and allocation once, container should get allocated
|
||||||
|
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
|
||||||
|
editPolicy.editSchedule();
|
||||||
|
editPolicy.editSchedule();
|
||||||
|
|
||||||
|
int tick = 0;
|
||||||
|
while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
|
||||||
|
// Do allocation for all nodes
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
MockNM mockNM = nms.get(i % nms.size());
|
||||||
|
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
|
||||||
|
}
|
||||||
|
tick++;
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user