From 6ce295b78737aca8103912121d54f318cb5d36ef Mon Sep 17 00:00:00 2001
From: Prabhu Joseph
Date: Thu, 7 May 2020 17:47:51 +0530
Subject: [PATCH] YARN-10259. Fix reservation logic in Multi Node Placement.
 Reviewed by Wangda Tan.

---
 .../scheduler/capacity/LeafQueue.java         |  14 ++-
 .../allocator/RegularContainerAllocator.java  |  18 ++-
 .../TestCapacitySchedulerMultiNodes.java      | 110 +++++++++++++++++-
 3 files changed, 130 insertions(+), 12 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 64b1536379..69f41ef533 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1008,11 +1008,15 @@ private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
   private CSAssignment allocateFromReservedContainer(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    // Considering multi-node scheduling, its better to iterate through
-    // all candidates and stop once we get atleast one good node to allocate
-    // where reservation was made earlier. In normal case, there is only one
-    // node and hence there wont be any impact after this change.
-    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+
+    // Irrespective of single-node or multi-node placement, allocation from
+    // a reserved container must happen only on the single node that
+    // CapacityScheduler#allocateFromReservedContainer passed in. Otherwise,
+    // under multi-node placement, no new containers would be allocated or
+    // reserved while a RESERVED container sits on a node that is already
+    // full.
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
+    if (node != null) {
       RMContainer reservedContainer = node.getReservedContainer();
       if (reservedContainer != null) {
         FiCaSchedulerApp application = getApplication(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1dacc96242..287dc67ded 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -837,6 +837,7 @@ private ContainerAllocation allocate(Resource clusterResource,
     // Do checks before determining which node to allocate
     // Directly return if this check fails.
     ContainerAllocation result;
+    ContainerAllocation lastReservation = null;
 
     AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
         application.getAppSchedulingInfo().getAppPlacementAllocator(
@@ -878,11 +879,24 @@ private ContainerAllocation allocate(Resource clusterResource,
       result = tryAllocateOnNode(clusterResource, node, schedulingMode,
           resourceLimits, schedulerKey, reservedContainer);
 
-      if (AllocationState.ALLOCATED == result.getAllocationState()
-          || AllocationState.RESERVED == result.getAllocationState()) {
+      if (AllocationState.ALLOCATED == result.getAllocationState()) {
         result = doAllocation(result, node, schedulerKey, reservedContainer);
         break;
       }
+
+      // Under multi-node placement, try to allocate on the other available
+      // nodes from the iterator before reserving. Otherwise no new
+      // containers would be allocated when the first node in the iterator
+      // cannot fit the request and returns a RESERVED allocation.
+      if (AllocationState.RESERVED == result.getAllocationState()) {
+        lastReservation = result;
+        if (iter.hasNext()) {
+          continue;
+        } else {
+          result = doAllocation(lastReservation, node, schedulerKey,
+              reservedContainer);
+        }
+      }
     }
 
     return result;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index fa85ca7813..bb2cbfdba1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -223,6 +224,7 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
 
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
     LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
     FiCaSchedulerApp schedulerApp1 =
         cs.getApplicationAttempt(am1.getApplicationAttemptId());
@@ -234,12 +236,13 @@ public void testExcessReservationWillBeUnreserved() throws Exception {
      * after its ask has been cancelled when used capacity of root queue is 1.
      */
     // Ask a container with 6GB memory size for app1,
-    // nm1 will reserve a container for app1
+    // nm2 will reserve a container for app1
+    // The last node from the node iterator will be RESERVED
     am1.allocate("*", 6 * GB, 1, new ArrayList<>());
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
     // Check containers of app1 and app2.
-    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
@@ -324,12 +327,13 @@ public void testAllocateForReservedContainer() throws Exception {
      * after node has sufficient resource.
      */
     // Ask a container with 6GB memory size for app2,
-    // nm1 will reserve a container for app2
+    // nm2 will reserve a container for app2
+    // The last node from the node iterator will be RESERVED
     am2.allocate("*", 6 * GB, 1, new ArrayList<>());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
 
     // Check containers of app1 and app2.
-    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
@@ -344,4 +348,100 @@ public void testAllocateForReservedContainer() throws Exception {
 
     rm1.close();
   }
+
+  @Test(timeout=30000)
+  public void testAllocateOfReservedContainerFromAnotherNode()
+      throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+        + ".resource-based.sorting-interval.ms", 0);
+    newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
+        1.0f);
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 12 * GB, 2);
+    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB, 2);
+
+    // launch app1 to the default queue, AM container will be launched on nm1
+    RMApp app1 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch app2 to the default queue, AM container will be launched on nm2
+    RMApp app2 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Submit app3; both nodes are full, so its AM container gets RESERVED
+    RMApp app3 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+          result.set(true);
+        } catch (Exception e) {
+          Assert.fail("Failed to allocate the reserved container");
+        }
+      }
+    };
+    t.start();
+    Thread.sleep(1000);
+
+    // Validate that app3 has got a RESERVED container
+    FiCaSchedulerApp schedulerApp =
+        cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to get reserved container", 1,
+        schedulerApp.getReservedContainers().size());
+
+    // Free up space on the other node, where the reservation has not happened
+    if (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      rm1.killApp(app2.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    } else {
+      rm1.killApp(app1.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // Check that the reserved AM container of app3 gets allocated
+    // on the node where space is available
+    while (!result.get()) {
+      Thread.sleep(100);
+    }
+
+    // Validate the release of reserved containers
+    schedulerApp =
+        cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to release Reserved container", 0,
+        schedulerApp.getReservedContainers().size());
+    Assert.assertNull(cs.getNode(rmNode1.getNodeID()).getReservedContainer());
+    Assert.assertNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+
+    rm1.close();
+  }
 }
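
To see the RegularContainerAllocator change in isolation, the following is a
minimal, self-contained sketch of the fixed allocate loop. The Node class,
tryAllocateOnNode, and the string results are simplified stand-ins invented
for illustration, not the actual YARN classes; only the shape of the loop
(try every candidate node before committing a reservation) mirrors the patch.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MultiNodeAllocateSketch {

  enum AllocationState { ALLOCATED, RESERVED }

  static final class Node {
    final String name;
    int availableMB;

    Node(String name, int availableMB) {
      this.name = name;
      this.availableMB = availableMB;
    }
  }

  // A node can ALLOCATE when the ask fits into its available resource;
  // otherwise it can only RESERVE.
  static AllocationState tryAllocateOnNode(Node node, int askMB) {
    return node.availableMB >= askMB
        ? AllocationState.ALLOCATED : AllocationState.RESERVED;
  }

  // Walk every candidate node before falling back to a reservation: an
  // ALLOCATED result wins immediately, while a RESERVED result is only
  // committed once the iterator is exhausted.
  static String allocate(List<Node> candidates, int askMB) {
    Node lastReservedNode = null;
    Iterator<Node> iter = candidates.iterator();
    while (iter.hasNext()) {
      Node node = iter.next();
      if (tryAllocateOnNode(node, askMB) == AllocationState.ALLOCATED) {
        node.availableMB -= askMB;
        return "ALLOCATED on " + node.name;
      }
      // RESERVED: remember the node, but keep trying the remaining ones.
      lastReservedNode = node;
    }
    return lastReservedNode == null
        ? "SKIPPED" : "RESERVED on " + lastReservedNode.name;
  }

  public static void main(String[] args) {
    // h1 is full; before the fix the loop reserved on the first node and
    // broke out, so h2's free space was never considered.
    List<Node> nodes = Arrays.asList(new Node("h1", 0), new Node("h2", 8192));
    System.out.println(allocate(nodes, 6144)); // prints: ALLOCATED on h2
  }
}

Run against the two-node example in main, the pre-patch behavior (break on
the first RESERVED result) would stop at h1, while the patched loop falls
through to h2 and allocates, which is exactly what
testAllocateOfReservedContainerFromAnotherNode asserts above.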