From b9a5fd51904a074a7a33f38266378f0f6f97b531 Mon Sep 17 00:00:00 2001
From: Arun Murthy <acmurthy@apache.org>
Date: Wed, 31 Aug 2011 19:52:20 +0000
Subject: [PATCH] MAPREDUCE-2917. Fixed corner case in container reservation which led to starvation and hung jobs.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1163768 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt        |   3 +
 .../scheduler/SchedulerApp.java             |  51 ++++++--
 .../capacity/CapacitySchedulerContext.java  |   1 -
 .../scheduler/capacity/LeafQueue.java       |  71 +++++++++--
 .../scheduler/capacity/Queue.java           |   1 -
 .../scheduler/capacity/TestLeafQueue.java   | 117 +++++++++++++++++-
 6 files changed, 224 insertions(+), 20 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 469fa1f8b9..b945e05a47 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1181,6 +1181,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2916. Ivy build for MRv1 fails with bad organization for
     common daemon. (mahadev)
 
+    MAPREDUCE-2917. Fixed corner case in container reservation which led to
+    starvation and hung jobs. (acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
index d16ae4b994..024f9eb491 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
@@ -73,7 +73,11 @@ public class SchedulerApp {
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
 
-  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
+  Map<Priority, Integer> schedulingOpportunities = 
+      new HashMap<Priority, Integer>();
+
+  Map<Priority, Integer> reReservations =
+      new HashMap<Priority, Integer>();
 
   Resource currentReservation = recordFactory
       .newRecordInstance(Resource.class);
@@ -265,15 +269,15 @@ public synchronized RMContainer getRMContainer(ContainerId id) {
   }
 
   synchronized public void resetSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
+    Integer schedulingOpportunities = 
+        this.schedulingOpportunities.get(priority);
     schedulingOpportunities = 0;
     this.schedulingOpportunities.put(priority, schedulingOpportunities);
   }
 
   synchronized public void addSchedulingOpportunity(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
+    Integer schedulingOpportunities = 
+        this.schedulingOpportunities.get(priority);
     if (schedulingOpportunities == null) {
       schedulingOpportunities = 0;
     }
@@ -282,8 +286,8 @@ synchronized public void addSchedulingOpportunity(Priority priority) {
   }
 
   synchronized public int getSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
+    Integer schedulingOpportunities = 
+        this.schedulingOpportunities.get(priority);
     if (schedulingOpportunities == null) {
       schedulingOpportunities = 0;
       this.schedulingOpportunities.put(priority, schedulingOpportunities);
@@ -291,6 +295,30 @@ synchronized public int getSchedulingOpportunities(Priority priority) {
     return schedulingOpportunities;
   }
 
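+  // Per-priority count of how many times an existing reservation has been
+  // repeated ('re-reserved') without being fulfilled; reset when a new
+  // container is reserved or unreserved (see reserve()/unreserve() below).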
+  synchronized void resetReReservations(Priority priority) {
+    Integer reReservations = this.reReservations.get(priority);
+    reReservations = 0;
+    this.reReservations.put(priority, reReservations);
+  }
+
+  synchronized void addReReservation(Priority priority) {
+    Integer reReservations = this.reReservations.get(priority);
+    if (reReservations == null) {
+      reReservations = 0;
+    }
+    ++reReservations;
+    this.reReservations.put(priority, reReservations);
+  }
+
+  synchronized public int getReReservations(Priority priority) {
+    Integer reReservations = this.reReservations.get(priority);
+    if (reReservations == null) {
+      reReservations = 0;
+      this.reReservations.put(priority, reReservations);
+    }
+    return reReservations;
+  }
+
   public synchronized int getNumReservedContainers(Priority priority) {
     Map<NodeId, RMContainer> reservedContainers = 
         this.reservedContainers.get(priority);
@@ -318,6 +346,12 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
           rmContext.getContainerAllocationExpirer());
 
       Resources.addTo(currentReservation, container.getResource());
+
+      // Reset the re-reservation count
+      resetReReservations(priority);
+    } else {
+      // Note down the re-reservation
+      addReReservation(priority);
     }
     rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
         container.getResource(), node.getNodeID(), priority));
@@ -347,6 +381,9 @@ public synchronized void unreserve(SchedulerNode node, Priority priority) {
       this.reservedContainers.remove(priority);
     }
 
+    // Reset the re-reservation count
+    resetReReservations(priority);
+
     Resource resource = reservedContainer.getContainer().getResource();
     Resources.subtractFrom(currentReservation, resource);

diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index d48557a329..c6b2b390e7 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;

diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6f98c268ff..7ed33d5853 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -86,7 +86,9 @@ public class LeafQueue implements Queue {
   Map<ApplicationAttemptId, SchedulerApp> applicationsMap = 
      new HashMap<ApplicationAttemptId, SchedulerApp>();
 
-  public final Resource minimumAllocation;
+  private final Resource minimumAllocation;
+  private final Resource maximumAllocation;
+  private final float minimumAllocationFactor;
 
   private ContainerTokenSecretManager containerTokenSecretManager;
 
@@ -118,6 +120,10 @@ public LeafQueue(CapacitySchedulerContext cs, 
         cs.getConfiguration().getEnableUserMetrics());
 
     this.minimumAllocation = cs.getMinimumResourceCapability();
+    this.maximumAllocation = cs.getMaximumResourceCapability();
+    this.minimumAllocationFactor = 
+        (float)(maximumAllocation.getMemory() - minimumAllocation.getMemory()) / 
+         maximumAllocation.getMemory();
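+    // e.g. (illustrative numbers): with minimumAllocation=1GB and
+    // maximumAllocation=4GB, minimumAllocationFactor = (4096-1024)/4096 = 0.75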
     this.containerTokenSecretManager = cs.getContainerTokenSecretManager();
 
     float capacity = 
@@ -239,6 +245,30 @@ public String getQueuePath() {
     return parent.getQueuePath() + "." + getQueueName();
   }
 
+  /**
+   * Used only by tests.
+   */
+  @Private
+  public Resource getMinimumAllocation() {
+    return minimumAllocation;
+  }
+
+  /**
+   * Used only by tests.
+   */
+  @Private
+  public Resource getMaximumAllocation() {
+    return maximumAllocation;
+  }
+
+  /**
+   * Used only by tests.
+   */
+  @Private
+  public float getMinimumAllocationFactor() {
+    return minimumAllocationFactor;
+  }
+
   @Override
   public synchronized float getUsedCapacity() {
     return usedCapacity;
@@ -536,25 +566,24 @@ private synchronized SchedulerApp getApplication(
       setUserResourceLimit(application, userLimit);
 
       for (Priority priority : application.getPriorities()) {
+        // Required resource
+        Resource required = 
+            application.getResourceRequest(priority, RMNode.ANY).getCapability();
 
         // Do we need containers at this 'priority'?
-        if (!needContainers(application, priority)) {
+        if (!needContainers(application, priority, required)) {
           continue;
         }
 
         // Are we going over limits by allocating to this application?
-        ResourceRequest required = 
-          application.getResourceRequest(priority, RMNode.ANY);
-
         // Maximum Capacity of the queue
-        if (!assignToQueue(clusterResource, required.getCapability())) {
+        if (!assignToQueue(clusterResource, required)) {
           return Resources.none();
         }
 
         // User limits
         userLimit = 
-          computeUserLimit(application, clusterResource, 
-              required.getCapability());
+            computeUserLimit(application, clusterResource, required);
         if (!assignToUser(application.getUser(), userLimit)) {
           break;
         }
@@ -732,10 +761,32 @@ private static int divideAndCeil(int a, int b) {
     return (a + (b - 1)) / b;
   }
 
-  boolean needContainers(SchedulerApp application, Priority priority) {
+  boolean needContainers(SchedulerApp application, Priority priority, Resource required) {
     int requiredContainers = application.getTotalRequiredResources(priority);
     int reservedContainers = application.getNumReservedContainers(priority);
-    return ((requiredContainers - reservedContainers) > 0);
+    int starvation = 0;
+    if (reservedContainers > 0) {
+      float nodeFactor = 
+          ((float)required.getMemory() / getMaximumAllocation().getMemory());
+
+      // Use percentage of node required to bias against large containers...
+      // Protect against corner case where you need the whole node with
+      // Math.min(nodeFactor, minimumAllocationFactor)
+      starvation = 
+          (int)((application.getReReservations(priority) / (float)reservedContainers) * 
+                (1.0f - (Math.min(nodeFactor, getMinimumAllocationFactor())))
+               );
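+      // Worked example (values from testReservationExchange below): a 4GB
+      // request with maximumAllocation=4GB gives nodeFactor=1.0; with a
+      // minimumAllocationFactor of 0.25, after 2 re-reservations of 1 reserved
+      // container: starvation = (int)((2/1.0) * (1.0 - 0.25)) = (int)1.5 = 1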
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("needsContainers:" +
+            " app.#re-reserve=" + application.getReReservations(priority) + 
+            " reserved=" + reservedContainers + 
+            " nodeFactor=" + nodeFactor + 
+            " minAllocFactor=" + minimumAllocationFactor + 
+            " starvation=" + starvation);
+      }
+    }
+    return (((starvation + requiredContainers) - reservedContainers) > 0);
   }
 
   private Resource assignContainersOnNode(Resource clusterResource, 

diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
index 9c9473da7b..b39bdc970a 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;

diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index d60a4e8924..8e6152bbdc 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -447,7 +447,7 @@ public void testReservation() throws Exception {
     SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
 
     final int numNodes = 1;
-    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    Resource clusterResource = Resources.createResource(numNodes * (4*GB));
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
 
     // Setup resource-requests
@@ -504,6 +504,121 @@ public void testReservation() throws Exception {
     assertEquals(4*GB, node_0.getUsedResource().getMemory());
   }
 
+  @Test
+  public void testReservationExchange() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    a.setUserLimitFactor(10);
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null);
+    a.submitApplication(app_1, user_1, A);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
+
+    String host_1 = "host_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (4*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getMaximumResourceCapability()).thenReturn(
+        Resources.createResource(4*GB));
+    when(a.getMaximumAllocation()).thenReturn(Resources.createResource(4*GB));
+    when(a.getMinimumAllocationFactor()).thenReturn(0.25f); // 1G / 4G
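+    // Stubbed directly; the constructor formula (max-min)/max would yield
+    // (4096-1024)/4096 = 0.75 for these capacities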
= "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + SchedulerApp app_0 = + new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null); + a.submitApplication(app_0, user_0, A); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + SchedulerApp app_1 = + new SchedulerApp(appAttemptId_1, user_1, a, rmContext, null); + a.submitApplication(app_1, user_1, A); + + // Setup some nodes + String host_0 = "host_0"; + SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + + String host_1 = "host_1"; + SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (4*GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + when(csContext.getMaximumResourceCapability()).thenReturn( + Resources.createResource(4*GB)); + when(a.getMaximumAllocation()).thenReturn(Resources.createResource(4*GB)); + when(a.getMinimumAllocationFactor()).thenReturn(0.25f); // 1G / 4G + + // Setup resource-requests + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority, + recordFactory))); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority, + recordFactory))); + + // Start testing... + + // Only 1 container + a.assignContainers(clusterResource, node_0); + assertEquals(1*GB, a.getUsedResources().getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + + // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also + // you can get one container more than user-limit + a.assignContainers(clusterResource, node_0); + assertEquals(2*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + + // Now, reservation should kick in for app_1 + a.assignContainers(clusterResource, node_0); + assertEquals(6*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(4*GB, app_1.getCurrentReservation().getMemory()); + assertEquals(2*GB, node_0.getUsedResource().getMemory()); + + // Now free 1 container from app_0 i.e. 
+    assertEquals(6*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(2*GB, node_0.getUsedResource().getMemory());
+
+    // Now free 1 container from app_0 i.e. 1G, and re-reserve it
+    a.completedContainer(clusterResource, app_0, node_0, 
+        app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(5*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(1*GB, node_0.getUsedResource().getMemory());
+    assertEquals(1, app_1.getReReservations(priority));
+
+    // Re-reserve
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(5*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(1*GB, node_0.getUsedResource().getMemory());
+    assertEquals(2, app_1.getReReservations(priority));
+
+    // Try to schedule on node_1 now, should *move* the reservation
+    a.assignContainers(clusterResource, node_1);
+    assertEquals(9*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(4*GB, node_1.getUsedResource().getMemory());
+    // Doesn't change yet... only when reservation is cancelled or a different
+    // container is reserved
+    assertEquals(2, app_1.getReReservations(priority));
+
+    // Now finish another container from app_0 and see the reservation cancelled
+    a.completedContainer(clusterResource, app_0, node_0, 
+        app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(0*GB, node_0.getUsedResource().getMemory());
+  }
+
+
   @Test
   public void testLocalityScheduling() throws Exception {