From e1d3670f4c233696dc673c37b578ce46b44a6876 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Sun, 8 Dec 2013 03:09:34 +0000 Subject: [PATCH] YARN-807. When querying apps by queue, iterating over all apps is inefficient and limiting (Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1548983 13f79535-47bb-0310-9956-ffa450edef68 --- .../scheduler/ResourceSchedulerWrapper.java | 5 ++ hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/ClientRMService.java | 70 ++++++++++++++----- .../scheduler/YarnScheduler.java | 10 +++ .../scheduler/capacity/CSQueue.java | 9 +++ .../scheduler/capacity/CapacityScheduler.java | 11 +++ .../scheduler/capacity/LeafQueue.java | 10 +++ .../scheduler/capacity/ParentQueue.java | 10 +++ .../scheduler/fair/FSLeafQueue.java | 13 ++++ .../scheduler/fair/FSParentQueue.java | 10 +++ .../scheduler/fair/FSQueue.java | 10 ++- .../scheduler/fair/FairScheduler.java | 12 ++++ .../scheduler/fifo/FifoScheduler.java | 15 ++++ .../resourcemanager/TestClientRMService.java | 29 +++++++- .../capacity/TestCapacityScheduler.java | 30 ++++++++ .../scheduler/fair/TestFairScheduler.java | 37 ++++++++++ .../scheduler/fifo/TestFifoScheduler.java | 19 +++++ 17 files changed, 283 insertions(+), 20 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index edb98a8bef..b7fdb8bfa4 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -867,5 +867,10 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( ApplicationAttemptId appAttemptId) { return scheduler.getAppResourceUsageReport(appAttemptId); } + + @Override + public List getAppsInQueue(String queue) { + return scheduler.getAppsInQueue(queue); + } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f74a7cf894..a81b95ac92 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -145,6 +145,9 @@ Release 2.4.0 - UNRELEASED YARN-546. Allow disabling the Fair Scheduler event log (Sandy Ryza) + YARN-807. When querying apps by queue, iterating over all apps is + inefficient and limiting (Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index c3410a9b46..f0e8553490 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -24,7 +24,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -431,12 +434,51 @@ public GetApplicationsResponse getApplications( LongRange start = request.getStartRange(); LongRange finish = request.getFinishRange(); - List reports = new ArrayList(); - long count = 0; - for (RMApp application : this.rmContext.getRMApps().values()) { - if (++count > limit) { - break; + final Map apps = rmContext.getRMApps(); + Iterator appsIter; + // If the query filters by queues, we can avoid considering apps outside + // of those queues by asking the scheduler for the apps in those queues. + if (queues != null && !queues.isEmpty()) { + // Construct an iterator over apps in given queues + // Collect list of lists to avoid copying all apps + final List> queueAppLists = + new ArrayList>(); + for (String queue : queues) { + List appsInQueue = scheduler.getAppsInQueue(queue); + if (appsInQueue != null && !appsInQueue.isEmpty()) { + queueAppLists.add(appsInQueue); + } } + appsIter = new Iterator() { + Iterator> appListIter = queueAppLists.iterator(); + Iterator schedAppsIter; + + @Override + public boolean hasNext() { + // Because queueAppLists has no empty lists, hasNext is whether the + // current list hasNext or whether there are any remaining lists + return (schedAppsIter != null && schedAppsIter.hasNext()) + || appListIter.hasNext(); + } + @Override + public RMApp next() { + if (schedAppsIter == null || !schedAppsIter.hasNext()) { + schedAppsIter = appListIter.next().iterator(); + } + return apps.get(schedAppsIter.next().getApplicationId()); + } + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported"); + } + }; + } else { + appsIter = apps.values().iterator(); + } + + List reports = new ArrayList(); + while (appsIter.hasNext() && reports.size() < limit) { + RMApp application = appsIter.next(); if (applicationTypes != null && !applicationTypes.isEmpty()) { String appTypeToMatch = caseSensitive ? application.getApplicationType() @@ -458,11 +500,6 @@ public GetApplicationsResponse getApplications( continue; } - if (queues != null && !queues.isEmpty() && - !queues.contains(application.getQueue())) { - continue; - } - if (start != null && !start.containsLong(application.getStartTime())) { continue; } @@ -515,13 +552,12 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) request.getRecursive()); List appReports = EMPTY_APPS_REPORT; if (request.getIncludeApplications()) { - Collection apps = this.rmContext.getRMApps().values(); - appReports = new ArrayList( - apps.size()); - for (RMApp app : apps) { - if (app.getQueue().equals(queueInfo.getQueueName())) { - appReports.add(app.createAndGetApplicationReport(null, true)); - } + List apps = + scheduler.getAppsInQueue(request.getQueueName()); + appReports = new ArrayList(apps.size()); + for (ApplicationAttemptId app : apps) { + RMApp rmApp = rmContext.getRMApps().get(app.getApplicationId()); + appReports.add(rmApp.createAndGetApplicationReport(null, true)); } } queueInfo.setApplications(appReports); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index bdeaf74f6b..b0a56a4a9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -160,4 +161,13 @@ ApplicationResourceUsageReport getAppResourceUsageReport( */ boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName); + + /** + * Gets the apps under a given queue + * @param queueName the name of the queue. + * @return a collection of app attempt ids in the given queue. + */ + @LimitedPrivate("yarn") + @Stable + public List getAppsInQueue(String queueName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 769b157bdf..c317df51a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; +import java.util.Collection; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -33,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -228,4 +231,10 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) */ public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, Container container); + + /** + * Adds all applications in the queue and its subqueues to the given collection. + * @param apps the collection to add the applications to + */ + public void collectSchedulerApplications(Collection apps); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e61ff938fa..3a2b353363 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -941,4 +942,14 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, return queue.hasAccess(acl, callerUGI); } + @Override + public List getAppsInQueue(String queueName) { + CSQueue queue = queues.get(queueName); + if (queue == null) { + return null; + } + List apps = new ArrayList(); + queue.collectSchedulerApplications(apps); + return apps; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 11e497373e..db7db607ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -1622,4 +1624,12 @@ public Resource getTotalResourcePending() { return ret; } + @Override + public void collectSchedulerApplications( + Collection apps) { + for (FiCaSchedulerApp app : activeApplications) { + apps.add(app.getApplicationAttemptId()); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 5ca953dc9a..b22b24ed4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -36,6 +36,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -764,4 +766,12 @@ public void recoverContainer(Resource clusterResource, parent.recoverContainer(clusterResource, application, container); } } + + @Override + public void collectSchedulerApplications( + Collection apps) { + for (CSQueue queue : childQueues) { + queue.collectSchedulerApplications(apps); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 1257cba1fc..80378be95b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -29,11 +29,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @Private @Unstable @@ -105,6 +107,17 @@ public Collection getRunnableAppSchedulables() { public List getNonRunnableAppSchedulables() { return nonRunnableAppScheds; } + + @Override + public void collectSchedulerApplications( + Collection apps) { + for (AppSchedulable appSched : runnableAppScheds) { + apps.add(appSched.getApp().getApplicationAttemptId()); + } + for (AppSchedulable appSched : nonRunnableAppScheds) { + apps.add(appSched.getApp().getApplicationAttemptId()); + } + } @Override public void setPolicy(SchedulingPolicy policy) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 0a9272594c..90a8741618 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -28,10 +28,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @Private @Unstable @@ -184,4 +186,12 @@ public void decrementRunnableApps() { public int getNumRunnableApps() { return runnableApps; } + + @Override + public void collectSchedulerApplications( + Collection apps) { + for (FSQueue childQueue : childQueues) { + childQueue.collectSchedulerApplications(apps); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 8b3d90d766..33e0d89830 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -158,7 +159,14 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { * Gets the children of this queue, if any. */ public abstract Collection getChildQueues(); - + + /** + * Adds all applications in the queue and its subqueues to the given collection. + * @param apps the collection to add the applications to + */ + public abstract void collectSchedulerApplications( + Collection apps); + /** * Return the number of apps for which containers can be allocated. * Includes apps in subqueues. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a439adc68b..861fad8715 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -1267,4 +1268,15 @@ public void onReload(AllocationConfiguration queueInfo) { } } + @Override + public List getAppsInQueue(String queueName) { + FSQueue queue = queueMgr.getQueue(queueName); + if (queue == null) { + return null; + } + List apps = new ArrayList(); + queue.collectSchedulerApplications(apps); + return apps; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 4242d02c03..956cb492f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -850,5 +851,19 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { return DEFAULT_QUEUE.hasAccess(acl, callerUGI); } + + @Override + public synchronized List getAppsInQueue(String queueName) { + if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { + List apps = new ArrayList( + applications.size()); + for (FiCaSchedulerApp app : applications.values()) { + apps.add(app.getApplicationAttemptId()); + } + return apps; + } else { + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 491454a53e..ca6dc3e037 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -30,9 +30,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; @@ -106,6 +109,9 @@ public class TestClientRMService { private static RMDelegationTokenSecretManager dtsm; + private final static String QUEUE_1 = "Q-1"; + private final static String QUEUE_2 = "Q-2"; + @BeforeClass public static void setupSecretManager() throws IOException { RMContext rmContext = mock(RMContext.class); @@ -438,7 +444,7 @@ public void handle(Event event) {} mockAclsManager, mockQueueACLsManager, null); // Initialize appnames and queues - String[] queues = {"Q-1", "Q-2"}; + String[] queues = {QUEUE_1, QUEUE_2}; String[] appNames = {MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()}; ApplicationId[] appIds = @@ -596,6 +602,8 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) ConcurrentHashMap apps = getRMApps(rmContext, yarnScheduler); when(rmContext.getRMApps()).thenReturn(apps); + when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn( + getSchedulerApps(apps)); } private ConcurrentHashMap getRMApps( @@ -614,10 +622,23 @@ private ConcurrentHashMap getRMApps( config, "testqueue")); return apps; } + + private List getSchedulerApps( + Map apps) { + List schedApps = new ArrayList(); + // Return app IDs for the apps in testqueue (as defined in getRMApps) + schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0)); + schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0)); + return schedApps; + } - private ApplicationId getApplicationId(int id) { + private static ApplicationId getApplicationId(int id) { return ApplicationId.newInstance(123456, id); } + + private static ApplicationAttemptId getApplicationAttemptId(int id) { + return ApplicationAttemptId.newInstance(getApplicationId(id), 1); + } private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, ApplicationId applicationId3, YarnConfiguration config, String queueName) { @@ -641,6 +662,10 @@ private static YarnScheduler mockYarnScheduler() { when(yarnScheduler.getMaximumResourceCapability()).thenReturn( Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn( + Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102))); + when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn( + Arrays.asList(getApplicationAttemptId(103))); return yarnScheduler; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index a5628febee..a804138407 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -651,5 +651,35 @@ public void run() { } assertFalse(failed.get()); } + + @Test + public void testGetAppsInQueue() throws Exception { + Application application_0 = new Application("user_0", "a1", resourceManager); + application_0.submit(); + + Application application_1 = new Application("user_0", "a2", resourceManager); + application_1.submit(); + + Application application_2 = new Application("user_0", "b2", resourceManager); + application_2.submit(); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInA.contains(application_1.getApplicationAttemptId())); + assertEquals(2, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId())); + assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId())); + assertEquals(3, appsInRoot.size()); + + Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue")); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a3aa3f6614..84ffbc173a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -2490,4 +2491,40 @@ public void testBlacklistNodes() throws Exception { assertEquals("Incorrect number of containers allocated", 1, app .getLiveContainers().size()); } + + @Test + public void testGetAppsInQueue() throws Exception { + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + ApplicationAttemptId appAttId1 = + createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1"); + ApplicationAttemptId appAttId2 = + createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1"); + ApplicationAttemptId appAttId3 = + createSchedulingRequest(1024, 1, "default", "user1"); + + List apps = + scheduler.getAppsInQueue("queue1.subqueue1"); + assertEquals(1, apps.size()); + assertEquals(appAttId1, apps.get(0)); + // with and without root prefix should work + apps = scheduler.getAppsInQueue("root.queue1.subqueue1"); + assertEquals(1, apps.size()); + assertEquals(appAttId1, apps.get(0)); + + apps = scheduler.getAppsInQueue("user1"); + assertEquals(1, apps.size()); + assertEquals(appAttId3, apps.get(0)); + // with and without root prefix should work + apps = scheduler.getAppsInQueue("root.user1"); + assertEquals(1, apps.size()); + assertEquals(appAttId3, apps.get(0)); + + // apps in subqueues should be included + apps = scheduler.getAppsInQueue("queue1"); + Assert.assertEquals(2, apps.size()); + Set appAttIds = Sets.newHashSet(apps.get(0), apps.get(1)); + assertTrue(appAttIds.contains(appAttId1)); + assertTrue(appAttIds.contains(appAttId2)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 525fbefde7..e70070434e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.reflect.Method; @@ -555,6 +556,24 @@ public void testBlackListNodes() throws Exception { Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host)); rm.stop(); } + + @Test + public void testGetAppsInQueue() throws Exception { + Application application_0 = new Application("user_0", resourceManager); + application_0.submit(); + + Application application_1 = new Application("user_0", resourceManager); + application_1.submit(); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + List appsInDefault = scheduler.getAppsInQueue("default"); + assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId())); + assertEquals(2, appsInDefault.size()); + + Assert.assertNull(scheduler.getAppsInQueue("someotherqueue")); + } private void checkApplicationResourceUsage(int expected, Application application) {