YARN-8138. Add unit test to validate queue priority preemption works under node partition. (Zian Chen via wangda)
Change-Id: Ibebfab98a714c12c2dc643b6d7b9754a7f813632 (cherry picked from commit 6ee62e6b1c9b4bc3447ce870446068e626b1a492)
This commit is contained in:
parent
669eb7bdea
commit
896b473f1b
@ -18,8 +18,12 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
@ -38,18 +42,24 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
public class TestCapacitySchedulerSurgicalPreemption
|
||||
extends CapacitySchedulerPreemptionTestBase {
|
||||
|
||||
private static final int NUM_NM = 5;
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
@ -860,6 +870,146 @@ public class TestCapacitySchedulerSurgicalPreemption
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
private void initializeConfProperties(CapacitySchedulerConfiguration conf)
|
||||
throws IOException {
|
||||
|
||||
conf.setQueues("root", new String[] {"A", "B"});
|
||||
conf.setCapacity("root.A", 50);
|
||||
conf.setCapacity("root.B", 50);
|
||||
conf.setQueuePriority("root.A", 1);
|
||||
conf.setQueuePriority("root.B", 2);
|
||||
|
||||
conf.set(PREFIX + "root.ordering-policy", "priority-utilization");
|
||||
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.enabled", "true");
|
||||
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.allow-move-reservation", "false");
|
||||
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.reserved-container-delay-ms", "0");
|
||||
conf.set(PREFIX + "root.accessible-node-labels.x.capacity", "100");
|
||||
|
||||
// Setup queue access to node labels
|
||||
conf.set(PREFIX + "root.A.accessible-node-labels", "x");
|
||||
conf.set(PREFIX + "root.B.accessible-node-labels", "x");
|
||||
conf.set(PREFIX + "root.A.default-node-label-expression", "x");
|
||||
conf.set(PREFIX + "root.B.default-node-label-expression", "x");
|
||||
conf.set(PREFIX + "root.A.accessible-node-labels.x.capacity", "50");
|
||||
conf.set(PREFIX + "root.B.accessible-node-labels.x.capacity", "50");
|
||||
conf.set(PREFIX + "root.A.user-limit-factor", "100");
|
||||
conf.set(PREFIX + "root.B.user-limit-factor", "100");
|
||||
conf.set(PREFIX + "maximum-am-resource-percent", "1");
|
||||
|
||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
||||
conf.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "1");
|
||||
conf.set(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, "1000");
|
||||
conf.set(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, "1000");
|
||||
conf.set(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, "0.5");
|
||||
conf.set(CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, "1");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPriorityPreemptionWithNodeLabels() throws Exception {
|
||||
// set up queue priority and capacity
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
|
||||
initializeConfProperties(conf);
|
||||
|
||||
MockRM rm1 = new MockRM(conf) {
|
||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
rm1.start();
|
||||
|
||||
MockNM[] mockNMs = new MockNM[NUM_NM];
|
||||
for (int i = 0; i < NUM_NM; i++) {
|
||||
mockNMs[i] = rm1.registerNode("h" + i + ":1234", 6144);
|
||||
}
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||
|
||||
mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x")));
|
||||
|
||||
RMNode[] rmNodes = new RMNode[5];
|
||||
for (int i = 0; i < NUM_NM; i++) {
|
||||
rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
|
||||
mgr.replaceLabelsOnNode(
|
||||
ImmutableMap.of(rmNodes[i].getNodeID(), ImmutableSet.of("x")));
|
||||
}
|
||||
|
||||
// launch an app to queue B, AM container launched in nm4
|
||||
RMApp app1 = rm1.submitApp(4096, "app", "user", null, "B");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
|
||||
|
||||
am1.allocate("*", 4096, NUM_NM-1, new ArrayList<>());
|
||||
|
||||
// Do allocation for nm0-nm3
|
||||
for (int i = 0; i < NUM_NM-1; i++) {
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
||||
}
|
||||
|
||||
// App1 should have 5 containers now, one for each node
|
||||
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
|
||||
am1.getApplicationAttemptId());
|
||||
Assert.assertEquals(NUM_NM, schedulerApp1.getLiveContainers().size());
|
||||
for (int i = 0; i < NUM_NM; i++) {
|
||||
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(
|
||||
rmNodes[i].getNodeID()), am1.getApplicationAttemptId(), 1);
|
||||
}
|
||||
|
||||
// Submit app2 to queue A and asks for a 750MB container for AM (on n0)
|
||||
RMApp app2 = rm1.submitApp(1024, "app", "user", null, "A");
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
|
||||
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
||||
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
|
||||
|
||||
// Ask NUM_NM-1 * 1500MB containers
|
||||
am2.allocate("*", 2048, NUM_NM-1, new ArrayList<>());
|
||||
|
||||
// Do allocation for n1-n4
|
||||
for (int i = 1; i < NUM_NM; i++) {
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
||||
}
|
||||
|
||||
// kill app1
|
||||
rm1.killApp(app1.getApplicationId());
|
||||
|
||||
// Submit app3 to queue B and asks for a 5000MB container for AM (on n2)
|
||||
RMApp app3 = rm1.submitApp(1024, "app", "user", null, "B");
|
||||
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
|
||||
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
|
||||
ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
|
||||
|
||||
// Ask NUM_NM * 5000MB containers
|
||||
am3.allocate("*", 5120, NUM_NM, new ArrayList<>());
|
||||
|
||||
// Do allocation for n0-n4
|
||||
for (int i = 0; i < NUM_NM; i++) {
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
|
||||
}
|
||||
|
||||
// Sleep the timeout interval, we should see 2 containers selected
|
||||
Thread.sleep(1000);
|
||||
|
||||
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
|
||||
getResourceScheduler()).getSchedulingMonitorManager();
|
||||
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
|
||||
ProportionalCapacityPreemptionPolicy editPolicy =
|
||||
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
|
||||
editPolicy.editSchedule();
|
||||
|
||||
// We should only allow to preempt 2 containers, on node1 and node2
|
||||
Set<RMContainer> selectedToPreempt =
|
||||
editPolicy.getToPreemptContainers().keySet();
|
||||
Assert.assertEquals(2, selectedToPreempt.size());
|
||||
List<NodeId> selectedToPreemptNodeIds = new ArrayList<>();
|
||||
for (RMContainer rmc : selectedToPreempt) {
|
||||
selectedToPreemptNodeIds.add(rmc.getAllocatedNode());
|
||||
}
|
||||
assertThat(selectedToPreemptNodeIds, CoreMatchers.hasItems(
|
||||
mockNMs[1].getNodeId(), mockNMs[2].getNodeId()));
|
||||
|
||||
rm1.close();
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPreemptionForFragmentatedCluster() throws Exception {
|
||||
|
Loading…
x
Reference in New Issue
Block a user