From 4a14264ddb28d4cfd06ec4f70b42e3174a1d888c Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Mon, 3 Jun 2013 18:13:53 +0000 Subject: [PATCH] YARN-398. Make it possible to specify hard locality constraints in resource requests for CapacityScheduler. Contributed by Arun C. Murthy. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489087 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/CSAssignment.java | 17 +- .../scheduler/capacity/LeafQueue.java | 112 ++++--- .../common/fica/FiCaSchedulerApp.java | 5 + .../capacity/TestApplicationLimits.java | 6 +- .../scheduler/capacity/TestLeafQueue.java | 293 ++++++++++++++---- .../scheduler/capacity/TestUtils.java | 7 +- 7 files changed, 329 insertions(+), 114 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a1c6ccc6a7..2b2c64ef4b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -111,6 +111,9 @@ Release 2.1.0-beta - UNRELEASED YARN-326. Add multi-resource scheduling to the fair scheduler. (sandyr via tucu) + YARN-398. Make it possible to specify hard locality constraints in resource + requests for CapacityScheduler. (acmurthy) + IMPROVEMENTS YARN-365. Change NM heartbeat handling to not generate a scheduler event diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 1f1250a2b6..d2c0cc7581 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -31,12 +32,14 @@ public class CSAssignment { private NodeType type; private final RMContainer excessReservation; private final FiCaSchedulerApp application; + private final boolean skipped; public CSAssignment(Resource resource, NodeType type) { this.resource = resource; this.type = type; this.application = null; this.excessReservation = null; + this.skipped = false; } public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) { @@ -44,8 +47,16 @@ public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) this.type = NodeType.NODE_LOCAL; this.application = application; this.excessReservation = excessReservation; + this.skipped = false; + } + + public CSAssignment(boolean skipped) { + this.resource = Resources.createResource(0, 0); + this.type = NodeType.NODE_LOCAL; + this.application = null; + this.excessReservation = null; + this.skipped = skipped; } - public Resource getResource() { return resource; @@ -67,6 +78,10 @@ public RMContainer getExcessReservation() { return excessReservation; } + public boolean getSkipped() { + return skipped; + } + @Override public String toString() { return resource.getMemory() + ":" + type; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 57797ed7ae..1d0cec2712 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -784,6 +784,8 @@ private synchronized FiCaSchedulerApp getApplication( private static final CSAssignment NULL_ASSIGNMENT = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); + private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node) { @@ -853,6 +855,13 @@ private synchronized FiCaSchedulerApp getApplication( assignContainersOnNode(clusterResource, node, application, priority, null); + // Did the application skip this node? + if (assignment.getSkipped()) { + // Don't count 'skipped nodes' as a scheduling opportunity! + application.subtractSchedulingOpportunity(priority); + continue; + } + // Did we schedule or reserve a container? Resource assigned = assignment.getResource(); if (Resources.greaterThan( @@ -1104,73 +1113,88 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, Resource assigned = Resources.none(); // Data-local - assigned = - assignNodeLocalContainers(clusterResource, node, application, priority, - reservedContainer); - if (Resources.greaterThan(resourceCalculator, clusterResource, - assigned, Resources.none())) { - return new CSAssignment(assigned, NodeType.NODE_LOCAL); + ResourceRequest nodeLocalResourceRequest = + application.getResourceRequest(priority, node.getHostName()); + if (nodeLocalResourceRequest != null) { + assigned = + assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, + node, application, priority, reservedContainer); + if (Resources.greaterThan(resourceCalculator, clusterResource, + assigned, Resources.none())) { + return new CSAssignment(assigned, NodeType.NODE_LOCAL); + } } // Rack-local - assigned = - assignRackLocalContainers(clusterResource, node, application, priority, - reservedContainer); - if (Resources.greaterThan(resourceCalculator, clusterResource, - assigned, Resources.none())) { - return new CSAssignment(assigned, NodeType.RACK_LOCAL); + ResourceRequest rackLocalResourceRequest = + application.getResourceRequest(priority, node.getRackName()); + if (rackLocalResourceRequest != null) { + if (!rackLocalResourceRequest.getRelaxLocality()) { + return SKIP_ASSIGNMENT; + } + + assigned = + assignRackLocalContainers(clusterResource, rackLocalResourceRequest, + node, application, priority, reservedContainer); + if (Resources.greaterThan(resourceCalculator, clusterResource, + assigned, Resources.none())) { + return new CSAssignment(assigned, NodeType.RACK_LOCAL); + } } // Off-switch - return new CSAssignment( - assignOffSwitchContainers(clusterResource, node, application, - priority, reservedContainer), - NodeType.OFF_SWITCH); + ResourceRequest offSwitchResourceRequest = + application.getResourceRequest(priority, ResourceRequest.ANY); + if (offSwitchResourceRequest != null) { + if (!offSwitchResourceRequest.getRelaxLocality()) { + return SKIP_ASSIGNMENT; + } + + return new CSAssignment( + assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, + node, application, priority, reservedContainer), + NodeType.OFF_SWITCH); + } + + return SKIP_ASSIGNMENT; } - private Resource assignNodeLocalContainers(Resource clusterResource, + private Resource assignNodeLocalContainers( + Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { - ResourceRequest request = - application.getResourceRequest(priority, node.getHostName()); - if (request != null) { - if (canAssign(application, priority, node, NodeType.NODE_LOCAL, - reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, - request, NodeType.NODE_LOCAL, reservedContainer); - } + if (canAssign(application, priority, node, NodeType.NODE_LOCAL, + reservedContainer)) { + return assignContainer(clusterResource, node, application, priority, + nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer); } return Resources.none(); } - private Resource assignRackLocalContainers(Resource clusterResource, + private Resource assignRackLocalContainers( + Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { - ResourceRequest request = - application.getResourceRequest(priority, node.getRackName()); - if (request != null) { - if (canAssign(application, priority, node, NodeType.RACK_LOCAL, - reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, request, - NodeType.RACK_LOCAL, reservedContainer); - } + if (canAssign(application, priority, node, NodeType.RACK_LOCAL, + reservedContainer)) { + return assignContainer(clusterResource, node, application, priority, + rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer); } + return Resources.none(); } - private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node, - FiCaSchedulerApp application, Priority priority, + private Resource assignOffSwitchContainers( + Resource clusterResource, ResourceRequest offSwitchResourceRequest, + FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { - ResourceRequest request = - application.getResourceRequest(priority, ResourceRequest.ANY); - if (request != null) { - if (canAssign(application, priority, node, NodeType.OFF_SWITCH, - reservedContainer)) { - return assignContainer(clusterResource, node, application, priority, request, - NodeType.OFF_SWITCH, reservedContainer); - } + if (canAssign(application, priority, node, NodeType.OFF_SWITCH, + reservedContainer)) { + return assignContainer(clusterResource, node, application, priority, + offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer); } + return Resources.none(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 57e0634e29..ba4efaa7c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -332,6 +332,11 @@ synchronized public void addSchedulingOpportunity(Priority priority) { schedulingOpportunities.count(priority) + 1); } + synchronized public void subtractSchedulingOpportunity(Priority priority) { + int count = schedulingOpportunities.count(priority) - 1; + this.schedulingOpportunities.setCount(priority, Math.max(count, 0)); + } + /** * Return the number of times the application has been given an opportunity * to schedule a task at the given priority since the last time it diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index e9f53137b9..84d9236b40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -512,7 +512,7 @@ public void testHeadroom() throws Exception { List app_0_0_requests = new ArrayList(); app_0_0_requests.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, - priority_1, recordFactory)); + true, priority_1, recordFactory)); app_0_0.updateResourceRequests(app_0_0_requests); // Schedule to compute @@ -531,7 +531,7 @@ public void testHeadroom() throws Exception { List app_0_1_requests = new ArrayList(); app_0_1_requests.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, - priority_1, recordFactory)); + true, priority_1, recordFactory)); app_0_1.updateResourceRequests(app_0_1_requests); // Schedule to compute @@ -550,7 +550,7 @@ public void testHeadroom() throws Exception { List app_1_0_requests = new ArrayList(); app_1_0_requests.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, - priority_1, recordFactory)); + true, priority_1, recordFactory)); app_1_0.updateResourceRequests(app_1_0_requests); // Schedule to compute diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 141b7948c3..908e9a8ce3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -292,8 +292,8 @@ public void testSingleQueueOneUserMetrics() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true, + priority, recordFactory))); // Start testing... @@ -414,12 +414,12 @@ public void testSingleQueueWithOneUser() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true, + priority, recordFactory))); app_1.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); // Start testing... @@ -547,12 +547,12 @@ public void testUserLimits() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + priority, recordFactory))); app_1.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); /** * Start testing... @@ -640,12 +640,12 @@ public void testHeadroomWithMaxCap() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + priority, recordFactory))); app_1.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); /** * Start testing... @@ -679,8 +679,8 @@ public void testHeadroomWithMaxCap() throws Exception { // Submit requests for app_1 and set max-cap a.setMaxCapacity(.1f); app_2.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); // No more to user_0 since he is already over user-limit @@ -696,8 +696,8 @@ public void testHeadroomWithMaxCap() throws Exception { // Check headroom for app_2 LOG.info("here"); app_1.updateResourceRequests(Collections.singletonList( // unset - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, + priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); a.assignContainers(clusterResource, node_1); assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap @@ -757,12 +757,12 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true, + priority, recordFactory))); app_1.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true, + priority, recordFactory))); /** * Start testing... @@ -791,12 +791,12 @@ public void testSingleQueueWithMultipleUsers() throws Exception { // Submit resource requests for other apps now to 'activate' them app_2.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true, + priority, recordFactory))); app_3.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); // Now allocations should goto app_2 since // user_0 is at limit inspite of high user-limit-factor @@ -919,12 +919,12 @@ public void testReservation() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); app_1.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + priority, recordFactory))); // Start testing... @@ -1021,19 +1021,19 @@ public void testStolenReservedContainer() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + priority, recordFactory))); // Setup app_1 to request a 4GB container on host_0 and // another 4GB container anywhere. ArrayList appRequests_1 = new ArrayList(4); appRequests_1.add(TestUtils.createResourceRequest(host_0, 4*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2, - priority, recordFactory)); + true, priority, recordFactory)); app_1.updateResourceRequests(appRequests_1); // Start testing... @@ -1127,12 +1127,12 @@ public void testReservationExchange() throws Exception { // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); app_1.updateResourceRequests(Collections.singletonList( - TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority, - recordFactory))); + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + priority, recordFactory))); // Start testing... @@ -1242,19 +1242,19 @@ public void testLocalityScheduling() throws Exception { List app_0_requests_0 = new ArrayList(); app_0_requests_0.add( TestUtils.createResourceRequest(host_0, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_0, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(host_1, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_1, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra - priority, recordFactory)); + true, priority, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); // Start testing... @@ -1313,13 +1313,13 @@ public void testLocalityScheduling() throws Exception { app_0_requests_0.clear(); app_0_requests_0.add( TestUtils.createResourceRequest(host_1, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_1, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra - priority, recordFactory)); + true, priority, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); assertEquals(2, app_0.getTotalRequiredResources(priority)); @@ -1385,31 +1385,31 @@ public void testApplicationPriorityScheduling() throws Exception { Priority priority_1 = TestUtils.createMockPriority(1); app_0_requests_0.add( TestUtils.createResourceRequest(host_0, 1*GB, 1, - priority_1, recordFactory)); + true, priority_1, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_0, 1*GB, 1, - priority_1, recordFactory)); + true, priority_1, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(host_1, 1*GB, 1, - priority_1, recordFactory)); + true, priority_1, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_1, 1*GB, 1, - priority_1, recordFactory)); + true, priority_1, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, - priority_1, recordFactory)); + true, priority_1, recordFactory)); // P2 Priority priority_2 = TestUtils.createMockPriority(2); app_0_requests_0.add( TestUtils.createResourceRequest(host_2, 2*GB, 1, - priority_2, recordFactory)); + true, priority_2, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_2, 2*GB, 1, - priority_2, recordFactory)); + true, priority_2, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, - priority_2, recordFactory)); + true, priority_2, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); @@ -1513,19 +1513,19 @@ public void testSchedulingConstraints() throws Exception { List app_0_requests_0 = new ArrayList(); app_0_requests_0.add( TestUtils.createResourceRequest(host_0_0, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(host_0_1, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_0, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(host_1_0, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0_requests_0.add( TestUtils.createResourceRequest(rack_1, 1*GB, 1, - priority, recordFactory)); + true, priority, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); // Start testing... @@ -1534,7 +1534,7 @@ public void testSchedulingConstraints() throws Exception { app_0_requests_0.clear(); app_0_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one - priority, recordFactory)); + true, priority, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); // NODE_LOCAL - node_0_1 @@ -1557,7 +1557,7 @@ public void testSchedulingConstraints() throws Exception { app_0_requests_0.clear(); app_0_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one - priority, recordFactory)); + true, priority, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); // No allocation on node_0_1 even though it's node/rack local since @@ -1731,7 +1731,174 @@ public void testInheritedQueueAcls() throws IOException { c1.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS)); } - + + @Test + public void testLocalityConstraints() throws Exception { + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + // User + String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext)); + a.submitApplication(app_0, user_0, A); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext)); + a.submitApplication(app_1, user_0, A); + + // Setup some nodes and racks + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB); + String host_0_1 = "127.0.0.2"; + FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); + + + String host_1_0 = "127.0.0.3"; + String rack_1 = "rack_1"; + FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); + String host_1_1 = "127.0.0.4"; + FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB); + + final int numNodes = 4; + Resource clusterResource = Resources.createResource( + numNodes * (8*GB), numNodes * 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + // resourceName: + // host_0_0: < 1, 1GB, 1, true > + // host_0_1: < null > + // rack_0: < null > <---- + // host_1_0: < 1, 1GB, 1, true > + // host_1_1: < null > + // rack_1: < 1, 1GB, 1, false > <---- + // ANY: < 1, 1GB, 1, false > <---- + // Availability: + // host_0_0: 8G + // host_0_1: 8G + // host_1_0: 8G + // host_1_1: 8G + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + app_0_requests_0.add( + TestUtils.createResourceRequest(host_0_0, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(host_1_0, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_1, 1*GB, 1, + false, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one + false, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + app_0_requests_0.clear(); + + // + // Start testing... + // + + // node_0_1 + // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false + a.assignContainers(clusterResource, node_0_1); + verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 + + // resourceName: + // host_0_0: < 1, 1GB, 1, true > + // host_0_1: < null > + // rack_0: < null > <---- + // host_1_0: < 1, 1GB, 1, true > + // host_1_1: < null > + // rack_1: < 1, 1GB, 1, false > <---- + // ANY: < 1, 1GB, 1, false > <---- + // Availability: + // host_0_0: 8G + // host_0_1: 8G + // host_1_0: 8G + // host_1_1: 8G + + // node_1_1 + // Shouldn't allocate since RR(rack_1) = relax: false + a.assignContainers(clusterResource, node_1_1); + verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 + + // Allow rack-locality for rack_1 + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_1, 1*GB, 1, + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + app_0_requests_0.clear(); + + // resourceName: + // host_0_0: < 1, 1GB, 1, true > + // host_0_1: < null > + // rack_0: < null > + // host_1_0: < 1, 1GB, 1, true > + // host_1_1: < null > + // rack_1: < 1, 1GB, 1, true > <---- + // ANY: < 1, 1GB, 1, false > + // Availability: + // host_0_0: 8G + // host_0_1: 8G + // host_1_0: 8G + // host_1_1: 8G + + // node_1_1 + // Now, should allocate since RR(rack_1) = relax: true + a.assignContainers(clusterResource, node_1_1); + verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(0, app_0.getSchedulingOpportunities(priority)); + assertEquals(0, app_0.getTotalRequiredResources(priority)); + + // Now sanity-check node_local + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_1, 1*GB, 1, + false, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one + false, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + app_0_requests_0.clear(); + + // resourceName: + // host_0_0: < 1, 1GB, 1, true > + // host_0_1: < null > + // rack_0: < null > + // host_1_0: < 1, 1GB, 1, true > + // host_1_1: < null > + // rack_1: < 1, 1GB, 1, false > <---- + // ANY: < 1, 1GB, 1, false > <---- + // Availability: + // host_0_0: 8G + // host_0_1: 8G + // host_1_0: 8G + // host_1_1: 7G + + a.assignContainers(clusterResource, node_1_0); + verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), + any(Priority.class), any(ResourceRequest.class), any(Container.class)); + assertEquals(0, app_0.getSchedulingOpportunities(priority)); + assertEquals(0, app_0.getTotalRequiredResources(priority)); + + } + @After public void tearDown() throws Exception { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 56dda8a0d2..d2a0ac7f53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -115,15 +115,16 @@ public static Priority createMockPriority( int priority) { } public static ResourceRequest createResourceRequest( - String hostName, int memory, int numContainers, Priority priority, - RecordFactory recordFactory) { + String resourceName, int memory, int numContainers, boolean relaxLocality, + Priority priority, RecordFactory recordFactory) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); Resource capability = Resources.createResource(memory, 1); request.setNumContainers(numContainers); - request.setResourceName(hostName); + request.setResourceName(resourceName); request.setCapability(capability); + request.setRelaxLocality(relaxLocality); request.setPriority(priority); return request; }