From bc159b5a8791cdb808f0c4a45344b2b87ac5b70b Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 11 Jan 2024 20:01:59 +0800 Subject: [PATCH] YARN-10125. [Federation] Kill application from client does not kill Unmanaged AM's and containers launched by Unmanaged AM. (#6363) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri Signed-off-by: Shilun Fan --- .../clientrm/FederationClientInterceptor.java | 25 ++++++- .../TestFederationClientInterceptor.java | 65 +++++++++++++++++++ 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index ab0e1b345e..84278b2f1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -664,6 +664,13 @@ public KillApplicationResponse forceKillApplication( try { LOG.info("forceKillApplication {} on SubCluster {}.", applicationId, subClusterId); response = clientRMProxy.forceKillApplication(request); + // If kill home sub-cluster application is successful, + // we will try to kill the same application in other sub-clusters. + if (response != null) { + ClientMethod remoteMethod = new ClientMethod("forceKillApplication", + new Class[]{KillApplicationRequest.class}, new Object[]{request}); + invokeConcurrent(remoteMethod, KillApplicationResponse.class, subClusterId); + } } catch (Exception e) { routerMetrics.incrAppsFailedKilled(); String msg = "Unable to kill the application report."; @@ -834,12 +841,24 @@ public GetClusterMetricsResponse getClusterMetrics( return RouterYarnClientUtils.merge(clusterMetrics); } - Collection invokeConcurrent(ClientMethod request, Class clazz) - throws YarnException { - + Collection invokeConcurrent(ClientMethod request, Class clazz) throws YarnException { // Get Active SubClusters Map subClusterInfo = federationFacade.getSubClusters(true); Collection subClusterIds = subClusterInfo.keySet(); + return invokeConcurrent(request, clazz, subClusterIds); + } + + Collection invokeConcurrent(ClientMethod request, Class clazz, + SubClusterId homeSubclusterId) throws YarnException { + // Get Active SubClusters + Map subClusterInfo = federationFacade.getSubClusters(true); + Collection subClusterIds = subClusterInfo.keySet(); + subClusterIds.remove(homeSubclusterId); + return invokeConcurrent(request, clazz, subClusterIds); + } + + Collection invokeConcurrent(ClientMethod request, Class clazz, + Collection subClusterIds) throws YarnException { List>> callables = new ArrayList<>(); List>> futures = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 3c2c7b4d3b..0522326c6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -29,15 +29,19 @@ import java.util.Map; import java.util.HashMap; import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.Arrays; import java.util.Collection; import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -132,6 +136,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -387,6 +392,66 @@ public void testForceKillApplication() Assert.assertNotNull(responseKill); } + @Test + public void testForceKillApplicationAllSubClusters() + throws IOException, YarnException, InterruptedException, TimeoutException { + + // We will design a unit test. In this unit test, + // we will submit the same application to all sub-clusters. + // Then we use interceptor kill application, + // the application should be cleared from all sub-clusters. + + Set subClusterSet = new HashSet<>(); + for (SubClusterId subCluster : subClusters) { + subClusterSet.add(subCluster); + } + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 2); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + // Submit the application we are going to kill later + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + SubClusterId subClusterId = stateStoreUtil.queryApplicationHomeSC(appId); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + subClusterSet.remove(subClusterId); + + for (SubClusterId subCluster : subClusterSet) { + LOG.info("SubCluster : {}.", subCluster); + ApplicationClientProtocol clientRMProxyForSubCluster = + interceptor.getClientRMProxyForSubCluster(subCluster); + clientRMProxyForSubCluster.submitApplication(request); + } + + KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId); + GenericTestUtils.waitFor(() -> { + KillApplicationResponse responseKill; + try { + responseKill = interceptor.forceKillApplication(requestKill); + } catch (Exception e) { + throw new RuntimeException(e); + } + return (responseKill.getIsKillCompleted()); + }, 100, 2000); + + for (SubClusterId subCluster : subClusters) { + ApplicationClientProtocol clientRMProxyForSubCluster = + interceptor.getClientRMProxyForSubCluster(subCluster); + GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse responseGet = + clientRMProxyForSubCluster.getApplicationReport(requestGet); + Assert.assertNotNull(responseGet); + ApplicationReport applicationReport = responseGet.getApplicationReport(); + Assert.assertNotNull(applicationReport); + YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState(); + Assert.assertNotNull(yarnApplicationState); + Assert.assertEquals(YarnApplicationState.KILLED, yarnApplicationState); + } + } + /** * This test validates the correctness of ForceKillApplication in case of * application does not exist in StateStore.