From c60a900583d6a8d0494980f4bbbf4f95438b741b Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sun, 28 Aug 2022 01:17:00 +0800 Subject: [PATCH] YARN-11275. [Federation] Add batchFinishApplicationMaster in UAMPoolManager. (#4792) --- .../server/uam/UnmanagedAMPoolManager.java | 51 +++++++++++++ .../amrmproxy/FederationInterceptor.java | 76 ++++--------------- .../amrmproxy/TestFederationInterceptor.java | 53 +++++++++++++ 3 files changed, 118 insertions(+), 62 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 47203b257e..d1e86de5da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -22,6 +22,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.HashMap; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorCompletionService; @@ -450,4 +452,53 @@ public void drainUAMHeartbeats() { uam.drainHeartbeatThread(); } } + + /** + * Complete FinishApplicationMaster interface calls in batches. + * + * @param request FinishApplicationMasterRequest + * @param appId application Id + * @return Returns the Map map, + * the key is subClusterId, the value is FinishApplicationMasterResponse + */ + public Map batchFinishApplicationMaster( + FinishApplicationMasterRequest request, String appId) { + + Map responseMap = new HashMap<>(); + Set subClusterIds = this.unmanagedAppMasterMap.keySet(); + + if (subClusterIds != null && !subClusterIds.isEmpty()) { + ExecutorCompletionService> finishAppService = + new ExecutorCompletionService<>(this.threadpool); + LOG.info("Sending finish application request to {} sub-cluster RMs", subClusterIds.size()); + + for (final String subClusterId : subClusterIds) { + finishAppService.submit(() -> { + LOG.info("Sending finish application request to RM {}", subClusterId); + try { + FinishApplicationMasterResponse uamResponse = + finishApplicationMaster(subClusterId, request); + return Collections.singletonMap(subClusterId, uamResponse); + } catch (Throwable e) { + LOG.warn("Failed to finish unmanaged application master: " + + " RM address: {} ApplicationId: {}", subClusterId, appId, e); + return Collections.singletonMap(subClusterId, null); + } + }); + } + + for (int i = 0; i < subClusterIds.size(); ++i) { + try { + Future> future = finishAppService.take(); + Map uamResponse = future.get(); + LOG.debug("Received finish application response from RM: {}", uamResponse.keySet()); + responseMap.putAll(uamResponse); + } catch (Throwable e) { + LOG.warn("Failed to finish unmanaged application master: ApplicationId: {}", appId, e); + } + } + } + + return responseMap; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 13c062a46d..46414793e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -736,50 +736,26 @@ public FinishApplicationMasterResponse finishApplicationMaster( this.finishAMCalled = true; - // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager boolean failedToUnRegister = false; - ExecutorCompletionService compSvc = - null; // Application master is completing operation. Send the finish // application master request to all the registered sub-cluster resource // managers in parallel, wait for the responses and aggregate the results. - Set subClusterIds = this.uamPool.getAllUAMIds(); - if (subClusterIds.size() > 0) { - final FinishApplicationMasterRequest finishRequest = request; - compSvc = - new ExecutorCompletionService( - this.threadpool); + Map responseMap = + this.uamPool.batchFinishApplicationMaster(request, attemptId.toString()); - LOG.info("Sending finish application request to {} sub-cluster RMs", - subClusterIds.size()); - for (final String subClusterId : subClusterIds) { - compSvc.submit(new Callable() { - @Override - public FinishApplicationMasterResponseInfo call() throws Exception { - LOG.info("Sending finish application request to RM {}", - subClusterId); - FinishApplicationMasterResponse uamResponse = null; - try { - uamResponse = - uamPool.finishApplicationMaster(subClusterId, finishRequest); - - if (uamResponse.getIsUnregistered()) { - secondaryRelayers.remove(subClusterId); - if (getNMStateStore() != null) { - getNMStateStore().removeAMRMProxyAppContextEntry(attemptId, - NMSS_SECONDARY_SC_PREFIX + subClusterId); - } - } - } catch (Throwable e) { - LOG.warn("Failed to finish unmanaged application master: " - + "RM address: " + subClusterId + " ApplicationId: " - + attemptId, e); - } - return new FinishApplicationMasterResponseInfo(uamResponse, - subClusterId); - } - }); + for (Map.Entry entry : responseMap.entrySet()) { + String subClusterId = entry.getKey(); + FinishApplicationMasterResponse response = entry.getValue(); + if (response != null && response.getIsUnregistered()) { + secondaryRelayers.remove(subClusterId); + if (getNMStateStore() != null) { + getNMStateStore().removeAMRMProxyAppContextEntry(attemptId, + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } + } else { + // response is null or response.getIsUnregistered() == false + failedToUnRegister = true; } } @@ -792,30 +768,6 @@ public FinishApplicationMasterResponseInfo call() throws Exception { // Stop the home heartbeat thread this.homeHeartbeartHandler.shutdown(); - if (subClusterIds.size() > 0) { - // Wait for other sub-cluster resource managers to return the - // response and merge it with the home response - LOG.info( - "Waiting for finish application response from {} sub-cluster RMs", - subClusterIds.size()); - for (int i = 0; i < subClusterIds.size(); ++i) { - try { - Future future = compSvc.take(); - FinishApplicationMasterResponseInfo uamResponse = future.get(); - LOG.debug("Received finish application response from RM: {}", - uamResponse.getSubClusterId()); - if (uamResponse.getResponse() == null - || !uamResponse.getResponse().getIsUnregistered()) { - failedToUnRegister = true; - } - } catch (Throwable e) { - failedToUnRegister = true; - LOG.warn("Failed to finish unmanaged application master: " - + " ApplicationId: " + this.attemptId, e); - } - } - } - if (failedToUnRegister) { homeResponse.setIsUnregistered(false); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 3288382b26..f81eb69a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -969,4 +969,57 @@ private PreemptionMessage createDummyPreemptionMessage( preemptionMessage.setContract(contract); return preemptionMessage; } + + @Test + public void testBatchFinishApplicationMaster() throws IOException, InterruptedException { + + final RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); + + ugi.doAs((PrivilegedExceptionAction) () -> { + + // Register the application + RegisterApplicationMasterRequest registerReq1 = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq1.setHost(Integer.toString(testAppId)); + registerReq1.setRpcPort(0); + registerReq1.setTrackingUrl(""); + + // Register ApplicationMaster + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq1); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 and sc2 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance("SC-2")); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(numberOfContainers * 2, containers.size()); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResp); + Assert.assertTrue(finishResp.getIsUnregistered()); + + return null; + }); + } }