From 6d3bcaa674d5e8b76a75c1cfeef690d2ea48b800 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 5 Aug 2023 16:29:21 +0800 Subject: [PATCH] YARN-7402. BackPort [GPG] Fix potential connection leak in GPGUtils. (#5901) --- .../globalpolicygenerator/GPGUtils.java | 32 +++++++---- .../policygenerator/TestPolicyGenerator.java | 53 +++++++++++++++++-- 2 files changed, 71 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java index 636ce92500..6d2e1d4142 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java @@ -18,20 +18,21 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import static javax.servlet.http.HttpServletResponse.SC_OK; + import java.util.HashMap; import java.util.Map; import java.util.Set; -import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; /** * GPGUtils contains utility functions for the GPG. @@ -57,15 +58,24 @@ public static T invokeRMWebService(String webAddr, String path, final Class< T obj = null; WebResource webResource = client.resource(webAddr); - ClientResponse response = webResource.path("ws/v1/cluster").path(path) - .accept(MediaType.APPLICATION_XML).get(ClientResponse.class); - if (response.getStatus() == HttpServletResponse.SC_OK) { - obj = response.getEntity(returnType); - } else { - throw new YarnRuntimeException("Bad response from remote web service: " - + response.getStatus()); + ClientResponse response = null; + try { + response = webResource.path("ws/v1/cluster").path(path) + .accept(MediaType.APPLICATION_XML).get(ClientResponse.class); + if (response.getStatus() == SC_OK) { + obj = response.getEntity(returnType); + } else { + throw new YarnRuntimeException( + "Bad response from remote web service: " + response.getStatus()); + } + return obj; + } finally { + if (response != null) { + response.close(); + response = null; + } + client.destroy(); } - return obj; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java index 0fe475e3fd..9893e85e56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.junit.After; import org.junit.Assert; @@ -292,11 +294,56 @@ public void testCallRM() { resourceManager.start(); String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf); - SchedulerTypeInfo sti = GPGUtils - .invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER, - SchedulerTypeInfo.class); + SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER, + SchedulerTypeInfo.class); Assert.assertNotNull(sti); + SchedulerInfo schedulerInfo = sti.getSchedulerInfo(); + Assert.assertTrue(schedulerInfo instanceof CapacitySchedulerInfo); + + CapacitySchedulerInfo capacitySchedulerInfo = (CapacitySchedulerInfo) schedulerInfo; + Assert.assertNotNull(capacitySchedulerInfo); + + CapacitySchedulerQueueInfoList queues = capacitySchedulerInfo.getQueues(); + Assert.assertNotNull(queues); + ArrayList queueInfoList = queues.getQueueInfoList(); + Assert.assertNotNull(queueInfoList); + Assert.assertEquals(2, queueInfoList.size()); + + CapacitySchedulerQueueInfo queueA = queueInfoList.get(0); + Assert.assertNotNull(queueA); + Assert.assertEquals("root.a", queueA.getQueuePath()); + Assert.assertEquals(10.5f, queueA.getCapacity(), 0.00001); + CapacitySchedulerQueueInfoList queueAQueues = queueA.getQueues(); + Assert.assertNotNull(queueAQueues); + ArrayList queueInfoAList = queueAQueues.getQueueInfoList(); + Assert.assertNotNull(queueInfoAList); + Assert.assertEquals(2, queueInfoAList.size()); + CapacitySchedulerQueueInfo queueA1 = queueInfoAList.get(0); + Assert.assertNotNull(queueA1); + Assert.assertEquals(30f, queueA1.getCapacity(), 0.00001); + CapacitySchedulerQueueInfo queueA2 = queueInfoAList.get(1); + Assert.assertNotNull(queueA2); + Assert.assertEquals(70f, queueA2.getCapacity(), 0.00001); + + CapacitySchedulerQueueInfo queueB = queueInfoList.get(1); + Assert.assertNotNull(queueB); + Assert.assertEquals("root.b", queueB.getQueuePath()); + Assert.assertEquals(89.5f, queueB.getCapacity(), 0.00001); + CapacitySchedulerQueueInfoList queueBQueues = queueB.getQueues(); + Assert.assertNotNull(queueBQueues); + ArrayList queueInfoBList = queueBQueues.getQueueInfoList(); + Assert.assertNotNull(queueInfoBList); + Assert.assertEquals(3, queueInfoBList.size()); + CapacitySchedulerQueueInfo queueB1 = queueInfoBList.get(0); + Assert.assertNotNull(queueB1); + Assert.assertEquals(79.2f, queueB1.getCapacity(), 0.00001); + CapacitySchedulerQueueInfo queueB2 = queueInfoBList.get(1); + Assert.assertNotNull(queueB2); + Assert.assertEquals(0.8f, queueB2.getCapacity(), 0.00001); + CapacitySchedulerQueueInfo queueB3 = queueInfoBList.get(2); + Assert.assertNotNull(queueB3); + Assert.assertEquals(20f, queueB3.getCapacity(), 0.00001); } /**