YARN-9903: Support reservations continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).

This commit is contained in:
Eric E Payne 2020-06-29 18:39:53 +00:00
parent 0be26811f3
commit 74fc13cf91
4 changed files with 300 additions and 18 deletions

View File

@ -1076,14 +1076,12 @@ boolean canAssignToThisQueue(Resource clusterResource,
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
usedExceptKillable, currentLimitResource)) { usedExceptKillable, currentLimitResource)) {
// if reservation continous looking enabled, check to see if could we // if reservation continue looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application // potentially use this node instead of a reserved node if the application
// has reserved containers. // has reserved containers.
// TODO, now only consider reservation cases when the node has no label if (this.reservationsContinueLooking
if (this.reservationsContinueLooking && nodePartition.equals( && Resources.greaterThan(resourceCalculator, clusterResource,
RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan( resourceCouldBeUnreserved, Resources.none())) {
resourceCalculator, clusterResource, resourceCouldBeUnreserved,
Resources.none())) {
// resource-without-reserved = used - reserved // resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource = Resources.subtract( Resource newTotalWithoutReservedResource = Resources.subtract(
usedExceptKillable, resourceCouldBeUnreserved); usedExceptKillable, resourceCouldBeUnreserved);

View File

@ -1574,8 +1574,7 @@ protected boolean canAssignToUser(Resource clusterResource,
user.getUsed(nodePartition), limit)) { user.getUsed(nodePartition), limit)) {
// if enabled, check to see if could we potentially use this node instead // if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers // of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && nodePartition.equals( if (this.reservationsContinueLooking) {
CommonNodeLabelsManager.NO_LABEL)) {
if (Resources.lessThanOrEqual(resourceCalculator, clusterResource, if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
Resources.subtract(user.getUsed(), Resources.subtract(user.getUsed(),
application.getCurrentReservation()), limit)) { application.getCurrentReservation()), limit)) {

View File

@ -79,12 +79,11 @@ private boolean checkHeadroom(Resource clusterResource,
String nodePartition) { String nodePartition) {
// If headroom + currentReservation < required, we cannot allocate this // If headroom + currentReservation < required, we cannot allocate this
// require // require
Resource resourceCouldBeUnReserved = application.getCurrentReservation(); Resource resourceCouldBeUnReserved =
if (!application.getCSLeafQueue().getReservationContinueLooking() application.getAppAttemptResourceUsage().getReserved(nodePartition);
|| !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { if (!application.getCSLeafQueue().getReservationContinueLooking()) {
// If we don't allow reservation continuous looking, OR we're looking at // If we don't allow reservation continuous looking,
// non-default node partition, we won't allow to unreserve before // we won't allow to unreserve before allocation.
// allocation.
resourceCouldBeUnReserved = Resources.none(); resourceCouldBeUnReserved = Resources.none();
} }
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
@ -583,13 +582,10 @@ private ContainerAllocation assignContainer(Resource clusterResource,
// Allocate... // Allocate...
// We will only do continuous reservation when this is not allocated from // We will only do continuous reservation when this is not allocated from
// reserved container // reserved container
if (rmContainer == null && reservationsContinueLooking if (rmContainer == null && reservationsContinueLooking) {
&& node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve // when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue, its parents', or the users' // some containers to meet this queue, its parents', or the users'
// resource limits. // resource limits.
// TODO, need change here when we want to support continuous reservation
// looking for labeled partitions.
if (!shouldAllocOrReserveNewContainer || needToUnreserve) { if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
if (!needToUnreserve) { if (!needToUnreserve) {
// If we shouldn't allocate/reserve new container then we should // If we shouldn't allocate/reserve new container then we should

View File

@ -642,6 +642,295 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.close(); rm1.close();
} }
@Test (timeout = 120000)
public void testContainerReservationContinueLookingWithLabels()
throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(
TestUtils.getConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
ContainerId containerId;
// launch an app to queue a1 (label = x)
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
.withAppName("app1")
.withUser("user")
.withAcls(null)
.withQueue("a1")
.withUnmanagedAM(false)
.withAmLabel("x")
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
.getApplicationAttemptId());
// Verify live on node1
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h1");
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(2 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// request map containers for app1.
am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), "x");
// Do node heartbeat to allocate first mapper on node1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Verify live on node1
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h1");
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(7 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// Do node heartbeat to allocate second mapper on node2
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Verify live on node2
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h2");
// node1 7 GB used, node2 5 GB used
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(12 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// request reducer containers for app1.
am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), "x");
// Do node heartbeat to reserve reducer on node1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// node1 7 GB used and 3 GB reserved, node2 5 GB used
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(15 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(3 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// Do node heartbeat to allocate container for second reducer on node2
// This should unreserve the reserved container
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Verify live on node2
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h2");
// node1 7 GB used and 0 GB reserved, node2 8 GB used
Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(15 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
rm1.close();
}
@Test (timeout = 120000)
public void testContainerReservationContinueLookingWithDefaultLabels()
throws Exception {
// This is the same as testContainerReservationContinueLookingWithLabels,
// but this test doesn't specify the label expression in the
// ResourceRequest, instead it uses default queue label expressions
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(
TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
ContainerId containerId;
// launch an app to queue a1 (label = x)
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
.withAppName("app1")
.withUser("user")
.withAcls(null)
.withQueue("a1")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
.getApplicationAttemptId());
// Verify live on node1
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h1");
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(2 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// request map containers for app1.
am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), null);
// Do node heartbeat to allocate first mapper on node1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Verify live on node1
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h1");
Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(7 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// Do node heartbeat to allocate second mapper on node2
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Verify live on node2
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h2");
// node1 7 GB used, node2 5 GB used
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(12 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// request reducer containers for app1.
am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), null);
// Do node heartbeat to reserve reducer on node1
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// node1 7 GB used and 3 GB reserved, node2 5 GB used
Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(15 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(3 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
// Do node heartbeat to allocate container for second reducer on node2
// This should unreserve the reserved container
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Verify live on node2
containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
"h2");
// node1 7 GB used and 0 GB reserved, node2 8 GB used
Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
.getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
.getReserved("x").getMemorySize());
Assert.assertEquals(15 * GB,
leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
Assert.assertEquals(0 * GB,
leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
rm1.close();
}
@Test (timeout = 120000) @Test (timeout = 120000)
public void testRMContainerLeakInLeafQueue() throws Exception { public void testRMContainerLeakInLeafQueue() throws Exception {
// set node -> label // set node -> label