From 9af3eabdca2e246dbeb583cbc89d51e89ef11ecb Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Wed, 20 Apr 2022 19:39:47 +0200 Subject: [PATCH] YARN-11114. RMWebServices returns only apps matching exactly the submitted queue name. Contributed by Szilard Nemeth --- .../resourcemanager/ClientRMService.java | 29 +++- .../resourcemanager/TestClientRMService.java | 4 +- .../webapp/TestRMWebServicesApps.java | 150 +++++++++++++++++- 3 files changed, 178 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 04efc88641..6c37b7e9c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -37,7 +37,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; - import org.apache.commons.cli.UnrecognizedOptionException; import org.apache.commons.lang3.Range; import org.slf4j.Logger; @@ -913,7 +912,17 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) } if (queues != null && !queues.isEmpty()) { - if (!queues.contains(application.getQueue())) { + Map> foundApps = queryApplicationsByQueues(apps, queues); + List runningAppsByQueues = foundApps.entrySet().stream() + .filter(e -> queues.contains(e.getKey())) + .map(Map.Entry::getValue) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + List runningAppsById = runningAppsByQueues.stream() + .filter(app -> app.getApplicationId().equals(application.getApplicationId())) + .collect(Collectors.toList()); + + if (runningAppsById.isEmpty() && !queues.contains(application.getQueue())) { continue; } } @@ -992,6 +1001,22 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) return response; } + private Map> queryApplicationsByQueues( + Map apps, Set queues) { + final Map> appsToQueues = new HashMap<>(); + for (String queue : queues) { + List appsInQueue = scheduler.getAppsInQueue(queue); + if (appsInQueue != null && !appsInQueue.isEmpty()) { + for (ApplicationAttemptId appAttemptId : appsInQueue) { + RMApp rmApp = apps.get(appAttemptId.getApplicationId()); + appsToQueues.putIfAbsent(queue, new ArrayList<>()); + appsToQueues.get(queue).add(rmApp); + } + } + } + return appsToQueues; + } + private Set getLowerCasedAppTypes(GetApplicationsRequest request) { Set applicationTypes = new HashSet<>(); if (request.getApplicationTypes() != null && !request.getApplicationTypes() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 7806845a2e..9f4e9433b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1402,9 +1402,9 @@ public void handle(Event event) {} request.setQueues(queueSet); queueSet.add(queues[0]); - assertEquals("Incorrect number of applications in queue", 2, + assertEquals("Incorrect number of applications in queue", 3, rmService.getApplications(request).getApplicationList().size()); - assertEquals("Incorrect number of applications in queue", 2, + assertEquals("Incorrect number of applications in queue", 3, rmService.getApplications(request).getApplicationList().size()); queueSet.add(queues[1]); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java index e22311ccbd..4c859baf78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; @@ -82,6 +83,16 @@ public class TestRMWebServicesApps extends JerseyTestBase { private static final int CONTAINER_MB = 1024; private static class WebServletModule extends ServletModule { + private final Class scheduler; + + public WebServletModule() { + this.scheduler = FifoScheduler.class; + } + + public WebServletModule(Class scheduler) { + this.scheduler = scheduler; + } + @Override protected void configureServlets() { bind(JAXBContextResolver.class); @@ -90,7 +101,7 @@ protected void configureServlets() { Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, + conf.setClass(YarnConfiguration.RM_SCHEDULER, scheduler, ResourceScheduler.class); rm = new MockRM(conf); bind(ResourceManager.class).toInstance(rm); @@ -1970,5 +1981,142 @@ public void verifyResourceRequestsGeneric(ResourceRequest request, enforceExecutionType); } + @Test + public void testAppsQueryByQueueShortname() throws Exception { + GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule(CapacityScheduler.class))); + + rm.start(); + MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048); + //YARN-11114 - Finished apps can only be queried with exactly the + // same queue name that the app is submitted to. + //As the queue is 'root.default' and the query is 'default' here, + // this app won't be returned. + RMApp finishedApp1 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("root.default") + .build()); + RMApp finishedApp2 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("default") + .build()); + + RMApp runningApp1 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("default") + .build()); + RMApp runningApp2 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("root.default") + .build()); + amNodeManager.nodeHeartbeat(true); + finishApp(amNodeManager, finishedApp1); + amNodeManager.nodeHeartbeat(true); + finishApp(amNodeManager, finishedApp2); + + WebResource r = resource(); + + ClientResponse response = r.path("ws").path("v1").path("cluster") + .path("apps") + .queryParam("queue", "default") + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject apps = json.getJSONObject("apps"); + assertEquals("incorrect number of elements", 1, apps.length()); + + JSONArray array = apps.getJSONArray("app"); + + Set appIds = getApplicationIds(array); + assertTrue("Running app 1 should be in the result list!", + appIds.contains(runningApp1.getApplicationId().toString())); + assertTrue("Running app 2 should be in the result list!", + appIds.contains(runningApp2.getApplicationId().toString())); + assertFalse("Finished app 1 should not be in the result list " + + "as it was submitted to 'root.default' but the query is for 'default'", + appIds.contains(finishedApp1.getApplicationId().toString())); + assertTrue("Finished app 2 should be in the result list " + + "as it was submitted to 'default' and the query is exactly for 'default'", + appIds.contains(finishedApp2.getApplicationId().toString())); + assertEquals("incorrect number of elements", 3, array.length()); + + rm.stop(); + } + + @Test + public void testAppsQueryByQueueFullname() throws Exception { + GuiceServletConfig.setInjector( + Guice.createInjector(new WebServletModule(CapacityScheduler.class))); + + rm.start(); + MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048); + RMApp finishedApp1 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("root.default") + .build()); + //YARN-11114 - Finished apps can only be queried with exactly the + // same queue name that the app is submitted to. + //As the queue is 'default' and the query is 'root.default' here, + // this app won't be returned, + RMApp finishedApp2 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("default") + .build()); + + RMApp runningApp1 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("default") + .build()); + RMApp runningApp2 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder + .createWithMemory(CONTAINER_MB, rm) + .withQueue("root.default") + .build()); + amNodeManager.nodeHeartbeat(true); + finishApp(amNodeManager, finishedApp1); + + amNodeManager.nodeHeartbeat(true); + finishApp(amNodeManager, finishedApp2); + + WebResource r = resource(); + + ClientResponse response = r.path("ws").path("v1").path("cluster") + .path("apps") + .queryParam("queue", "root.default") + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("incorrect number of elements", 1, json.length()); + JSONObject apps = json.getJSONObject("apps"); + assertEquals("incorrect number of elements", 1, apps.length()); + + JSONArray array = apps.getJSONArray("app"); + + Set appIds = getApplicationIds(array); + assertTrue("Running app 1 should be in the result list!", + appIds.contains(runningApp1.getApplicationId().toString())); + assertTrue("Running app 2 should be in the result list!", + appIds.contains(runningApp2.getApplicationId().toString())); + assertTrue("Finished app 1 should be in the result list, " + + "as it was submitted to 'root.default' and the query is exactly for 'root.default'!", + appIds.contains(finishedApp1.getApplicationId().toString())); + assertFalse("Finished app 2 should not be in the result list, " + + "as it was submitted to 'default' but the query is for 'root.default'!", + appIds.contains(finishedApp2.getApplicationId().toString())); + assertEquals("incorrect number of elements", 3, array.length()); + + rm.stop(); + } + }