From 6b608aad7d52b524fa94955a538e8b3524d42d93 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Mon, 5 Sep 2011 19:49:47 +0000 Subject: [PATCH] MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running applications per-queue & per-user. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1165403 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 6 + .../records/impl/pb/ContainerIdPBImpl.java | 2 - .../scheduler/capacity/CapacityScheduler.java | 7 + .../CapacitySchedulerConfiguration.java | 13 + .../capacity/CapacitySchedulerContext.java | 2 + .../scheduler/capacity/LeafQueue.java | 195 +++++++++++++-- .../scheduler/capacity/ParentQueue.java | 9 +- .../scheduler/capacity/Queue.java | 2 +- .../src/main/resources/capacity-scheduler.xml | 5 + .../capacity/TestApplicationLimits.java | 234 ++++++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 8 +- .../scheduler/capacity/TestParentQueue.java | 2 + .../scheduler/capacity/TestUtils.java | 7 + 13 files changed, 464 insertions(+), 28 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 392e8e5a46..818ef441d0 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1,6 +1,7 @@ Hadoop MapReduce Change Log Trunk (unreleased changes) + IMPROVEMENTS MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) @@ -236,6 +237,11 @@ Release 0.23.0 - Unreleased MAPREDUCE-2735. Add an applications summary log to ResourceManager. (Thomas Graves via acmurthy) + MAPREDUCE-2697. Enhance CapacityScheduler to cap concurrently running + applications per-queue & per-user. (acmurthy) + Configuration changes: + add yarn.capacity-scheduler.maximum-am-resource-percent + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java index 00b2630e46..a4e2d49d96 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java @@ -29,8 +29,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProtoOrBuilder; -import org.mortbay.log.Log; - public class ContainerIdPBImpl extends ProtoBase implements ContainerId { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 77172e80bb..37e3bb5780 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -166,6 +166,11 @@ public RMContext getRMContext() { return this.rmContext; } + @Override + public Resource getClusterResources() { + return clusterResource; + } + @Override public synchronized void reinitialize(Configuration conf, ContainerTokenSecretManager containerTokenSecretManager, RMContext rmContext) @@ -621,6 +626,7 @@ public void handle(SchedulerEvent event) { private synchronized void addNode(RMNode nodeManager) { this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); + root.updateClusterResource(clusterResource); ++numNodeManagers; LOG.info("Added node " + nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); @@ -629,6 +635,7 @@ private synchronized void addNode(RMNode nodeManager) { private synchronized void removeNode(RMNode nodeInfo) { SchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability()); + root.updateClusterResource(clusterResource); --numNodeManagers; // Remove running containers diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 714a472678..345381651c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -49,6 +49,10 @@ public class CapacitySchedulerConfiguration extends Configuration { public static final String MAXIMUM_SYSTEM_APPLICATIONS = PREFIX + "maximum-applications"; + @Private + public static final String MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT = + PREFIX + "maximum-am-resource-percent"; + @Private public static final String QUEUES = "queues"; @@ -82,6 +86,10 @@ public class CapacitySchedulerConfiguration extends Configuration { @Private public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000; + @Private + public static final float + DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f; + @Private public static final int UNDEFINED = -1; @@ -124,6 +132,11 @@ public int getMaximumSystemApplications() { return maxApplications; } + public float getMaximumApplicationMasterResourcePercent() { + return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, + DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT); + } + public int getCapacity(String queue) { int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED); if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index c6b2b390e7..5f06bf6644 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -37,4 +37,6 @@ public interface CapacitySchedulerContext { int getNumClusterNodes(); RMContext getRMContext(); + + Resource getClusterResources(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 7ed33d5853..3d3ac1265e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -77,15 +78,22 @@ public class LeafQueue implements Queue { private int maxApplications; private int maxApplicationsPerUser; + + private float maxAMResourcePercent; + private int maxActiveApplications; + private int maxActiveApplicationsPerUser; + private Resource usedResources = Resources.createResource(0); private float utilization = 0.0f; private float usedCapacity = 0.0f; private volatile int numContainers; - Set applications; + Set activeApplications; Map applicationsMap = new HashMap(); + Set pendingApplications; + private final Resource minimumAllocation; private final Resource maximumAllocation; private final float minimumAllocationFactor; @@ -108,6 +116,8 @@ public class LeafQueue implements Queue { private CapacitySchedulerContext scheduler; + final static int DEFAULT_AM_RESOURCE = 2 * 1024; + public LeafQueue(CapacitySchedulerContext cs, String queueName, Queue parent, Comparator applicationComparator, Queue old) { @@ -144,6 +154,15 @@ public LeafQueue(CapacitySchedulerContext cs, int maxApplicationsPerUser = (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor); + this.maxAMResourcePercent = + cs.getConfiguration().getMaximumApplicationMasterResourcePercent(); + int maxActiveApplications = + computeMaxActiveApplications(cs.getClusterResources(), + maxAMResourcePercent, absoluteCapacity); + int maxActiveApplicationsPerUser = + computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, + userLimitFactor); + this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); this.queueInfo.setQueueName(queueName); this.queueInfo.setChildQueues(new ArrayList()); @@ -157,20 +176,38 @@ public LeafQueue(CapacitySchedulerContext cs, maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor, maxApplications, maxApplicationsPerUser, + maxActiveApplications, maxActiveApplicationsPerUser, state, acls); LOG.info("DEBUG --- LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); - this.applications = new TreeSet(applicationComparator); + this.pendingApplications = + new TreeSet(applicationComparator); + this.activeApplications = new TreeSet(applicationComparator); } + private int computeMaxActiveApplications(Resource clusterResource, + float maxAMResourcePercent, float absoluteCapacity) { + return + Math.max( + (int)((clusterResource.getMemory() / DEFAULT_AM_RESOURCE) * + maxAMResourcePercent * absoluteCapacity), + 1); + } + + private int computeMaxActiveApplicationsPerUser(int maxActiveApplications, + int userLimit, float userLimitFactor) { + return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor); + } + private synchronized void setupQueueConfigs( float capacity, float absoluteCapacity, float maxCapacity, float absoluteMaxCapacity, int userLimit, float userLimitFactor, int maxApplications, int maxApplicationsPerUser, + int maxActiveApplications, int maxActiveApplicationsPerUser, QueueState state, Map acls) { this.capacity = capacity; @@ -185,6 +222,9 @@ private synchronized void setupQueueConfigs( this.maxApplications = maxApplications; this.maxApplicationsPerUser = maxApplicationsPerUser; + this.maxActiveApplications = maxActiveApplications; + this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser; + this.state = state; this.acls = acls; @@ -269,6 +309,22 @@ public float getMinimumAllocationFactor() { return minimumAllocationFactor; } + public int getMaxApplications() { + return maxApplications; + } + + public int getMaxApplicationsPerUser() { + return maxApplicationsPerUser; + } + + public int getMaximumActiveApplications() { + return maxActiveApplications; + } + + public int getMaximumActiveApplicationsPerUser() { + return maxActiveApplicationsPerUser; + } + @Override public synchronized float getUsedCapacity() { return usedCapacity; @@ -329,10 +385,34 @@ synchronized void setParentQueue(Queue parent) { this.parent = parent; } + @Override public synchronized int getNumApplications() { - return applications.size(); + return getNumPendingApplications() + getNumActiveApplications(); } + public synchronized int getNumPendingApplications() { + return pendingApplications.size(); + } + + public synchronized int getNumActiveApplications() { + return activeApplications.size(); + } + + @Private + public synchronized int getNumApplications(String user) { + return getUser(user).getTotalApplications(); + } + + @Private + public synchronized int getNumPendingApplications(String user) { + return getUser(user).getPendingApplications(); + } + + @Private + public synchronized int getNumActiveApplications(String user) { + return getUser(user).getActiveApplications(); + } + public synchronized int getNumContainers() { return numContainers; } @@ -342,6 +422,16 @@ public synchronized QueueState getState() { return state; } + @Private + public int getUserLimit() { + return userLimit; + } + + @Private + public float getUserLimitFactor() { + return userLimitFactor; + } + @Override public synchronized Map getQueueAcls() { return new HashMap(acls); @@ -404,6 +494,8 @@ public synchronized void reinitialize(Queue queue, Resource clusterResource) leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, leafQueue.userLimit, leafQueue.userLimitFactor, leafQueue.maxApplications, leafQueue.maxApplicationsPerUser, + leafQueue.maxActiveApplications, + leafQueue.maxActiveApplicationsPerUser, leafQueue.state, leafQueue.acls); updateResource(clusterResource); @@ -443,7 +535,7 @@ public void submitApplication(SchedulerApp application, String userName, synchronized (this) { // Check if the queue is accepting jobs - if (state != QueueState.RUNNING) { + if (getState() != QueueState.RUNNING) { String msg = "Queue " + getQueuePath() + " is STOPPED. Cannot accept submission of application: " + application.getApplicationId(); @@ -452,7 +544,7 @@ public void submitApplication(SchedulerApp application, String userName, } // Check submission limits for queues - if (getNumApplications() >= maxApplications) { + if (getNumApplications() >= getMaxApplications()) { String msg = "Queue " + getQueuePath() + " already has " + getNumApplications() + " applications," + " cannot accept submission of application: " + @@ -463,9 +555,9 @@ public void submitApplication(SchedulerApp application, String userName, // Check submission limits for the user on this queue user = getUser(userName); - if (user.getApplications() >= maxApplicationsPerUser) { + if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { String msg = "Queue " + getQueuePath() + - " already has " + user.getApplications() + + " already has " + user.getTotalApplications() + " applications from user " + userName + " cannot accept submission of application: " + application.getApplicationId(); @@ -490,17 +582,46 @@ public void submitApplication(SchedulerApp application, String userName, } } + private synchronized void activateApplications() { + for (Iterator i=pendingApplications.iterator(); + i.hasNext(); ) { + SchedulerApp application = i.next(); + + // Check queue limit + if (getNumActiveApplications() >= getMaximumActiveApplications()) { + break; + } + + // Check user limit + User user = getUser(application.getUser()); + if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) { + user.activateApplication(); + activeApplications.add(application); + i.remove(); + LOG.info("Application " + application.getApplicationId().getId() + + " from user: " + application.getUser() + + " activated in queue: " + getQueueName()); + } + } + } + private synchronized void addApplication(SchedulerApp application, User user) { // Accept user.submitApplication(); - applications.add(application); + pendingApplications.add(application); applicationsMap.put(application.getApplicationAttemptId(), application); + // Activate applications + activateApplications(); + LOG.info("Application added -" + " appId: " + application.getApplicationId() + " user: " + user + "," + " leaf-queue: " + getQueueName() + - " #user-applications: " + user.getApplications() + - " #queue-applications: " + getNumApplications()); + " #user-pending-applications: " + user.getPendingApplications() + + " #user-active-applications: " + user.getActiveApplications() + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications() + ); } @Override @@ -515,20 +636,26 @@ public void finishApplication(SchedulerApp application, String queue) { } public synchronized void removeApplication(SchedulerApp application, User user) { - applications.remove(application); + activeApplications.remove(application); applicationsMap.remove(application.getApplicationAttemptId()); user.finishApplication(); - if (user.getApplications() == 0) { + if (user.getTotalApplications() == 0) { users.remove(application.getUser()); } + // Check if we can activate more applications + activateApplications(); + LOG.info("Application removed -" + " appId: " + application.getApplicationId() + " user: " + application.getUser() + " queue: " + getQueueName() + - " #user-applications: " + user.getApplications() + - " #queue-applications: " + getNumApplications()); + " #user-pending-applications: " + user.getPendingApplications() + + " #user-active-applications: " + user.getActiveApplications() + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications() + ); } private synchronized SchedulerApp getApplication( @@ -542,7 +669,7 @@ private synchronized SchedulerApp getApplication( LOG.info("DEBUG --- assignContainers:" + " node=" + node.getHostName() + - " #applications=" + applications.size()); + " #applications=" + activeApplications.size()); // Check for reserved resources RMContainer reservedContainer = node.getReservedContainer(); @@ -554,7 +681,7 @@ private synchronized SchedulerApp getApplication( } // Try to assign containers to applications in order - for (SchedulerApp application : applications) { + for (SchedulerApp application : activeApplications) { LOG.info("DEBUG --- pre-assignContainers for application " + application.getApplicationId()); @@ -1119,7 +1246,16 @@ synchronized void releaseResource(Resource clusterResource, } @Override - public synchronized void updateResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource) { + maxActiveApplications = + computeMaxActiveApplications(clusterResource, maxAMResourcePercent, + absoluteCapacity); + maxActiveApplicationsPerUser = + computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit, + userLimitFactor); + } + + private synchronized void updateResource(Resource clusterResource) { float queueLimit = clusterResource.getMemory() * absoluteCapacity; setUtilization(usedResources.getMemory() / queueLimit); setUsedCapacity( @@ -1138,22 +1274,36 @@ public QueueMetrics getMetrics() { static class User { Resource consumed = Resources.createResource(0); - int applications = 0; + int pendingApplications = 0; + int activeApplications = 0; public Resource getConsumedResources() { return consumed; } - public int getApplications() { - return applications; + public int getPendingApplications() { + return pendingApplications; } + public int getActiveApplications() { + return activeApplications; + } + + public int getTotalApplications() { + return getPendingApplications() + getActiveApplications(); + } + public synchronized void submitApplication() { - ++applications; + ++pendingApplications; + } + + public synchronized void activateApplication() { + --pendingApplications; + ++activeApplications; } public synchronized void finishApplication() { - --applications; + --activeApplications; } public synchronized void assignContainer(Resource resource) { @@ -1175,4 +1325,5 @@ public void recoverContainer(Resource clusterResource, parent.recoverContainer(clusterResource, application, container); } + } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 7aa37fc9d1..4be8522c5e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -646,7 +646,14 @@ synchronized void releaseResource(Resource clusterResource, } @Override - public synchronized void updateResource(Resource clusterResource) { + public synchronized void updateClusterResource(Resource clusterResource) { + // Update all children + for (Queue childQueue : childQueues) { + childQueue.updateClusterResource(clusterResource); + } + } + + private synchronized void updateResource(Resource clusterResource) { float queueLimit = clusterResource.getMemory() * absoluteCapacity; setUtilization(usedResources.getMemory() / queueLimit); setUsedCapacity( diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java index b39bdc970a..446ff8f822 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java @@ -190,7 +190,7 @@ public void reinitialize(Queue queue, Resource clusterResource) * Update the cluster resource for queues as we add/remove nodes * @param clusterResource the current cluster resource */ - public void updateResource(Resource clusterResource); + public void updateClusterResource(Resource clusterResource); /** * Recover the state of the queue diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml index f6e2b0ce74..43a0437b9d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml @@ -5,6 +5,11 @@ 10000 + + yarn.capacity-scheduler.maximum-am-resource-percent + 0.1 + + yarn.capacity-scheduler.root.queues default diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java new file mode 100644 index 0000000000..fe9b15b64f --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -0,0 +1,234 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestApplicationLimits { + + private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class); + final static int GB = 1024; + + LeafQueue queue; + + @Before + public void setUp() { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB)); + when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB)); + when(csContext.getClusterResources()).thenReturn(Resources.createResource(10 * 16 * GB)); + + Map queues = new HashMap(); + Queue root = + CapacityScheduler.parseQueue(csContext, csConf, null, "root", + queues, queues, + CapacityScheduler.queueComparator, + CapacityScheduler.applicationComparator, + TestUtils.spyHook); + + + queue = spy( + new LeafQueue(csContext, A, root, + CapacityScheduler.applicationComparator, null) + ); + + // Stub out ACL checks + doReturn(true). + when(queue).hasAccess(any(QueueACL.class), + any(UserGroupInformation.class)); + + // Some default values + doReturn(100).when(queue).getMaxApplications(); + doReturn(25).when(queue).getMaxApplicationsPerUser(); + doReturn(10).when(queue).getMaximumActiveApplications(); + doReturn(2).when(queue).getMaximumActiveApplicationsPerUser(); + } + + private static final String A = "a"; + private static final String B = "b"; + private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B}); + conf.setCapacity(CapacityScheduler.ROOT, 100); + + final String Q_A = CapacityScheduler.ROOT + "." + A; + conf.setCapacity(Q_A, 10); + + final String Q_B = CapacityScheduler.ROOT + "." + B; + conf.setCapacity(Q_B, 90); + + LOG.info("Setup top-level queues a and b"); + } + + private SchedulerApp getMockApplication(int appId, String user) { + SchedulerApp application = mock(SchedulerApp.class); + ApplicationAttemptId applicationAttemptId = + TestUtils.getMockApplicationAttemptId(appId, 0); + doReturn(applicationAttemptId.getApplicationId()). + when(application).getApplicationId(); + doReturn(applicationAttemptId). when(application).getApplicationAttemptId(); + doReturn(user).when(application).getUser(); + return application; + } + + @Test + public void testLimitsComputation() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB)); + when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB)); + + // Say cluster has 100 nodes of 16G each + Resource clusterResource = Resources.createResource(100 * 16 * GB); + when(csContext.getClusterResources()).thenReturn(clusterResource); + + Map queues = new HashMap(); + Queue root = + CapacityScheduler.parseQueue(csContext, csConf, null, "root", + queues, queues, + CapacityScheduler.queueComparator, + CapacityScheduler.applicationComparator, + TestUtils.spyHook); + + LeafQueue queue = (LeafQueue)queues.get(A); + + LOG.info("Queue 'A' -" + + " maxActiveApplications=" + queue.getMaximumActiveApplications() + + " maxActiveApplicationsPerUser=" + + queue.getMaximumActiveApplicationsPerUser()); + int expectedMaxActiveApps = + Math.max(1, + (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * + csConf.getMaximumApplicationMasterResourcePercent() * + queue.getAbsoluteCapacity())); + assertEquals(expectedMaxActiveApps, + queue.getMaximumActiveApplications()); + assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * + queue.getUserLimitFactor()), + queue.getMaximumActiveApplicationsPerUser()); + + // Add some nodes to the cluster & test new limits + clusterResource = Resources.createResource(120 * 16 * GB); + root.updateClusterResource(clusterResource); + expectedMaxActiveApps = + Math.max(1, + (int)((clusterResource.getMemory() / LeafQueue.DEFAULT_AM_RESOURCE) * + csConf.getMaximumApplicationMasterResourcePercent() * + queue.getAbsoluteCapacity())); + assertEquals(expectedMaxActiveApps, + queue.getMaximumActiveApplications()); + assertEquals((int)(expectedMaxActiveApps * (queue.getUserLimit() / 100.0f) * + queue.getUserLimitFactor()), + queue.getMaximumActiveApplicationsPerUser()); + + } + + @Test + public void testActiveApplicationLimits() throws Exception { + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + int APPLICATION_ID = 0; + // Submit first application + SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_0, user_0, A); + assertEquals(1, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(1, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit second application + SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_1, user_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit third application, should remain pending + SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_2, user_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + + // Finish one application, app_2 should be activated + queue.finishApplication(app_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(0, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(0, queue.getNumPendingApplications(user_0)); + + // Submit another one for user_0 + SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + queue.submitApplication(app_3, user_0, A); + assertEquals(2, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + + // Change queue limit to be smaller so 2 users can fill it up + doReturn(3).when(queue).getMaximumActiveApplications(); + + // Submit first app for user_1 + SchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); + queue.submitApplication(app_4, user_1, A); + assertEquals(3, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(0, queue.getNumPendingApplications(user_1)); + + // Submit second app for user_1, should block due to queue-limit + SchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); + queue.submitApplication(app_5, user_1, A); + assertEquals(3, queue.getNumActiveApplications()); + assertEquals(2, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(1, queue.getNumPendingApplications(user_1)); + + // Now finish one app of user_1 so app_5 should be activated + queue.finishApplication(app_4, A); + assertEquals(3, queue.getNumActiveApplications()); + assertEquals(1, queue.getNumPendingApplications()); + assertEquals(2, queue.getNumActiveApplications(user_0)); + assertEquals(1, queue.getNumPendingApplications(user_0)); + assertEquals(1, queue.getNumActiveApplications(user_1)); + assertEquals(0, queue.getNumPendingApplications(user_1)); + } + + @After + public void tearDown() { + + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 8e6152bbdc..a3ac403306 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -83,8 +83,12 @@ public void setUp() throws Exception { csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); - when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB)); - when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB)); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(16*GB)); + when(csContext.getClusterResources()). + thenReturn(Resources.createResource(100 * 16 * GB)); root = CapacityScheduler.parseQueue(csContext, csConf, null, "root", queues, queues, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 6894d857d1..ea635270e0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -60,6 +60,8 @@ public void setUp() throws Exception { Resources.createResource(GB)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(16*GB)); + when(csContext.getClusterResources()). + thenReturn(Resources.createResource(100 * 16 * GB)); } private static final String A = "a"; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index ae7853be67..0d59711578 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -116,6 +116,13 @@ public static ResourceRequest createResourceRequest( return request; } + public static ApplicationId getMockApplicationId(int appId) { + ApplicationId applicationId = mock(ApplicationId.class); + when(applicationId.getClusterTimestamp()).thenReturn(0L); + when(applicationId.getId()).thenReturn(appId); + return applicationId; + } + public static ApplicationAttemptId getMockApplicationAttemptId(int appId, int attemptId) { ApplicationId applicationId = mock(ApplicationId.class);