diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 85a14f71bd..bf18561096 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -66,6 +66,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -771,18 +772,28 @@ public FinishApplicationMasterResponse finishApplicationMaster(
 
     if (failedToUnRegister) {
       homeResponse.setIsUnregistered(false);
-    } else {
+    } else if (checkRequestFinalApplicationStatusSuccess(request)) {
       // Clean up UAMs only when the app finishes successfully, so that no more
       // attempt will be launched.
       this.uamPool.stop();
-      if (this.registryClient != null) {
-        this.registryClient
-            .removeAppFromRegistry(this.attemptId.getApplicationId());
-      }
+      removeAppFromRegistry();
     }
     return homeResponse;
   }
 
+  /**
+   * Tells whether the AM reported a {@link FinalApplicationStatus#SUCCEEDED}
+   * final status in its finish request.
+   *
+   * @param request the finish request sent by the AM, may be null
+   * @return true only if the request is non-null and its status is SUCCEEDED
+   */
+  private boolean checkRequestFinalApplicationStatusSuccess(
+      FinishApplicationMasterRequest request) {
+    return request != null
+        && FinalApplicationStatus.SUCCEEDED.equals(request.getFinalApplicationStatus());
+  }
+
   @Override
   public void setNextInterceptor(RequestInterceptor next) {
     throw new YarnRuntimeException(
@@ -818,9 +829,25 @@ public void shutdown() {
     this.homeHeartbeartHandler.shutdown();
     this.homeRMRelayer.shutdown();
 
+    // Shutdown needs to clean up app
+    removeAppFromRegistry();
+
     super.shutdown();
   }
 
+  /**
+   * Removes this application from the registry. Does nothing when the registry
+   * client, the attempt id or its application id is unavailable.
+   */
+  private void removeAppFromRegistry() {
+    if (this.registryClient != null && this.attemptId != null) {
+      ApplicationId applicationId = this.attemptId.getApplicationId();
+      if (applicationId != null) {
+        this.registryClient.removeAppFromRegistry(applicationId);
+      }
+    }
+  }
+
   /**
    * Only for unit test cleanup.
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 59bc467414..f0659a9d46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@@ -71,6 +72,7 @@
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -1119,4 +1121,124 @@ public void testBatchFinishApplicationMaster() throws IOException, InterruptedEx
       return null;
     });
   }
+
+  /**
+   * Verifies that finishing the application with a SUCCEEDED final status
+   * removes the application from the registry.
+   */
+  @Test
+  public void testRemoveAppFromRegistryApplicationSuccess()
+      throws IOException, InterruptedException {
+
+    UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Register the application
+      RegisterApplicationMasterRequest registerReq =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq.setHost(Integer.toString(testAppId));
+      registerReq.setRpcPort(0);
+      registerReq.setTrackingUrl("");
+
+      // Register ApplicationMaster
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+      int numberOfContainers = 3;
+      List<?> containers =
+          getContainersAndAssert(numberOfContainers, numberOfContainers);
+      Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+      Assert.assertEquals(numberOfContainers, containers.size());
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq);
+      Assert.assertNotNull(finishResp);
+      Assert.assertTrue(finishResp.getIsUnregistered());
+
+      // The application finished successfully, so it must be gone from the registry.
+      FederationRegistryClient client = interceptor.getRegistryClient();
+      List<?> applications = client.getAllApplications();
+      Assert.assertNotNull(applications);
+      Assert.assertEquals(0, applications.size());
+      return null;
+    });
+  }
+
+  /**
+   * Verifies that finishing the application with a FAILED final status leaves
+   * the application in the registry until it is removed explicitly.
+   */
+  @Test
+  public void testRemoveAppFromRegistryApplicationFailed()
+      throws IOException, InterruptedException {
+
+    UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Register the application
+      RegisterApplicationMasterRequest registerReq =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq.setHost(Integer.toString(testAppId));
+      registerReq.setRpcPort(0);
+      registerReq.setTrackingUrl("");
+
+      // Register ApplicationMaster
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+      int numberOfContainers = 3;
+      List<?> containers =
+          getContainersAndAssert(numberOfContainers, numberOfContainers);
+      Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+      Assert.assertEquals(numberOfContainers, containers.size());
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
+
+      FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq);
+      Assert.assertNotNull(finishResp);
+
+      // Check Registry Applications
+      // At this time, the Application should not be cleaned up because the state is not SUCCESS.
+      FederationRegistryClient client = interceptor.getRegistryClient();
+      List<?> applications = client.getAllApplications();
+      Assert.assertNotNull(applications);
+      Assert.assertEquals(1, applications.size());
+
+      // interceptor cleanupRegistry
+      ApplicationId applicationId = interceptor.getAttemptId().getApplicationId();
+      client.removeAppFromRegistry(applicationId);
+      applications = client.getAllApplications();
+      Assert.assertNotNull(applications);
+      Assert.assertEquals(0, applications.size());
+
+      return null;
+    });
+  }
 }