From 989715ec5066c6ac7868e25ad9234dc64723e61e Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Fri, 2 Nov 2018 15:30:08 -0700 Subject: [PATCH] YARN-8893. [AMRMProxy] Fix thread leak in AMRMClientRelayer and UAM client. Contributed by Botong Huang. --- .../hadoop/yarn/server/AMRMClientRelayer.java | 57 +++++-------------- .../server/uam/UnmanagedAMPoolManager.java | 28 +++++++++ .../uam/UnmanagedApplicationManager.java | 28 ++++++--- .../server/MockResourceManagerFacade.java | 5 +- .../yarn/server/TestAMRMClientRelayer.java | 10 ++-- .../metrics/TestAMRMClientRelayerMetrics.java | 6 -- .../uam/TestUnmanagedApplicationManager.java | 27 ++++++++- .../amrmproxy/FederationInterceptor.java | 18 +++--- .../amrmproxy/TestAMRMProxyService.java | 1 - 9 files changed, 103 insertions(+), 77 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index dc668684ec..ac43b122f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -27,9 +27,8 @@ import java.util.Set; import java.util.TreeSet; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -47,8 +46,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.client.AMRMClientUtils; -import org.apache.hadoop.yarn.client.ClientRMProxy; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -66,8 +63,7 @@ * pending requests similar to AMRMClient, and handles RM re-sync automatically * without propagate the re-sync exception back to AMRMClient. */ -public class AMRMClientRelayer extends AbstractService - implements ApplicationMasterProtocol { +public class AMRMClientRelayer implements ApplicationMasterProtocol { private static final Logger LOG = LoggerFactory.getLogger(AMRMClientRelayer.class); @@ -136,51 +132,16 @@ public class AMRMClientRelayer extends AbstractService private AMRMClientRelayerMetrics metrics; - public AMRMClientRelayer() { - super(AMRMClientRelayer.class.getName()); - this.resetResponseId = -1; - this.metrics = AMRMClientRelayerMetrics.getInstance(); - this.rmClient = null; - this.appId = null; - this.rmId = ""; - } - public AMRMClientRelayer(ApplicationMasterProtocol rmClient, ApplicationId appId, String rmId) { - this(); + this.resetResponseId = -1; + this.metrics = AMRMClientRelayerMetrics.getInstance(); + this.rmId = ""; this.rmClient = rmClient; this.appId = appId; this.rmId = rmId; } - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - final YarnConfiguration conf = new YarnConfiguration(getConfig()); - try { - if (this.rmClient == null) { - this.rmClient = - ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class); - } - } catch (IOException e) { - throw new YarnRuntimeException(e); - } - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - if (this.rmClient != null) { - RPC.stopProxy(this.rmClient); - } - shutdown(); - super.serviceStop(); - } - public void setAMRegistrationRequest( RegisterApplicationMasterRequest registerRequest) { this.amRegistrationRequest = registerRequest; @@ -231,6 +192,14 @@ public void shutdown() { .decrClientPending(rmId, req.getContainerUpdateType(), 1); } } + + if (this.rmClient != null) { + try { + RPC.stopProxy(this.rmClient); + this.rmClient = null; + } catch (HadoopIllegalArgumentException e) { + } + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index d5a0168175..7072030bd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -371,6 +371,34 @@ public FinishApplicationMasterResponse finishApplicationMaster(String uamId, return response; } + /** + * Shutdown an UAM client without killing it in YarnRM. + * + * @param uamId uam Id + * @throws YarnException if fails + */ + public void shutDownConnections(String uamId) + throws YarnException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info( + "Shutting down UAM id {} for application {} without killing the UAM", + uamId, this.appIdMap.get(uamId)); + this.unmanagedAppMasterMap.remove(uamId).shutDownConnections(); + } + + /** + * Shutdown all UAM clients without killing them in YarnRM. + * + * @throws YarnException if fails + */ + public void shutDownConnections() throws YarnException { + for (String uamId : this.unmanagedAppMasterMap.keySet()) { + shutDownConnections(uamId); + } + } + /** * Get the id of all running UAMs. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 134df1d304..cc1d21f038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -255,9 +255,6 @@ public RegisterApplicationMasterResponse registerApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { - - this.heartbeatHandler.shutdown(); - if (this.userUgi == null) { if (this.connectionInitiated) { // This is possible if the async launchUAM is still @@ -270,7 +267,12 @@ public FinishApplicationMasterResponse finishApplicationMaster( + "be called before createAndRegister"); } } - return this.rmProxyRelayer.finishApplicationMaster(request); + FinishApplicationMasterResponse response = + this.rmProxyRelayer.finishApplicationMaster(request); + if (response.getIsUnregistered()) { + shutDownConnections(); + } + return response; } /** @@ -282,11 +284,10 @@ public FinishApplicationMasterResponse finishApplicationMaster( */ public KillApplicationResponse forceKillApplication() throws IOException, YarnException { + shutDownConnections(); + KillApplicationRequest request = KillApplicationRequest.newInstance(this.applicationId); - - this.heartbeatHandler.shutdown(); - if (this.rmClient == null) { this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, UserGroupInformation.createRemoteUser(this.submitter), null); @@ -323,6 +324,14 @@ public void allocateAsync(AllocateRequest request, } } + /** + * Shutdown this UAM client, without killing the UAM in the YarnRM side. + */ + public void shutDownConnections() { + this.heartbeatHandler.shutdown(); + this.rmProxyRelayer.shutdown(); + } + /** * Returns the application id of the UAM. * @@ -532,4 +541,9 @@ public int getRequestQueueSize() { protected void drainHeartbeatThread() { this.heartbeatHandler.drainHeartbeatThread(); } + + @VisibleForTesting + protected boolean isHeartbeatThreadAlive() { + return this.heartbeatHandler.isAlive(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 60c2ac9ad6..16ba9032b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -120,7 +120,6 @@ import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; @@ -338,9 +337,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( applicationContainerIdMap.remove(appId); } - return FinishApplicationMasterResponse.newInstance( - request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED - ? true : false); + return FinishApplicationMasterResponse.newInstance(true); } protected ApplicationId getApplicationId(int id) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java index fa46960ee7..46570a1465 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; import org.apache.hadoop.yarn.util.Records; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -155,16 +156,17 @@ public void setup() throws YarnException, IOException { this.mockAMS = new MockApplicationMasterService(); this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST"); - - this.relayer.init(conf); - this.relayer.start(); - this.relayer.registerApplicationMaster( RegisterApplicationMasterRequest.newInstance("", 0, "")); clearAllocateRequestLists(); } + @After + public void cleanup() { + this.relayer.shutdown(); + } + private void assertAsksAndReleases(int expectedAsk, int expectedRelease) { Assert.assertEquals(expectedAsk, this.mockAMS.lastAsk.size()); Assert.assertEquals(expectedRelease, this.mockAMS.lastRelease.size()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java index ebbfae238d..dd48241d72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java @@ -141,17 +141,11 @@ public void setup() throws YarnException, IOException { this.homeRelayer = new AMRMClientRelayer(this.mockAMS, ApplicationId.newInstance(0, 0), this.homeID); - this.homeRelayer.init(conf); - this.homeRelayer.start(); - this.homeRelayer.registerApplicationMaster( RegisterApplicationMasterRequest.newInstance("", 0, "")); this.uamRelayer = new AMRMClientRelayer(this.mockAMS, ApplicationId.newInstance(0, 0), this.uamID); - this.uamRelayer.init(conf); - this.uamRelayer.start(); - this.uamRelayer.registerApplicationMaster( RegisterApplicationMasterRequest.newInstance("", 0, "")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 54e7dd3554..abb1d93c33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -87,7 +87,7 @@ protected void waitForCallBackCountAndCheckZeroPending( } } - @Test(timeout = 5000) + @Test(timeout = 10000) public void testBasicUsage() throws YarnException, IOException, InterruptedException { @@ -104,6 +104,11 @@ public void testBasicUsage() finishApplicationMaster( FinishApplicationMasterRequest.newInstance(null, null, null), attemptId); + + while (uam.isHeartbeatThreadAlive()) { + LOG.info("waiting for heartbeat thread to finish"); + Thread.sleep(100); + } } /* @@ -261,7 +266,7 @@ public void testFinishWithoutRegister() attemptId); } - @Test + @Test(timeout = 10000) public void testForceKill() throws YarnException, IOException, InterruptedException { launchUAM(attemptId); @@ -269,6 +274,11 @@ public void testForceKill() RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.forceKillApplication(); + while (uam.isHeartbeatThreadAlive()) { + LOG.info("waiting for heartbeat thread to finish"); + Thread.sleep(100); + } + try { uam.forceKillApplication(); Assert.fail("Should fail because application is already killed"); @@ -276,6 +286,19 @@ public void testForceKill() } } + @Test(timeout = 10000) + public void testShutDownConnections() + throws YarnException, IOException, InterruptedException { + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + uam.shutDownConnections(); + while (uam.isHeartbeatThreadAlive()) { + LOG.info("waiting for heartbeat thread to finish"); + Thread.sleep(100); + } + } + protected UserGroupInformation getUGIWithToken( ApplicationAttemptId appAttemptId) { UserGroupInformation ugi = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index c47887120c..ae9f78df2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -716,12 +716,7 @@ public FinishApplicationMasterResponseInfo call() throws Exception { uamPool.finishApplicationMaster(subClusterId, finishRequest); if (uamResponse.getIsUnregistered()) { - AMRMClientRelayer relayer = - secondaryRelayers.remove(subClusterId); - if(relayer != null) { - relayer.shutdown(); - } - + secondaryRelayers.remove(subClusterId); if (getNMStateStore() != null) { getNMStateStore().removeAMRMProxyAppContextEntry(attemptId, NMSS_SECONDARY_SC_PREFIX + subClusterId); @@ -801,8 +796,16 @@ public void setNextInterceptor(RequestInterceptor next) { */ @Override public void shutdown() { + LOG.info("Shutting down FederationInterceptor for {}", this.attemptId); + // Do not stop uamPool service and kill UAMs here because of possible second // app attempt + try { + this.uamPool.shutDownConnections(); + } catch (YarnException e) { + LOG.error("Error shutting down all UAM clients without killing them", e); + } + if (this.threadpool != null) { try { this.threadpool.shutdown(); @@ -814,9 +817,6 @@ public void shutdown() { // Stop the home heartbeat thread this.homeHeartbeartHandler.shutdown(); this.homeRMRelayer.shutdown(); - for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) { - relayer.shutdown(); - } super.shutdown(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 1eefbd59a6..b269fa4567 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -206,7 +206,6 @@ public void testFinishOneApplicationMasterWithFailure() throws Exception { finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED); Assert.assertNotNull(finshResponse); - Assert.assertEquals(false, finshResponse.getIsUnregistered()); try { // Try to finish an application master that is already finished.