YARN-807. When querying apps by queue, iterating over all apps is inefficient and limiting (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1548983 13f79535-47bb-0310-9956-ffa450edef68
parent 91d0b47270
commit e1d3670f4c
@@ -867,5 +867,10 @@ public ApplicationResourceUsageReport getAppResourceUsageReport(
      ApplicationAttemptId appAttemptId) {
    return scheduler.getAppResourceUsageReport(appAttemptId);
  }

  @Override
  public List<ApplicationAttemptId> getAppsInQueue(String queue) {
    return scheduler.getAppsInQueue(queue);
  }
}

@@ -145,6 +145,9 @@ Release 2.4.0 - UNRELEASED

    YARN-546. Allow disabling the Fair Scheduler event log (Sandy Ryza)

    YARN-807. When querying apps by queue, iterating over all apps is
    inefficient and limiting (Sandy Ryza)

  OPTIMIZATIONS

  BUG FIXES

@@ -24,7 +24,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

@@ -67,6 +69,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;

@@ -431,12 +434,51 @@ public GetApplicationsResponse getApplications(
    LongRange start = request.getStartRange();
    LongRange finish = request.getFinishRange();

    List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
    long count = 0;
    for (RMApp application : this.rmContext.getRMApps().values()) {
      if (++count > limit) {
        break;
    final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
    Iterator<RMApp> appsIter;
    // If the query filters by queues, we can avoid considering apps outside
    // of those queues by asking the scheduler for the apps in those queues.
    if (queues != null && !queues.isEmpty()) {
      // Construct an iterator over apps in given queues
      // Collect list of lists to avoid copying all apps
      final List<List<ApplicationAttemptId>> queueAppLists =
          new ArrayList<List<ApplicationAttemptId>>();
      for (String queue : queues) {
        List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queue);
        if (appsInQueue != null && !appsInQueue.isEmpty()) {
          queueAppLists.add(appsInQueue);
        }
      }
      appsIter = new Iterator<RMApp>() {
        Iterator<List<ApplicationAttemptId>> appListIter = queueAppLists.iterator();
        Iterator<ApplicationAttemptId> schedAppsIter;

        @Override
        public boolean hasNext() {
          // Because queueAppLists has no empty lists, hasNext is whether the
          // current list hasNext or whether there are any remaining lists
          return (schedAppsIter != null && schedAppsIter.hasNext())
              || appListIter.hasNext();
        }
        @Override
        public RMApp next() {
          if (schedAppsIter == null || !schedAppsIter.hasNext()) {
            schedAppsIter = appListIter.next().iterator();
          }
          return apps.get(schedAppsIter.next().getApplicationId());
        }
        @Override
        public void remove() {
          throw new UnsupportedOperationException("Remove not supported");
        }
      };
    } else {
      appsIter = apps.values().iterator();
    }

    List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
    while (appsIter.hasNext() && reports.size() < limit) {
      RMApp application = appsIter.next();
      if (applicationTypes != null && !applicationTypes.isEmpty()) {
        String appTypeToMatch = caseSensitive
            ? application.getApplicationType()

@@ -458,11 +500,6 @@ public GetApplicationsResponse getApplications(
        continue;
      }

      if (queues != null && !queues.isEmpty() &&
          !queues.contains(application.getQueue())) {
        continue;
      }

      if (start != null && !start.containsLong(application.getStartTime())) {
        continue;
      }

@@ -515,13 +552,12 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
          request.getRecursive());
      List<ApplicationReport> appReports = EMPTY_APPS_REPORT;
      if (request.getIncludeApplications()) {
        Collection<RMApp> apps = this.rmContext.getRMApps().values();
        appReports = new ArrayList<ApplicationReport>(
            apps.size());
        for (RMApp app : apps) {
          if (app.getQueue().equals(queueInfo.getQueueName())) {
            appReports.add(app.createAndGetApplicationReport(null, true));
          }
        List<ApplicationAttemptId> apps =
            scheduler.getAppsInQueue(request.getQueueName());
        appReports = new ArrayList<ApplicationReport>(apps.size());
        for (ApplicationAttemptId app : apps) {
          RMApp rmApp = rmContext.getRMApps().get(app.getApplicationId());
          appReports.add(rmApp.createAndGetApplicationReport(null, true));
        }
      }
      queueInfo.setApplications(appReports);

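The hunks above replace the old pattern of scanning every RMApp and filtering on application.getQueue() with a per-queue scheduler lookup, then walk the per-queue lists through an anonymous Iterator so they are never copied into one combined collection. Below is a minimal standalone sketch of that chained-iterator idea, not code from the commit: the class and method names are made up, and unlike the iterator above it tolerates empty sub-lists instead of relying on them having been filtered out first.

import java.util.*;

public class ChainedIteratorSketch {
  // Chain several per-queue lists into one iterator without copying them.
  static <T> Iterator<T> chain(final List<List<T>> lists) {
    return new Iterator<T>() {
      private final Iterator<List<T>> listIter = lists.iterator();
      private Iterator<T> current = Collections.<T>emptyList().iterator();

      @Override
      public boolean hasNext() {
        // Advance to the next non-empty list, if any.
        while (!current.hasNext() && listIter.hasNext()) {
          current = listIter.next().iterator();
        }
        return current.hasNext();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("Remove not supported");
      }
    };
  }

  public static void main(String[] args) {
    List<List<String>> perQueueApps = Arrays.asList(
        Arrays.asList("app1", "app2"),   // apps reported for one queue
        Arrays.asList("app3"));          // apps reported for another queue
    Iterator<String> it = chain(perQueueApps);
    while (it.hasNext()) {
      System.out.println(it.next());     // prints app1, app2, app3
    }
  }
}
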
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;

@@ -160,4 +161,13 @@ ApplicationResourceUsageReport getAppResourceUsageReport(
   */
  boolean checkAccess(UserGroupInformation callerUGI,
      QueueACL acl, String queueName);

  /**
   * Gets the apps under a given queue
   * @param queueName the name of the queue.
   * @return a collection of app attempt ids in the given queue.
   */
  @LimitedPrivate("yarn")
  @Stable
  public List<ApplicationAttemptId> getAppsInQueue(String queueName);
}

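Note the contract implied by the scheduler hunks that follow: each implementation returns null when the queue is unknown and an empty list when the queue exists but currently holds no applications, so callers must distinguish the two. A small illustrative helper, not part of the commit, assuming the same YarnScheduler and ApplicationAttemptId types:

  // Illustrative only: resolve a queue's apps or fail loudly.
  static List<ApplicationAttemptId> appsInQueueOrThrow(
      YarnScheduler scheduler, String queueName) {
    List<ApplicationAttemptId> attempts = scheduler.getAppsInQueue(queueName);
    if (attempts == null) {
      // null means the queue is not configured in the scheduler
      throw new IllegalArgumentException("Unknown queue: " + queueName);
    }
    // an empty list means the queue exists but has no applications
    return attempts;
  }
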
@@ -19,12 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;

@@ -33,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;

@@ -228,4 +231,10 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
   */
  public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
      Container container);

  /**
   * Adds all applications in the queue and its subqueues to the given collection.
   * @param apps the collection to add the applications to
   */
  public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
}

@@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;

@@ -941,4 +942,14 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
    return queue.hasAccess(acl, callerUGI);
  }

  @Override
  public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
    CSQueue queue = queues.get(queueName);
    if (queue == null) {
      return null;
    }
    List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
    queue.collectSchedulerApplications(apps);
    return apps;
  }
}

@@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;

@@ -58,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;

@@ -1622,4 +1624,12 @@ public Resource getTotalResourcePending() {
    return ret;
  }

  @Override
  public void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps) {
    for (FiCaSchedulerApp app : activeApplications) {
      apps.add(app.getApplicationAttemptId());
    }
  }

}

@@ -36,6 +36,7 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;

@@ -50,6 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

@@ -764,4 +766,12 @@ public void recoverContainer(Resource clusterResource,
      parent.recoverContainer(clusterResource, application, container);
    }
  }

  @Override
  public void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps) {
    for (CSQueue queue : childQueues) {
      queue.collectSchedulerApplications(apps);
    }
  }
}

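Taken together, the capacity scheduler hunks implement a plain accumulator recursion: the leaf queue appends the attempt ids of its own active applications, the parent queue recurses into childQueues, and CapacityScheduler.getAppsInQueue just seeds an empty ArrayList at the requested queue. A standalone sketch of the same pattern, using made-up class names rather than the real queue classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Sketch of the collect-into-an-accumulator pattern used by CSQueue/FSQueue
// in this commit; these classes are illustrative, not from the commit.
interface QueueNode {
  void collectApps(Collection<String> apps);
}

class LeafNode implements QueueNode {
  private final List<String> apps;
  LeafNode(String... apps) { this.apps = Arrays.asList(apps); }
  @Override
  public void collectApps(Collection<String> out) {
    out.addAll(apps);              // leaf: contribute its own apps
  }
}

class ParentNode implements QueueNode {
  private final List<QueueNode> children;
  ParentNode(QueueNode... children) { this.children = Arrays.asList(children); }
  @Override
  public void collectApps(Collection<String> out) {
    for (QueueNode child : children) {
      child.collectApps(out);      // parent: recurse into subqueues
    }
  }
}

public class QueueTreeSketch {
  public static void main(String[] args) {
    QueueNode root = new ParentNode(
        new ParentNode(new LeafNode("app-1"), new LeafNode("app-2")),
        new LeafNode("app-3"));
    List<String> apps = new ArrayList<String>();
    root.collectApps(apps);        // one shared list, no per-level copies
    System.out.println(apps);      // [app-1, app-2, app-3]
  }
}
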
@@ -29,11 +29,13 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;

@Private
@Unstable

@@ -105,6 +107,17 @@ public Collection<AppSchedulable> getRunnableAppSchedulables() {
  public List<AppSchedulable> getNonRunnableAppSchedulables() {
    return nonRunnableAppScheds;
  }

  @Override
  public void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps) {
    for (AppSchedulable appSched : runnableAppScheds) {
      apps.add(appSched.getApp().getApplicationAttemptId());
    }
    for (AppSchedulable appSched : nonRunnableAppScheds) {
      apps.add(appSched.getApp().getApplicationAttemptId());
    }
  }

  @Override
  public void setPolicy(SchedulingPolicy policy)

@@ -28,10 +28,12 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;

@Private
@Unstable

@@ -184,4 +186,12 @@ public void decrementRunnableApps() {
  public int getNumRunnableApps() {
    return runnableApps;
  }

  @Override
  public void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps) {
    for (FSQueue childQueue : childQueues) {
      childQueue.collectSchedulerApplications(apps);
    }
  }
}

@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;

@@ -158,7 +159,14 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
   * Gets the children of this queue, if any.
   */
  public abstract Collection<FSQueue> getChildQueues();

  /**
   * Adds all applications in the queue and its subqueues to the given collection.
   * @param apps the collection to add the applications to
   */
  public abstract void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps);

  /**
   * Return the number of apps for which containers can be allocated.
   * Includes apps in subqueues.

@@ -72,6 +72,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;

@@ -1267,4 +1268,15 @@ public void onReload(AllocationConfiguration queueInfo) {
    }
  }

  @Override
  public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
    FSQueue queue = queueMgr.getQueue(queueName);
    if (queue == null) {
      return null;
    }
    List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
    queue.collectSchedulerApplications(apps);
    return apps;
  }

}

@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

@@ -850,5 +851,19 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
      QueueACL acl, String queueName) {
    return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
  }

  @Override
  public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
    if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
      List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
          applications.size());
      for (FiCaSchedulerApp app : applications.values()) {
        apps.add(app.getApplicationAttemptId());
      }
      return apps;
    } else {
      return null;
    }
  }

}

@@ -30,9 +30,12 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;

@@ -106,6 +109,9 @@ public class TestClientRMService {

  private static RMDelegationTokenSecretManager dtsm;

  private final static String QUEUE_1 = "Q-1";
  private final static String QUEUE_2 = "Q-2";

  @BeforeClass
  public static void setupSecretManager() throws IOException {
    RMContext rmContext = mock(RMContext.class);

@@ -438,7 +444,7 @@ public void handle(Event event) {}
        mockAclsManager, mockQueueACLsManager, null);

    // Initialize appnames and queues
    String[] queues = {"Q-1", "Q-2"};
    String[] queues = {QUEUE_1, QUEUE_2};
    String[] appNames =
        {MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
    ApplicationId[] appIds =

@@ -596,6 +602,8 @@ private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
    ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
        yarnScheduler);
    when(rmContext.getRMApps()).thenReturn(apps);
    when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
        getSchedulerApps(apps));
  }

  private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(

@@ -614,10 +622,23 @@ private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
        config, "testqueue"));
    return apps;
  }

  private List<ApplicationAttemptId> getSchedulerApps(
      Map<ApplicationId, RMApp> apps) {
    List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>();
    // Return app IDs for the apps in testqueue (as defined in getRMApps)
    schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0));
    schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0));
    return schedApps;
  }

  private ApplicationId getApplicationId(int id) {
  private static ApplicationId getApplicationId(int id) {
    return ApplicationId.newInstance(123456, id);
  }

  private static ApplicationAttemptId getApplicationAttemptId(int id) {
    return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
  }

  private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
      ApplicationId applicationId3, YarnConfiguration config, String queueName) {

@@ -641,6 +662,10 @@ private static YarnScheduler mockYarnScheduler() {
    when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
        Resources.createResource(
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
    when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn(
        Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
    when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
        Arrays.asList(getApplicationAttemptId(103)));
    return yarnScheduler;
  }
}

@@ -651,5 +651,35 @@ public void run() {
    }
    assertFalse(failed.get());
  }

  @Test
  public void testGetAppsInQueue() throws Exception {
    Application application_0 = new Application("user_0", "a1", resourceManager);
    application_0.submit();

    Application application_1 = new Application("user_0", "a2", resourceManager);
    application_1.submit();

    Application application_2 = new Application("user_0", "b2", resourceManager);
    application_2.submit();

    ResourceScheduler scheduler = resourceManager.getResourceScheduler();

    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
    assertEquals(1, appsInA1.size());

    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
    assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
    assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
    assertEquals(2, appsInA.size());

    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
    assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
    assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
    assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
    assertEquals(3, appsInRoot.size());

    Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
  }

}

@@ -79,6 +79,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;

@@ -2490,4 +2491,40 @@ public void testBlacklistNodes() throws Exception {
    assertEquals("Incorrect number of containers allocated", 1, app
        .getLiveContainers().size());
  }

  @Test
  public void testGetAppsInQueue() throws Exception {
    scheduler.reinitialize(conf, resourceManager.getRMContext());

    ApplicationAttemptId appAttId1 =
        createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1");
    ApplicationAttemptId appAttId2 =
        createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1");
    ApplicationAttemptId appAttId3 =
        createSchedulingRequest(1024, 1, "default", "user1");

    List<ApplicationAttemptId> apps =
        scheduler.getAppsInQueue("queue1.subqueue1");
    assertEquals(1, apps.size());
    assertEquals(appAttId1, apps.get(0));
    // with and without root prefix should work
    apps = scheduler.getAppsInQueue("root.queue1.subqueue1");
    assertEquals(1, apps.size());
    assertEquals(appAttId1, apps.get(0));

    apps = scheduler.getAppsInQueue("user1");
    assertEquals(1, apps.size());
    assertEquals(appAttId3, apps.get(0));
    // with and without root prefix should work
    apps = scheduler.getAppsInQueue("root.user1");
    assertEquals(1, apps.size());
    assertEquals(appAttId3, apps.get(0));

    // apps in subqueues should be included
    apps = scheduler.getAppsInQueue("queue1");
    Assert.assertEquals(2, apps.size());
    Set<ApplicationAttemptId> appAttIds = Sets.newHashSet(apps.get(0), apps.get(1));
    assertTrue(appAttIds.contains(appAttId1));
    assertTrue(appAttIds.contains(appAttId2));
  }
}

@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;

@@ -555,6 +556,24 @@ public void testBlackListNodes() throws Exception {
    Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host));
    rm.stop();
  }

  @Test
  public void testGetAppsInQueue() throws Exception {
    Application application_0 = new Application("user_0", resourceManager);
    application_0.submit();

    Application application_1 = new Application("user_0", resourceManager);
    application_1.submit();

    ResourceScheduler scheduler = resourceManager.getResourceScheduler();

    List<ApplicationAttemptId> appsInDefault = scheduler.getAppsInQueue("default");
    assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId()));
    assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId()));
    assertEquals(2, appsInDefault.size());

    Assert.assertNull(scheduler.getAppsInQueue("someotherqueue"));
  }

  private void checkApplicationResourceUsage(int expected,
      Application application) {