From d10428cab26c4c75328ecca118744041f2848251 Mon Sep 17 00:00:00 2001
From: Thomas White
Date: Thu, 25 Apr 2013 09:15:51 +0000
Subject: [PATCH] YARN-289. Fair scheduler allows reservations that won't fit
 on node. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1475681 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/CHANGES.txt               |  3 +
 .../scheduler/fair/AppSchedulable.java        | 71 ++++++++++++-------
 .../scheduler/fair/FairScheduler.java         | 24 +++++--
 .../scheduler/fair/TestFairScheduler.java     | 24 +++++++
 4 files changed, 90 insertions(+), 32 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5caebb9d41..1e26018524 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -303,6 +303,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-605. Fix failing unit test in TestNMWebServices when versionInfo has
     parentheses like when running on a git checkout. (Hitesh Shah via vinodkv)
 
+    YARN-289. Fair scheduler allows reservations that won't fit on node.
+    (Sandy Ryza via tomwhite)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
index fa01a0bb99..a8e71735ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -45,6 +46,9 @@
 @Private
 @Unstable
 public class AppSchedulable extends Schedulable {
+  private static final DefaultResourceCalculator RESOURCE_CALCULATOR
+      = new DefaultResourceCalculator();
+
   private FairScheduler scheduler;
   private FSSchedulerApp app;
   private Resource demand = Resources.createResource(0);
@@ -180,15 +184,15 @@ public Container createContainer(
    * update relevant bookkeeping. This dispatches to relevant handlers
    * in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
    */
-  private void reserve(FSSchedulerApp application, Priority priority,
-      FSSchedulerNode node, Container container, boolean alreadyReserved) {
+  private void reserve(Priority priority, FSSchedulerNode node,
+      Container container, boolean alreadyReserved) {
     LOG.info("Making reservation: node=" + node.getHostName() +
         " app_id=" + app.getApplicationId());
     if (!alreadyReserved) {
-      getMetrics().reserveResource(application.getUser(), container.getResource());
-      RMContainer rmContainer = application.reserve(node, priority, null,
+      getMetrics().reserveResource(app.getUser(), container.getResource());
+      RMContainer rmContainer = app.reserve(node, priority, null,
           container);
-      node.reserveResource(application, priority, rmContainer);
+      node.reserveResource(app, priority, rmContainer);
       getMetrics().reserveResource(app.getUser(),
           container.getResource());
       scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
@@ -197,25 +201,24 @@ private void reserve(FSSchedulerApp application, Priority priority,
     else {
       RMContainer rmContainer = node.getReservedContainer();
-      application.reserve(node, priority, rmContainer, container);
-      node.reserveResource(application, priority, rmContainer);
+      app.reserve(node, priority, rmContainer, container);
+      node.reserveResource(app, priority, rmContainer);
     }
   }
 
   /**
-   * Remove the reservation on {@code node} for {@ application} at the given
+   * Remove the reservation on {@code node} at the given
    * {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
    * handlers for an unreservation.
    */
-  private void unreserve(FSSchedulerApp application, Priority priority,
-      FSSchedulerNode node) {
+  public void unreserve(Priority priority, FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    application.unreserve(node, priority);
-    node.unreserveResource(application);
+    app.unreserve(node, priority);
+    node.unreserveResource(app);
     getMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+        app.getUser(), rmContainer.getContainer().getResource());
     scheduler.getRootQueueMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+        app.getUser(), rmContainer.getContainer().getResource());
   }
 
   /**
@@ -224,8 +227,8 @@ private void unreserve(FSSchedulerApp application, Priority priority,
    * sure the particular request should be facilitated by this node.
    */
   private Resource assignContainer(FSSchedulerNode node,
-      FSSchedulerApp application, Priority priority,
-      ResourceRequest request, NodeType type, boolean reserved) {
+      Priority priority, ResourceRequest request, NodeType type,
+      boolean reserved) {
     // How much does this request need?
     Resource capability = request.getCapability();
 
@@ -237,7 +240,7 @@ private Resource assignContainer(FSSchedulerNode node,
     if (reserved) {
       container = node.getReservedContainer().getContainer();
     } else {
-      container = createContainer(application, node, capability, priority);
+      container = createContainer(app, node, capability, priority);
     }
 
     // Can we allocate a container on this node?
@@ -247,9 +250,12 @@ private Resource assignContainer(FSSchedulerNode node,
     if (availableContainers > 0) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
-          application.allocate(type, node, priority, request, container);
+          app.allocate(type, node, priority, request, container);
       if (allocatedContainer == null) {
         // Did the application need this resource?
+        if (reserved) {
+          unreserve(priority, node);
+        }
         return Resources.none();
       }
       else {
@@ -262,17 +268,17 @@
         // If we had previously made a reservation, delete it
         if (reserved) {
-          unreserve(application, priority, node);
+          unreserve(priority, node);
         }
 
         // Inform the node
-        node.allocateContainer(application.getApplicationId(),
+        node.allocateContainer(app.getApplicationId(),
             allocatedContainer);
 
         return container.getResource();
       } else {
         // The desired container won't fit here, so reserve
-        reserve(application, priority, node, container, reserved);
+        reserve(priority, node, container, reserved);
 
         return FairScheduler.CONTAINER_RESERVED;
       }
@@ -287,7 +293,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
       // Make sure the application still needs requests at this priority
       if (app.getTotalRequiredResources(priority) == 0) {
-        unreserve(app, priority, node);
+        unreserve(priority, node);
         return Resources.none();
       }
     } else {
@@ -304,7 +310,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
     // (not scheduled) in order to promote better locality.
     synchronized (app) {
       for (Priority priority : prioritiesToTry) {
-        if (app.getTotalRequiredResources(priority) <= 0) {
+        if (app.getTotalRequiredResources(priority) <= 0 ||
+            !hasContainerForNode(priority, node)) {
           continue;
         }
 
@@ -321,14 +328,14 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
-          return assignContainer(node, app, priority,
+          return assignContainer(node, priority,
               localRequest, NodeType.NODE_LOCAL, reserved);
         }
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
                 allowedLocality.equals(NodeType.OFF_SWITCH))) {
-          return assignContainer(node, app, priority, rackLocalRequest,
+          return assignContainer(node, priority, rackLocalRequest,
               NodeType.RACK_LOCAL, reserved);
         }
 
@@ -336,7 +343,7 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
             ResourceRequest.ANY);
         if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
             && allowedLocality.equals(NodeType.OFF_SWITCH)) {
-          return assignContainer(node, app, priority, offSwitchRequest,
+          return assignContainer(node, priority, offSwitchRequest,
               NodeType.OFF_SWITCH, reserved);
         }
       }
@@ -352,4 +359,16 @@ public Resource assignReservedContainer(FSSchedulerNode node) {
   public Resource assignContainer(FSSchedulerNode node) {
     return assignContainer(node, false);
   }
+
+  /**
+   * Whether this app has container requests that could be satisfied on the
+   * given node, if the node had full space.
+   */
+  public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
+    // TODO: add checks for node-specific scheduling here
+    ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
+    return request.getNumContainers() > 0 &&
+        Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
+            request.getCapability(), node.getRMNode().getTotalCapability());
+  }
 }
\ No newline at end of file

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 8aafaeb5cc..15f3eba65e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -805,14 +806,25 @@ private synchronized void nodeUpdate(RMNode nm) {
     AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
-      // Reservation exists; try to fulfill the reservation
-      LOG.info("Trying to fulfill reservation for application "
-          + reservedAppSchedulable.getApp().getApplicationAttemptId()
-          + " on node: " + nm);
+      Priority reservedPriority = node.getReservedContainer().getReservedPriority();
+      if (reservedAppSchedulable != null &&
+          !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+        // Don't hold the reservation if the app can no longer use it
+        LOG.info("Releasing reservation that cannot be satisfied for application "
+            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + " on node " + nm);
+        reservedAppSchedulable.unreserve(reservedPriority, node);
+        reservedAppSchedulable = null;
+      } else {
+        // Reservation exists; try to fulfill the reservation
+        LOG.info("Trying to fulfill reservation for application "
+            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + " on node: " + nm);
 
-      node.getReservedAppSchedulable().assignReservedContainer(node);
+        node.getReservedAppSchedulable().assignReservedContainer(node);
+      }
     }
-    else {
+    if (reservedAppSchedulable == null) {
       // No reservation, schedule at queue which is farthest below fair share
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 039e0b0b6d..e6761b9c60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -1520,4 +1520,28 @@ public void testNotAllowSubmitApplication() throws Exception {
     }
     assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
   }
+
+  @Test
+  public void testReservationThatDoesntFit() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
+        "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    FSSchedulerApp app = scheduler.applications.get(attId);
+    assertEquals(0, app.getLiveContainers().size());
+    assertEquals(0, app.getReservedContainers().size());
+
+    createSchedulingRequestExistingApplication(1024, 2, attId);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    assertEquals(1, app.getLiveContainers().size());
+    assertEquals(0, app.getReservedContainers().size());
+  }
 }
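
--
The crux of the change is the new hasContainerForNode() guard: a reservation is
only placed, or kept, when the request's capability fits within the node's
*total* capability, regardless of how much of the node is currently free. The
following self-contained Java sketch (an illustration, not part of the patch)
models that check; the Resource/Request/Node classes are hypothetical stand-ins
for ResourceRequest and FSSchedulerNode, and the memory-only comparison mirrors
what DefaultResourceCalculator does in this code path.

    // Standalone sketch of the "could this request ever fit on this node"
    // test introduced by YARN-289. All types here are simplified stand-ins.
    public class ReservationFitCheck {

      static final class Resource {
        final int memoryMb;
        Resource(int memoryMb) { this.memoryMb = memoryMb; }
      }

      static final class Request {
        final int numContainers;
        final Resource capability;
        Request(int numContainers, Resource capability) {
          this.numContainers = numContainers;
          this.capability = capability;
        }
      }

      static final class Node {
        // Total capacity of the node, not the currently free amount.
        final Resource totalCapability;
        Node(Resource totalCapability) { this.totalCapability = totalCapability; }
      }

      // A reservation may only be placed (or held) if outstanding containers
      // exist AND the requested capability could ever fit on this node.
      static boolean hasContainerForNode(Request request, Node node) {
        return request.numContainers > 0
            && request.capability.memoryMb <= node.totalCapability.memoryMb;
      }

      public static void main(String[] args) {
        Node node = new Node(new Resource(1024));             // 1 GB node
        Request tooBig = new Request(1, new Resource(2048));  // can never fit
        Request fits = new Request(2, new Resource(1024));    // fits when free
        System.out.println(hasContainerForNode(tooBig, node)); // false
        System.out.println(hasContainerForNode(fits, node));   // true
      }
    }

Run against the values in testReservationThatDoesntFit() above: the 2048 MB
request on the 1024 MB node yields false, so nodeUpdate() never places (and
releases any stale) reservation, while the later 1024 MB requests yield true
and are scheduled normally.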