YARN-10259. Fix reservation logic in Multi Node Placement.

Reviewed by Wangda Tan.
This commit is contained in:
Prabhu Joseph 2020-05-07 17:47:51 +05:30 committed by Prabhu Joseph
parent 1958cb7c2b
commit 6ce295b787
3 changed files with 130 additions and 12 deletions

View File

@ -1008,11 +1008,15 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
private CSAssignment allocateFromReservedContainer(Resource clusterResource, private CSAssignment allocateFromReservedContainer(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates, CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
// Considering multi-node scheduling, its better to iterate through
// all candidates and stop once we get atleast one good node to allocate // Irrespective of Single / Multi Node Placement, the allocate from
// where reservation was made earlier. In normal case, there is only one // Reserved Container has to happen only for the single node which
// node and hence there wont be any impact after this change. // CapacityScheduler#allocateFromReservedContainer invokes with.
for (FiCaSchedulerNode node : candidates.getAllNodes().values()) { // Else In Multi Node Placement, there won't be any Allocation or
// Reserve of new containers when there is a RESERVED container on
// a node which is full.
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
if (node != null) {
RMContainer reservedContainer = node.getReservedContainer(); RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) { if (reservedContainer != null) {
FiCaSchedulerApp application = getApplication( FiCaSchedulerApp application = getApplication(

View File

@ -837,6 +837,7 @@ private ContainerAllocation allocate(Resource clusterResource,
// Do checks before determining which node to allocate // Do checks before determining which node to allocate
// Directly return if this check fails. // Directly return if this check fails.
ContainerAllocation result; ContainerAllocation result;
ContainerAllocation lastReservation = null;
AppPlacementAllocator<FiCaSchedulerNode> schedulingPS = AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
application.getAppSchedulingInfo().getAppPlacementAllocator( application.getAppSchedulingInfo().getAppPlacementAllocator(
@ -878,11 +879,24 @@ private ContainerAllocation allocate(Resource clusterResource,
result = tryAllocateOnNode(clusterResource, node, schedulingMode, result = tryAllocateOnNode(clusterResource, node, schedulingMode,
resourceLimits, schedulerKey, reservedContainer); resourceLimits, schedulerKey, reservedContainer);
if (AllocationState.ALLOCATED == result.getAllocationState() if (AllocationState.ALLOCATED == result.getAllocationState()) {
|| AllocationState.RESERVED == result.getAllocationState()) {
result = doAllocation(result, node, schedulerKey, reservedContainer); result = doAllocation(result, node, schedulerKey, reservedContainer);
break; break;
} }
// In MultiNodePlacement, Try Allocate on other Available nodes
// from Iterator as well before Reserving. Else there won't be any
// Allocate of new containers when the first node in the
// iterator could not fit and returns RESERVED allocation.
if (AllocationState.RESERVED == result.getAllocationState()) {
lastReservation = result;
if (iter.hasNext()) {
continue;
} else {
result = doAllocation(lastReservation, node, schedulerKey,
reservedContainer);
}
}
} }
return result; return result;

View File

@ -22,6 +22,7 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -223,6 +224,7 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default"); LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
FiCaSchedulerApp schedulerApp1 = FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId()); cs.getApplicationAttempt(am1.getApplicationAttemptId());
@ -234,12 +236,13 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
* after its ask has been cancelled when used capacity of root queue is 1. * after its ask has been cancelled when used capacity of root queue is 1.
*/ */
// Ask a container with 6GB memory size for app1, // Ask a container with 6GB memory size for app1,
// nm1 will reserve a container for app1 // nm2 will reserve a container for app1
// Last Node from Node Iterator will be RESERVED
am1.allocate("*", 6 * GB, 1, new ArrayList<>()); am1.allocate("*", 6 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Check containers of app1 and app2. // Check containers of app1 and app2.
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
@ -324,12 +327,13 @@ public void testAllocateForReservedContainer() throws Exception {
* after node has sufficient resource. * after node has sufficient resource.
*/ */
// Ask a container with 6GB memory size for app2, // Ask a container with 6GB memory size for app2,
// nm1 will reserve a container for app2 // nm2 will reserve a container for app2
// Last Node from Node Iterator will be RESERVED
am2.allocate("*", 6 * GB, 1, new ArrayList<>()); am2.allocate("*", 6 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check containers of app1 and app2. // Check containers of app1 and app2.
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getReservedContainers().size()); Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
@ -344,4 +348,100 @@ public void testAllocateForReservedContainer() throws Exception {
rm1.close(); rm1.close();
} }
@Test(timeout=30000)
public void testAllocateOfReservedContainerFromAnotherNode()
throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based.sorting-interval.ms", 0);
newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 12 * GB, 2);
MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB, 2);
// launch an app1 to queue, AM container will be launched in nm1
RMApp app1 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app2 to queue, AM container will be launched in nm2
RMApp app2 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// Reserve a Container for app3
RMApp app3 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
final AtomicBoolean result = new AtomicBoolean(false);
Thread t = new Thread() {
public void run() {
try {
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
result.set(true);
} catch (Exception e) {
Assert.fail("Failed to allocate the reserved container");
}
}
};
t.start();
Thread.sleep(1000);
// Validate if app3 has got RESERVED container
FiCaSchedulerApp schedulerApp =
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
Assert.assertEquals("App3 failed to get reserved container", 1,
schedulerApp.getReservedContainers().size());
// Free the Space on other node where Reservation has not happened
if (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
rm1.killApp(app2.getApplicationId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
} else {
rm1.killApp(app1.getApplicationId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// Check if Reserved AM of app3 gets allocated in
// node where space available
while (!result.get()) {
Thread.sleep(100);
}
// Validate release of reserved containers
schedulerApp =
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
Assert.assertEquals("App3 failed to release Reserved container", 0,
schedulerApp.getReservedContainers().size());
Assert.assertNull(cs.getNode(rmNode1.getNodeID()).getReservedContainer());
Assert.assertNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
rm1.close();
}
} }