diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e51ef4bf47..27550889e0 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -564,6 +564,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3683. Fixed maxCapacity of queues to be product of parent maxCapacities. (acmurthy) + MAPREDUCE-3713. Fixed the way head-room is allocated to applications by + CapacityScheduler so that it deducts current-usage per user and not + per-application. (Arun C Murthy via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java index 94ddb2af8a..8e25e3d222 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java @@ -295,10 +295,6 @@ synchronized public void showRequests() { } } - public synchronized void setAvailableResourceLimit(Resource globalLimit) { - this.resourceLimit = globalLimit; - } - public synchronized RMContainer getRMContainer(ContainerId id) { return liveContainers.get(id); } @@ -446,20 +442,21 @@ public synchronized List getReservedContainers() { return reservedContainers; } + public synchronized void setHeadroom(Resource globalLimit) { + this.resourceLimit = globalLimit; + } + /** * Get available headroom in terms of resources for the application's user. * @return available resource headroom */ public synchronized Resource getHeadroom() { - Resource limit = Resources.subtract(resourceLimit, currentConsumption); - Resources.subtractFrom(limit, currentReservation); - // Corner case to deal with applications being slightly over-limit - if (limit.getMemory() < 0) { - limit.setMemory(0); + if (resourceLimit.getMemory() < 0) { + resourceLimit.setMemory(0); } - return limit; + return resourceLimit; } public Queue getQueue() { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 402e03a35a..1faef7a1e8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -720,12 +720,11 @@ private synchronized SchedulerApp getApplication( if(LOG.isDebugEnabled()) { LOG.debug("pre-assignContainers for application " + application.getApplicationId()); + application.showRequests(); } - application.showRequests(); synchronized (application) { - computeAndSetUserResourceLimit(application, clusterResource); - + // Schedule in priority order for (Priority priority : application.getPriorities()) { // Required resource Resource required = @@ -736,15 +735,21 @@ private synchronized SchedulerApp getApplication( continue; } - // Are we going over limits by allocating to this application? - // Maximum Capacity of the queue + // Compute & set headroom + // Note: We set the headroom with the highest priority request + // as the target. + // This works since we never assign lower priority requests + // before all higher priority ones are serviced. + Resource userLimit = + computeAndSetUserResourceLimit(application, clusterResource, + required); + + // Check queue max-capacity limit if (!assignToQueue(clusterResource, required)) { return NULL_ASSIGNMENT; } - // User limits - Resource userLimit = - computeUserLimit(application, clusterResource, required); + // Check user limit if (!assignToUser(application.getUser(), userLimit)) { break; } @@ -758,7 +763,7 @@ private synchronized SchedulerApp getApplication( null); Resource assigned = assignment.getResource(); - + // Did we schedule or reserve a container? if (Resources.greaterThan(assigned, Resources.none())) { @@ -832,13 +837,15 @@ private synchronized boolean assignToQueue(Resource clusterResource, return true; } - private void computeAndSetUserResourceLimit(SchedulerApp application, - Resource clusterResource) { - Resource userLimit = - computeUserLimit(application, clusterResource, Resources.none()); - application.setAvailableResourceLimit(userLimit); - metrics.setAvailableResourcesToUser(application.getUser(), - application.getHeadroom()); + private Resource computeAndSetUserResourceLimit(SchedulerApp application, + Resource clusterResource, Resource required) { + String user = application.getUser(); + Resource limit = computeUserLimit(application, clusterResource, required); + Resource headroom = + Resources.subtract(limit, getUser(user).getConsumedResources()); + application.setHeadroom(headroom); + metrics.setAvailableResourcesToUser(user, headroom); + return limit; } private int roundUp(int memory) { @@ -909,7 +916,7 @@ private synchronized boolean assignToUser(String userName, Resource limit) { User user = getUser(userName); // Note: We aren't considering the current request since there is a fixed - // overhead of the AM, but it's a >= check, so... + // overhead of the AM, but it's a > check, not a >= check, so... if ((user.getConsumedResources().getMemory()) > limit.getMemory()) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() + @@ -1227,8 +1234,8 @@ public void completedContainer(Resource clusterResource, // happen under scheduler's lock... // So, this is, in effect, a transaction across application & node if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(node, rmContainer.getReservedPriority()); - node.unreserveResource(application); + unreserve(application, rmContainer.getReservedPriority(), + node, rmContainer); } else { application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); @@ -1301,7 +1308,8 @@ public synchronized void updateClusterResource(Resource clusterResource) { // Update application properties for (SchedulerApp application : activeApplications) { - computeAndSetUserResourceLimit(application, clusterResource); + computeAndSetUserResourceLimit( + application, clusterResource, Resources.none()); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 48e01a72fd..145cb8d20d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -358,7 +358,7 @@ private void assignContainers(SchedulerNode node) { } } - application.setAvailableResourceLimit(clusterResource); + application.setHeadroom(clusterResource); LOG.debug("post-assignContainers"); application.showRequests(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 824c8086a4..f1a1d956eb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -21,16 +21,24 @@ import static org.mockito.Mockito.*; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.junit.After; @@ -283,38 +291,76 @@ public void testHeadroom() throws Exception { final String user_0 = "user_0"; final String user_1 = "user_1"; - int APPLICATION_ID = 0; + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + RMContext rmContext = TestUtils.getMockRMContext(); - // Submit first application from user_0, check headroom - SchedulerApp app_0_0 = getMockApplication(APPLICATION_ID++, user_0); + Priority priority_1 = TestUtils.createMockPriority(1); + + // Submit first application with some resource-requests from user_0, + // and check headroom + final ApplicationAttemptId appAttemptId_0_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + SchedulerApp app_0_0 = + spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, rmContext, null)); queue.submitApplication(app_0_0, user_0, A); - queue.assignContainers(clusterResource, node_0); // Schedule to compute + + List app_0_0_requests = new ArrayList(); + app_0_0_requests.add( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, + priority_1, recordFactory)); + app_0_0.updateResourceRequests(app_0_0_requests); + + // Schedule to compute + queue.assignContainers(clusterResource, node_0); Resource expectedHeadroom = Resources.createResource(10*16*GB); - verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom)); + verify(app_0_0).setHeadroom(eq(expectedHeadroom)); // Submit second application from user_0, check headroom - SchedulerApp app_0_1 = getMockApplication(APPLICATION_ID++, user_0); + final ApplicationAttemptId appAttemptId_0_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + SchedulerApp app_0_1 = + spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, rmContext, null)); queue.submitApplication(app_0_1, user_0, A); + + List app_0_1_requests = new ArrayList(); + app_0_1_requests.add( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, + priority_1, recordFactory)); + app_0_1.updateResourceRequests(app_0_1_requests); + + // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute - verify(app_0_0, times(2)).setAvailableResourceLimit(eq(expectedHeadroom)); - verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));// no change + verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); + verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change // Submit first application from user_1, check for new headroom - SchedulerApp app_1_0 = getMockApplication(APPLICATION_ID++, user_1); + final ApplicationAttemptId appAttemptId_1_0 = + TestUtils.getMockApplicationAttemptId(2, 0); + SchedulerApp app_1_0 = + spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, rmContext, null)); queue.submitApplication(app_1_0, user_1, A); + + List app_1_0_requests = new ArrayList(); + app_1_0_requests.add( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, + priority_1, recordFactory)); + app_1_0.updateResourceRequests(app_1_0_requests); + + // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2); // changes - verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom)); - verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom)); - verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom)); - + verify(app_0_0).setHeadroom(eq(expectedHeadroom)); + verify(app_0_1).setHeadroom(eq(expectedHeadroom)); + verify(app_1_0).setHeadroom(eq(expectedHeadroom)); + // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); queue.assignContainers(clusterResource, node_0); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2); // changes - verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom)); - verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom)); - verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom)); + verify(app_0_0).setHeadroom(eq(expectedHeadroom)); + verify(app_0_1).setHeadroom(eq(expectedHeadroom)); + verify(app_1_0).setHeadroom(eq(expectedHeadroom)); }