From 503e73e849cbdd1194cc0d16b4969c60929aca11 Mon Sep 17 00:00:00 2001 From: Naganarasimha Date: Fri, 11 Nov 2016 20:48:31 +0530 Subject: [PATCH] YARN-5545. Fix issues related to Max App in capacity scheduler. Contributed by Bibin A Chundatt --- .../scheduler/capacity/CapacityScheduler.java | 14 ++ .../CapacitySchedulerConfiguration.java | 15 ++ .../scheduler/capacity/LeafQueue.java | 11 +- .../capacity/TestApplicationLimits.java | 171 +++++++++++++++++- 4 files changed, 203 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index af51f3c862..65a08c69ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -868,6 +868,13 @@ private void addApplication(ApplicationId applicationId, String queueName, String user, Priority priority) { try { writeLock.lock(); + if (isSystemAppsLimitReached()) { + String message = "Maximum system application limit reached," + + "cannot accept submission of application: " + applicationId; + this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent( + applicationId, RMAppEventType.APP_REJECTED, message)); + return; + } // Sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { @@ -2023,6 +2030,13 @@ public List getAppsInQueue(String queueName) { return apps; } + public boolean isSystemAppsLimitReached() { + if (root.getNumApplications() < conf.getMaximumSystemApplications()) { + return false; + } + return true; + } + private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration( Configuration configuration) throws IOException { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index c153c263aa..f8335a8d52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1133,4 +1133,19 @@ public boolean getLazyPreemptionEnabled() { INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit"; public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT = 0.2f; + + /** + * Maximum application for a queue to be used when application per queue is + * not defined.To be consistent with previous version the default value is set + * as UNDEFINED. + */ + @Private + public static final String QUEUE_GLOBAL_MAX_APPLICATION = + PREFIX + "global-queue-max-application"; + + public int getGlobalMaximumApplicationsPerQueue() { + int maxApplicationsPerQueue = + getInt(QUEUE_GLOBAL_MAX_APPLICATION, (int) UNDEFINED); + return maxApplicationsPerQueue; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 161957f922..9661206f1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -188,9 +188,14 @@ protected void setupQueueConfigs(Resource clusterResource) maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { - int maxSystemApps = conf.getMaximumSystemApplications(); - maxApplications = - (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); + int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue(); + if (maxGlobalPerQueueApps > 0) { + maxApplications = maxGlobalPerQueueApps; + } else { + int maxSystemApps = conf.getMaximumSystemApplications(); + maxApplications = + (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); + } } maxApplicationsPerUser = Math.min(maxApplications, (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 963f50b169..11e94b90d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -31,14 +31,17 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; @@ -47,11 +50,17 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -61,12 +70,15 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + public class TestApplicationLimits { private static final Log LOG = LogFactory.getLog(TestApplicationLimits.class); @@ -692,10 +704,159 @@ public void testHeadroom() throws Exception { assertEquals(expectedHeadroom, app_0_1.getHeadroom()); assertEquals(expectedHeadroom, app_1_0.getHeadroom()); } - - @After - public void tearDown() { - + private Configuration getConfigurationWithQueueLabels(Configuration config) { + CapacitySchedulerConfiguration conf = + new CapacitySchedulerConfiguration(config); + // Define top-level + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b", "c", "d"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100); + + conf.setInt(CapacitySchedulerConfiguration.QUEUE_GLOBAL_MAX_APPLICATION, + 20); + conf.setInt("yarn.scheduler.capacity.root.a.a1.maximum-applications", 1); + conf.setFloat("yarn.scheduler.capacity.root.d.user-limit-factor", 0.1f); + conf.setInt("yarn.scheduler.capacity.maximum-applications", 4); + final String a = CapacitySchedulerConfiguration.ROOT + ".a"; + final String b = CapacitySchedulerConfiguration.ROOT + ".b"; + final String c = CapacitySchedulerConfiguration.ROOT + ".c"; + final String d = CapacitySchedulerConfiguration.ROOT + ".d"; + final String aa1 = a + ".a1"; + final String aa2 = a + ".a2"; + final String aa3 = a + ".a3"; + + conf.setQueues(a, new String[]{"a1", "a2", "a3"}); + conf.setCapacity(a, 50); + conf.setCapacity(b, 50); + conf.setCapacity(c, 0); + conf.setCapacity(d, 0); + conf.setCapacity(aa1, 50); + conf.setCapacity(aa2, 50); + conf.setCapacity(aa3, 0); + + conf.setCapacityByLabel(a, "y", 25); + conf.setCapacityByLabel(b, "y", 50); + conf.setCapacityByLabel(c, "y", 25); + conf.setCapacityByLabel(d, "y", 0); + + conf.setCapacityByLabel(a, "x", 50); + conf.setCapacityByLabel(b, "x", 50); + + conf.setCapacityByLabel(a, "z", 50); + conf.setCapacityByLabel(b, "z", 50); + + conf.setCapacityByLabel(aa1, "x", 100); + conf.setCapacityByLabel(aa2, "x", 0); + + conf.setCapacityByLabel(aa1, "y", 25); + conf.setCapacityByLabel(aa2, "y", 75); + + conf.setCapacityByLabel(aa2, "z", 75); + conf.setCapacityByLabel(aa3, "z", 25); + return conf; + } + + private Set toSet(String... elements) { + Set set = Sets.newHashSet(elements); + return set; + } + + @Test(timeout = 120000) + public void testApplicationLimitSubmit() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + // set node -> label + mgr.addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of("x", "y", "z")); + + // set mapping: + // h1 -> x + // h2 -> y + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 4096); + MockNM nm2 = rm.registerNode("h2:1234", 4096); + MockNM nm3 = rm.registerNode("h3:1234", 4096); + + // Submit application to queue c where the default partition capacity is + // zero + RMApp app1 = rm.submitApp(GB, "app", "user", null, "c", false); + rm.drainEvents(); + rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + assertEquals(RMAppState.ACCEPTED, app1.getState()); + rm.killApp(app1.getApplicationId()); + + RMApp app2 = rm.submitApp(GB, "app", "user", null, "a1", false); + rm.drainEvents(); + rm.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED); + assertEquals(RMAppState.ACCEPTED, app2.getState()); + + // Check second application is rejected and based on queue level max + // application app is rejected + RMApp app3 = rm.submitApp(GB, "app", "user", null, "a1", false); + rm.drainEvents(); + rm.waitForState(app3.getApplicationId(), RMAppState.FAILED); + assertEquals(RMAppState.FAILED, app3.getState()); + assertEquals( + "org.apache.hadoop.security.AccessControlException: " + + "Queue root.a.a1 already has 1 applications, cannot accept " + + "submission of application: " + app3.getApplicationId(), + app3.getDiagnostics().toString()); + + // based on Global limit of queue usert application is rejected + RMApp app11 = rm.submitApp(GB, "app", "user", null, "d", false); + rm.drainEvents(); + rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED); + assertEquals(RMAppState.ACCEPTED, app11.getState()); + RMApp app12 = rm.submitApp(GB, "app", "user", null, "d", false); + rm.drainEvents(); + rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED); + assertEquals(RMAppState.ACCEPTED, app12.getState()); + RMApp app13 = rm.submitApp(GB, "app", "user", null, "d", false); + rm.drainEvents(); + rm.waitForState(app13.getApplicationId(), RMAppState.FAILED); + assertEquals(RMAppState.FAILED, app13.getState()); + assertEquals( + "org.apache.hadoop.security.AccessControlException: Queue" + + " root.d already has 2 applications from user user cannot" + + " accept submission of application: " + app13.getApplicationId(), + app13.getDiagnostics().toString()); + + // based on system max limit application is rejected + RMApp app14 = rm.submitApp(GB, "app", "user2", null, "a2", false); + rm.drainEvents(); + rm.waitForState(app14.getApplicationId(), RMAppState.ACCEPTED); + RMApp app15 = rm.submitApp(GB, "app", "user2", null, "a2", false); + rm.drainEvents(); + rm.waitForState(app15.getApplicationId(), RMAppState.FAILED); + assertEquals(RMAppState.FAILED, app15.getState()); + assertEquals( + "Maximum system application limit reached,cannot" + + " accept submission of application: " + app15.getApplicationId(), + app15.getDiagnostics().toString()); + + rm.killApp(app2.getApplicationId()); + rm.killApp(app11.getApplicationId()); + rm.killApp(app13.getApplicationId()); + rm.killApp(app14.getApplicationId()); + rm.stop(); } }