From 9de13f879af66a211b098166fd423b6900fc730d Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Tue, 6 Jun 2023 06:54:41 +0800 Subject: [PATCH] YARN-11502. Refactor AMRMProxy#FederationInterceptor#registerApplicationMaster. (#5705) --- .../amrmproxy/FederationInterceptor.java | 89 ++++++++++--------- .../amrmproxy/TestFederationInterceptor.java | 25 +++--- 2 files changed, 58 insertions(+), 56 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 9460e70877..ae6765cfb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -570,9 +571,12 @@ private Map> recoverSubClusterAMRMTokenIdenti * For the same reason, this method needs to be synchronized. */ @Override - public synchronized RegisterApplicationMasterResponse - registerApplicationMaster(RegisterApplicationMasterRequest request) - throws YarnException, IOException { + public synchronized RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, IOException { + + if (request == null) { + throw new YarnException("RegisterApplicationMasterRequest can't be null!"); + } // Reset the heartbeat responseId to zero upon register synchronized (this.lastAllocateResponseLock) { @@ -590,18 +594,9 @@ private Map> recoverSubClusterAMRMTokenIdenti // Save the registration request. This will be used for registering with // secondary sub-clusters using UAMs, as well as re-register later this.amRegistrationRequest = request; - if (getNMStateStore() != null) { - try { - RegisterApplicationMasterRequestPBImpl pb = - (RegisterApplicationMasterRequestPBImpl) - this.amRegistrationRequest; - getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, - NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray()); - } catch (Exception e) { - LOG.error("Error storing AMRMProxy application context entry for " - + this.attemptId, e); - } - } + RegisterApplicationMasterRequestPBImpl requestPB = (RegisterApplicationMasterRequestPBImpl) + this.amRegistrationRequest; + storeAMRMProxyAppContextEntry(NMSS_REG_REQUEST_KEY, requestPB.getProto().toByteArray()); } /* @@ -625,54 +620,64 @@ private Map> recoverSubClusterAMRMTokenIdenti * is running and will breaks the elasticity feature. The registration with * the other sub-cluster RM will be done lazily as needed later. */ - this.amRegistrationResponse = - this.homeRMRelayer.registerApplicationMaster(request); - if (this.amRegistrationResponse - .getContainersFromPreviousAttempts() != null) { - cacheAllocatedContainers( - this.amRegistrationResponse.getContainersFromPreviousAttempts(), - this.homeSubClusterId); + this.amRegistrationResponse = this.homeRMRelayer.registerApplicationMaster(request); + + if (this.amRegistrationResponse == null) { + throw new YarnException("RegisterApplicationMasterResponse can't be null!"); + } + + List containersFromPreviousAttempts = + this.amRegistrationResponse.getContainersFromPreviousAttempts(); + if (containersFromPreviousAttempts != null) { + cacheAllocatedContainers(containersFromPreviousAttempts, this.homeSubClusterId); } ApplicationId appId = this.attemptId.getApplicationId(); reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId); - if (getNMStateStore() != null) { - try { - RegisterApplicationMasterResponsePBImpl pb = - (RegisterApplicationMasterResponsePBImpl) - this.amRegistrationResponse; - getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, - NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray()); - } catch (Exception e) { - LOG.error("Error storing AMRMProxy application context entry for " - + this.attemptId, e); - } - } + RegisterApplicationMasterResponsePBImpl responsePB = (RegisterApplicationMasterResponsePBImpl) + this.amRegistrationResponse; + storeAMRMProxyAppContextEntry(NMSS_REG_RESPONSE_KEY, responsePB.getProto().toByteArray()); // the queue this application belongs will be used for getting // AMRMProxy policy from state store. String queue = this.amRegistrationResponse.getQueue(); if (queue == null) { - LOG.warn("Received null queue for application " + appId - + " from home subcluster. Will use default queue name " - + YarnConfiguration.DEFAULT_QUEUE_NAME - + " for getting AMRMProxyPolicy"); + LOG.warn("Received null queue for application {} from home subcluster. " + + " Will use default queue name {} for getting AMRMProxyPolicy.", appId, + YarnConfiguration.DEFAULT_QUEUE_NAME); } else { - LOG.info("Application " + appId + " belongs to queue " + queue); + LOG.info("Application {} belongs to queue {}.", appId, queue); } // Initialize the AMRMProxyPolicy try { - this.policyInterpreter = - FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, - getConf(), this.federationFacade, this.homeSubClusterId); + this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, + getConf(), this.federationFacade, this.homeSubClusterId); } catch (FederationPolicyInitializationException e) { throw new YarnRuntimeException(e); } return this.amRegistrationResponse; } + /** + * Add a context entry for an application attempt in AMRMProxyService. + * + * @param key key string + * @param data state data + */ + private void storeAMRMProxyAppContextEntry(String key, byte[] data) { + NMStateStoreService nmStateStore = getNMStateStore(); + if (nmStateStore != null) { + try { + nmStateStore.storeAMRMProxyAppContextEntry(this.attemptId, key, data); + } catch (Exception e) { + LOG.error("Error storing AMRMProxy application context entry[{}] for {}.", + key, this.attemptId, e); + } + } + } + /** * Sends the heart beats to the home RM and the secondary sub-cluster RMs that * are being used by the application. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 51c1c5a096..8661990ed7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -195,10 +195,8 @@ private void deRegisterSubCluster(SubClusterId subClusterId) private List getContainersAndAssert(int numberOfResourceRequests, int numberOfAllocationExcepted) throws Exception { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - List containers = - new ArrayList(numberOfResourceRequests); - List askList = - new ArrayList(numberOfResourceRequests); + List containers = new ArrayList<>(numberOfResourceRequests); + List askList = new ArrayList<>(numberOfResourceRequests); for (int id = 0; id < numberOfResourceRequests; id++) { askList.add(createResourceRequest("test-node-" + Integer.toString(id), 6000, 2, id % 5, 1)); @@ -269,8 +267,8 @@ private void releaseContainersAndAssert(List containers) List newlyFinished = getCompletedContainerIds( allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); - LOG.info("Number of containers received in the original request: " - + Integer.toString(newlyFinished.size())); + LOG.info("Number of containers received in the original request: {}", + newlyFinished.size()); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -290,10 +288,9 @@ private void releaseContainersAndAssert(List containers) newlyFinished = getCompletedContainerIds( allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); - LOG.info("Number of containers received in this request: " - + Integer.toString(newlyFinished.size())); - LOG.info("Total number of containers received: " - + Integer.toString(containersForReleasedContainerIds.size())); + LOG.info("Number of containers received in this request: {}.", newlyFinished.size()); + LOG.info("Total number of containers received: {}.", + containersForReleasedContainerIds.size()); Thread.sleep(10); } @@ -431,7 +428,7 @@ public Object run() throws Exception { FinishApplicationMasterResponse finishResponse = interceptor.finishApplicationMaster(finishReq); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); return null; } }); @@ -624,7 +621,7 @@ public Object run() throws Exception { FinishApplicationMasterResponse finishResponse = interceptor.finishApplicationMaster(finishReq); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); // After the application succeeds, the registry/NMSS entry should be // cleaned up @@ -805,7 +802,7 @@ public Object run() throws Exception { Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size()); // Generate a duplicate heartbeat from AM, so that it won't really - // trigger an heartbeat to all SC + // trigger a heartbeat to all SC AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); // Set to lastResponseId - 1 so that it will be considered a duplicate @@ -904,7 +901,7 @@ public Object run() throws Exception { FinishApplicationMasterResponse finishResponse = interceptor.finishApplicationMaster(finishReq); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); // After the application succeeds, the registry entry should be deleted if (interceptor.getRegistryClient() != null) {