diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 1b72e4065d..78ad48d7f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -522,6 +522,21 @@ public class UnmanagedAMPoolManager extends AbstractService {
     };
   }
 
+  /**
+   * Un-attach the UAM of the given sub-cluster: shut down the connections of
+   * its UnmanagedApplicationManager and drop it from the pool.
+   *
+   * @param uamId uam Id
+   */
+  public void unAttachUAM(String uamId) {
+    if (this.unmanagedAppMasterMap.containsKey(uamId)) {
+      UnmanagedApplicationManager appManager = this.unmanagedAppMasterMap.get(uamId);
+      appManager.shutDownConnections();
+    }
+    this.unmanagedAppMasterMap.remove(uamId);
+    this.appIdMap.remove(uamId);
+  }
+
   @VisibleForTesting
   protected Map<String, UnmanagedApplicationManager> getUnmanagedAppMasterMap() {
     return unmanagedAppMasterMap;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 93ffd2aeed..40ddbf6b56 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -954,4 +955,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       throws YarnException, IOException {
     return null;
   }
+
+  @VisibleForTesting
+  public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
+    return applicationContainerIdMap;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index cfec69847e..d3dff95abc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -200,8 +201,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    * that all the {@link AMRMClientRelayer} will be re-populated with all
    * pending requests.
    *
-   * TODO: When split-merge is not idempotent, this can lead to some
-   * over-allocation without a full cancel to RM.
    */
   private volatile boolean justRecovered;
 
@@ -357,104 +356,103 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   @Override
   public void recover(Map<String, byte[]> recoveredDataMap) {
     super.recover(recoveredDataMap);
-    LOG.info("Recovering data for FederationInterceptor for {}",
-        this.attemptId);
+    LOG.info("Recovering data for FederationInterceptor for {}.", this.attemptId);
     this.justRecovered = true;
 
-    if (recoveredDataMap == null) {
+    if (recoveredDataMap == null || recoveredDataMap.isEmpty()) {
+      LOG.warn("recoveredDataMap is null or empty, FederationInterceptor cannot recover.");
       return;
     }
 
-    try {
-      if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
-        RegisterApplicationMasterRequestProto pb =
-            RegisterApplicationMasterRequestProto
-                .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
-        this.amRegistrationRequest =
-            new RegisterApplicationMasterRequestPBImpl(pb);
-        LOG.info("amRegistrationRequest recovered for {}", this.attemptId);
+    if (!recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
+      return;
+    }
+
+    try {
+
+      if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
+        byte[] appMasterRequestBytes = recoveredDataMap.get(NMSS_REG_REQUEST_KEY);
+        RegisterApplicationMasterRequestProto pb =
+            RegisterApplicationMasterRequestProto.parseFrom(appMasterRequestBytes);
+        this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(pb);
+        LOG.info("amRegistrationRequest recovered for {}.", this.attemptId);
 
         // Give the register request to homeRMRelayer for future re-registration
         this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
       }
+
       if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
+        byte[] appMasterResponseBytes = recoveredDataMap.get(NMSS_REG_RESPONSE_KEY);
         RegisterApplicationMasterResponseProto pb =
-            RegisterApplicationMasterResponseProto
-                .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
-        this.amRegistrationResponse =
-            new RegisterApplicationMasterResponsePBImpl(pb);
-        LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
+            RegisterApplicationMasterResponseProto.parseFrom(appMasterResponseBytes);
+        this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb);
+        LOG.info("amRegistrationResponse recovered for {}.", this.attemptId);
       }
 
       // Recover UAM amrmTokens from registry or NMSS
-      Map<String, Token<AMRMTokenIdentifier>> uamMap;
-      if (this.registryClient != null) {
-        uamMap = this.registryClient
-            .loadStateFromRegistry(this.attemptId.getApplicationId());
-        LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
-            uamMap.size(), this.attemptId.getApplicationId());
-      } else {
-        uamMap = new HashMap<>();
-        for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
-          if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) {
-            // entry for subClusterId -> UAM amrmToken
-            String scId =
-                entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length());
-            Token<AMRMTokenIdentifier> amrmToken = new Token<>();
-            amrmToken.decodeFromUrlString(
-                new String(entry.getValue(), STRING_TO_BYTE_FORMAT));
-            uamMap.put(scId, amrmToken);
-            LOG.debug("Recovered UAM in {} from NMSS", scId);
-          }
-        }
-        LOG.info("Found {} existing UAMs for application {} in NMStateStore",
-            uamMap.size(), this.attemptId.getApplicationId());
-      }
+      Map<String, Token<AMRMTokenIdentifier>> uamMap =
+          recoverSubClusterAMRMTokenIdentifierMap(recoveredDataMap);
 
       // Re-attach the UAMs
       int containers = 0;
-      for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap
-          .entrySet()) {
-        SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey());
+      AMRMProxyApplicationContext applicationContext = getApplicationContext();
+      ApplicationId applicationId = this.attemptId.getApplicationId();
+      String queue = this.amRegistrationResponse.getQueue();
+      String homeSCId = this.homeSubClusterId.getId();
+      String user = applicationContext.getUser();
 
-        // Create a config loaded with federation on and subclusterId
+      for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
+        String keyScId = entry.getKey();
+        Token<AMRMTokenIdentifier> tokens = entry.getValue();
+        SubClusterId subClusterId = SubClusterId.newInstance(keyScId);
+
+        // Create a config loaded with federation on and subClusterId
         // for each UAM
         YarnConfiguration config = new YarnConfiguration(getConf());
-        FederationProxyProviderUtil.updateConfForFederation(config,
-            subClusterId.getId());
+        FederationProxyProviderUtil.updateConfForFederation(config, keyScId);
 
         try {
-          this.uamPool.reAttachUAM(subClusterId.getId(), config,
-              this.attemptId.getApplicationId(),
-              this.amRegistrationResponse.getQueue(),
-              getApplicationContext().getUser(), this.homeSubClusterId.getId(),
-              entry.getValue(), subClusterId.toString());
+          // ReAttachUAM
+          this.uamPool.reAttachUAM(keyScId, config, applicationId, queue, user, homeSCId,
+              tokens, keyScId);
 
-          this.secondaryRelayers.put(subClusterId.getId(),
-              this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
+          // GetAMRMClientRelayer
+          this.secondaryRelayers.put(keyScId, this.uamPool.getAMRMClientRelayer(keyScId));
 
+          // RegisterApplicationMaster
           RegisterApplicationMasterResponse response =
-              this.uamPool.registerApplicationMaster(subClusterId.getId(),
-                  this.amRegistrationRequest);
+              this.uamPool.registerApplicationMaster(keyScId, this.amRegistrationRequest);
 
           // Set sub-cluster to be timed out initially
-          lastSCResponseTime.put(subClusterId,
-              clock.getTime() - subClusterTimeOut);
+          lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
 
           // Running containers from secondary RMs
-          for (Container container : response
-              .getContainersFromPreviousAttempts()) {
-            containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+          List<Container> previousAttempts = response.getContainersFromPreviousAttempts();
+          for (Container container : previousAttempts) {
+            ContainerId containerId = container.getId();
+            containerIdToSubClusterIdMap.put(containerId, subClusterId);
             containers++;
-            LOG.debug(" From subcluster {} running container {}",
-                subClusterId, container.getId());
+            LOG.info("From subCluster {} running container {}.", subClusterId, containerId);
           }
-          LOG.info("Recovered {} running containers from UAM in {}",
-              response.getContainersFromPreviousAttempts().size(),
-              subClusterId);
+
+          LOG.info("Recovered {} running containers from UAM in {}.",
+              previousAttempts.size(), subClusterId);
         } catch (Exception e) {
-          LOG.error("Error reattaching UAM to " + subClusterId + " for "
-              + this.attemptId, e);
+          LOG.error("Error reattaching UAM to {} for {}.", subClusterId, this.attemptId, e);
+          // During recovery, we need to clean up the data of the bad sub-cluster.
+          // This ensures that when the bad sub-cluster recovers,
+          // new containers can still be allocated and new UAMs can be registered.
+          this.uamPool.unAttachUAM(keyScId);
+          this.secondaryRelayers.remove(keyScId);
+          this.lastSCResponseTime.remove(subClusterId);
+          List<ContainerId> containerIds =
+              containerIdToSubClusterIdMap.entrySet().stream()
+                  .filter(item -> item.getValue().equals(subClusterId))
+                  .map(Entry::getKey)
+                  .collect(Collectors.toList());
+          for (ContainerId containerId : containerIds) {
+            containerIdToSubClusterIdMap.remove(containerId);
+          }
         }
       }
 
@@ -463,42 +461,91 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       // map as well.
       UserGroupInformation appSubmitter;
       if (UserGroupInformation.isSecurityEnabled()) {
-        appSubmitter = UserGroupInformation.createProxyUser(getApplicationContext().getUser(),
+        appSubmitter = UserGroupInformation.createProxyUser(user,
             UserGroupInformation.getLoginUser());
       } else {
-        appSubmitter = UserGroupInformation.createRemoteUser(getApplicationContext().getUser());
+        appSubmitter = UserGroupInformation.createRemoteUser(user);
       }
 
-      ApplicationClientProtocol rmClient =
-          createHomeRMProxy(getApplicationContext(),
-              ApplicationClientProtocol.class, appSubmitter);
-      GetContainersResponse response = rmClient
-          .getContainers(GetContainersRequest.newInstance(this.attemptId));
+      ApplicationClientProtocol rmClient = createHomeRMProxy(applicationContext,
+          ApplicationClientProtocol.class, appSubmitter);
+
+      GetContainersRequest request = GetContainersRequest.newInstance(this.attemptId);
+      GetContainersResponse response = rmClient.getContainers(request);
+
       for (ContainerReport container : response.getContainerList()) {
-        containerIdToSubClusterIdMap.put(container.getContainerId(),
-            this.homeSubClusterId);
+        ContainerId containerId = container.getContainerId();
+        containerIdToSubClusterIdMap.put(containerId, this.homeSubClusterId);
         containers++;
-        LOG.debug(" From home RM {} running container {}",
-            this.homeSubClusterId, container.getContainerId());
+        LOG.debug("From home RM {} running container {}.", this.homeSubClusterId, containerId);
       }
-      LOG.info("{} running containers including AM recovered from home RM {}",
+      LOG.info("{} running containers including AM recovered from home RM {}.",
           response.getContainerList().size(), this.homeSubClusterId);
 
-      LOG.info(
-          "In all {} UAMs {} running containers including AM recovered for {}",
+      LOG.info("In all {} UAMs {} running containers including AM recovered for {}.",
           uamMap.size(), containers, this.attemptId);
 
-      if (this.amRegistrationResponse != null) {
+      if (queue != null) {
         // Initialize the AMRMProxyPolicy
-        String queue = this.amRegistrationResponse.getQueue();
-        this.policyInterpreter =
-            FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
-                getConf(), this.federationFacade, this.homeSubClusterId);
+        queue = this.amRegistrationResponse.getQueue();
+        this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+            getConf(), this.federationFacade, this.homeSubClusterId);
       }
     } catch (IOException | YarnException e) {
       throw new YarnRuntimeException(e);
     }
+  }
 
+  /**
+   * Recover the sub-cluster AMRMTokenIdentifier map.
+   *
+   * If registryClient is not null, restore directly from the Yarn Registry;
+   * otherwise restore from the NM state store (NMSS).
+   *
+   * @param recoveredDataMap the recovered data map.
+   * @return subClusterAMRMTokenIdentifierMap.
+   * @throws IOException if an IO exception occurs.
+   */
+  private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdentifierMap(
+      Map<String, byte[]> recoveredDataMap) throws IOException {
+    Map<String, Token<AMRMTokenIdentifier>> uamMap;
+    ApplicationId applicationId = this.attemptId.getApplicationId();
+    if (this.registryClient != null) {
+      uamMap = this.registryClient.loadStateFromRegistry(applicationId);
+      LOG.info("Found {} existing UAMs for application {} in Yarn Registry.",
+          uamMap.size(), applicationId);
+    } else {
+      uamMap = recoverSubClusterAMRMTokenIdentifierMapFromNMSS(recoveredDataMap);
+      LOG.info("Found {} existing UAMs for application {} in NMStateStore.",
+          uamMap.size(), applicationId);
+    }
+    return uamMap;
+  }
+
+  /**
+   * Recover the sub-cluster AMRMTokenIdentifier map from NMSS.
+   *
+   * @param recoveredDataMap the recovered data map.
+   * @return subClusterAMRMTokenIdentifierMap.
+   * @throws IOException if an IO exception occurs.
+   */
+  private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdentifierMapFromNMSS(
+      Map<String, byte[]> recoveredDataMap) throws IOException {
+    Map<String, Token<AMRMTokenIdentifier>> uamMap = new HashMap<>();
+    for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
+      String key = entry.getKey();
+      byte[] value = entry.getValue();
+
+      if (key.startsWith(NMSS_SECONDARY_SC_PREFIX)) {
+        // entry for subClusterId -> UAM AMRMTokenIdentifier
+        String scId = key.substring(NMSS_SECONDARY_SC_PREFIX.length());
+        Token<AMRMTokenIdentifier> amrmTokenIdentifier = new Token<>();
+        amrmTokenIdentifier.decodeFromUrlString(new String(value, STRING_TO_BYTE_FORMAT));
+        uamMap.put(scId, amrmTokenIdentifier);
+        LOG.debug("Recovered UAM in {} from NMSS.", scId);
+      }
+    }
+    return uamMap;
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 77c68bebb1..51c1c5a096 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -26,12 +26,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
@@ -78,6 +80,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -517,6 +520,16 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     testRecover(null);
   }
 
+  @Test
+  public void testRecoverBadSCWithAMRMProxyHA() throws Exception {
+    testRecoverWithBadSubCluster(registry);
+  }
+
+  @Test
+  public void testRecoverBadSCWithoutAMRMProxyHA() throws Exception {
+    testRecoverWithBadSubCluster(null);
+  }
+
   protected void testRecover(final RegistryOperations registryObj)
       throws Exception {
     UserGroupInformation ugi =
@@ -1242,4 +1255,181 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
       return null;
     });
   }
+
+  public void testRecoverWithBadSubCluster(final RegistryOperations registryObj)
+      throws IOException, InterruptedException {
+
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    // Prepare a list of sub-clusters
+    List<SubClusterId> subClusterIds = new ArrayList<>();
+    SubClusterId sc1 = SubClusterId.newInstance("SC-1");
+    SubClusterId sc2 = SubClusterId.newInstance("SC-2");
+    SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID);
+    subClusterIds.add(sc1);
+    subClusterIds.add(sc2);
+    subClusterIds.add(homeSC);
+
+    // Prepare the AMRMProxy context
+    AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+        getConf(), attemptId, "test-user", null, null, null, registryObj);
+
+    // Prepare the RegisterApplicationMasterRequest
+    RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Step 1. Prepare sub-clusters SC-1, SC-2, HomeSC and the interceptor
+      initSubClusterAndInterceptor(subClusterIds, registryObj);
+
+      // Step 2. Register the application and assign containers
+      List<Container> containers = registerApplicationAndAssignContainers(registerReq);
+
+      // Step 3. Take the SC-1 cluster offline
+      offlineSubClusterSC1(sc1);
+
+      // Step 4. Recover the ApplicationMaster
+      recoverApplicationMaster(appContext);
+
+      // Step 5. The ApplicationMaster is recovered. SC-1 was offline while
+      // SC-2 was recovered, so UnmanagedAMPool.size = 1 and it contains only SC-2
+      UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+      Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+      Assert.assertNotNull(allUAMIds);
+      Assert.assertEquals(1, allUAMIds.size());
+      Assert.assertTrue(allUAMIds.contains(sc2.getId()));
+
+      // Step 6. The first allocate call expects a fail-over exception and re-register.
+      AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+      allocateRequest.setResponseId(0);
+      LambdaTestUtils.intercept(ApplicationMasterNotRegisteredException.class,
+          "AMRMProxy just restarted and recovered for " + this.attemptId
+              + ". AM should re-register and full re-send pending requests.",
+          () -> interceptor.allocate(allocateRequest));
+      interceptor.registerApplicationMaster(registerReq);
+
+      // Step 7. Release the containers
+      releaseContainers(containers, sc1);
+
+      // Step 8. Finish the application
+      finishApplication();
+
+      return null;
+    });
+  }
+
+  private void initSubClusterAndInterceptor(List<SubClusterId> subClusterIds,
+      RegistryOperations registryObj) throws YarnException {
+    // Prepare sub-clusters SC-1, SC-2, HomeSC
+    for (SubClusterId subClusterId : subClusterIds) {
+      registerSubCluster(subClusterId);
+    }
+
+    // Prepare the interceptor
+    interceptor = new TestableFederationInterceptor();
+    AMRMProxyApplicationContext appContext = new AMRMProxyApplicationContextImpl(nmContext,
+        getConf(), attemptId, "test-user", null, null, null, registryObj);
+    interceptor.init(appContext);
+    interceptor.cleanupRegistry();
+  }
+
+  private List<Container> registerApplicationAndAssignContainers(
+      RegisterApplicationMasterRequest registerReq) throws Exception {
+
+    // Register HomeSC
+    RegisterApplicationMasterResponse registerResponse =
+        interceptor.registerApplicationMaster(registerReq);
+    Assert.assertNotNull(registerResponse);
+
+    // We only registered HomeSC, so the UAM pool should still be empty
+    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+    // We assign 3 containers to each cluster
+    int numberOfContainers = 3;
+    List<Container> containers =
+        getContainersAndAssert(numberOfContainers, numberOfContainers * 3);
+
+    // At this point, UnmanagedAMPoolSize should be 2 and contain SC-1 and SC-2
+    Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+    UnmanagedAMPoolManager unmanagedAMPoolManager = interceptor.getUnmanagedAMPool();
+    Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+    Assert.assertNotNull(allUAMIds);
+    Assert.assertEquals(2, allUAMIds.size());
+    Assert.assertTrue(allUAMIds.contains("SC-1"));
+    Assert.assertTrue(allUAMIds.contains("SC-2"));
+
+    // Make sure all async hb threads are done
+    interceptor.drainAllAsyncQueue(true);
+
+    return containers;
+  }
+
+  private void offlineSubClusterSC1(SubClusterId subClusterId) throws YarnException {
+
+    ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+        interceptor.getSecondaryRMs();
+
+    // Take SC-1 out of service
+    deRegisterSubCluster(subClusterId);
+    secondaries.get(subClusterId.getId()).setRunningMode(false);
+  }
+
+  private void recoverApplicationMaster(AMRMProxyApplicationContext appContext)
+      throws IOException {
+    // Prepare for the FederationInterceptor restart and recovery
+    Map<String, byte[]> recoveredDataMap =
+        recoverDataMapForAppAttempt(nmStateStore, attemptId);
+
+    // Preserve the mock RM instances
+    MockResourceManagerFacade homeRM = interceptor.getHomeRM();
+
+    // Create a new interceptor instance and recover
+    interceptor = new TestableFederationInterceptor(homeRM,
+        interceptor.getSecondaryRMs());
+    interceptor.init(appContext);
+    interceptor.recover(recoveredDataMap);
+  }
+
+  private void releaseContainers(List<Container> containers, SubClusterId subClusterId)
+      throws Exception {
+
+    ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+        interceptor.getSecondaryRMs();
+    lastResponseId = 0;
+
+    // Get the container list of SC-1
+    MockResourceManagerFacade sc1Facade = secondaries.get("SC-1");
+    HashMap<ApplicationId, List<ContainerId>> appContainerMap =
+        sc1Facade.getApplicationContainerIdMap();
+    Assert.assertNotNull(appContainerMap);
+    ApplicationId applicationId = attemptId.getApplicationId();
+    Assert.assertNotNull(applicationId);
+    List<ContainerId> sc1ContainerList = appContainerMap.get(applicationId);
+
+    // Release all remaining containers. Because SC-1 is offline, the
+    // containers allocated by SC-1 have to be filtered out first
+    containers = containers.stream()
+        .filter(container -> !sc1ContainerList.contains(container.getId()))
+        .collect(Collectors.toList());
+    releaseContainersAndAssert(containers);
+  }
+
+  private void finishApplication() throws IOException, YarnException {
+    // Finish the application
+    FinishApplicationMasterRequest finishReq =
+        Records.newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setDiagnostics("");
+    finishReq.setTrackingUrl("");
+    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+    FinishApplicationMasterResponse finishResponse =
+        interceptor.finishApplicationMaster(finishReq);
+    Assert.assertNotNull(finishResponse);
+    Assert.assertTrue(finishResponse.getIsUnregistered());
+  }
 }
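
Note on the recovery cleanup in FederationInterceptor#recover: when re-attaching a UAM fails, the catch block first collects the ContainerIds that map to the bad sub-cluster and only removes them from containerIdToSubClusterIdMap in a second pass, rather than removing entries while streaming over the entry set. The following self-contained sketch shows the same collect-then-remove idiom with plain java.util types; the class name and the sample data are illustrative only and are not part of the patch.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class BadSubClusterCleanupSketch {
  public static void main(String[] args) {
    // containerId -> subClusterId, mirroring containerIdToSubClusterIdMap above
    Map<String, String> containerToSubCluster = new HashMap<>();
    containerToSubCluster.put("container_01", "SC-1");
    containerToSubCluster.put("container_02", "SC-2");
    containerToSubCluster.put("container_03", "SC-1");

    String badSubCluster = "SC-1";

    // Collect the keys mapped to the bad sub-cluster first...
    List<String> toRemove = containerToSubCluster.entrySet().stream()
        .filter(entry -> entry.getValue().equals(badSubCluster))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());

    // ...then remove them in a separate pass, leaving healthy entries intact.
    toRemove.forEach(containerToSubCluster::remove);

    // Prints {container_02=SC-2}
    System.out.println(containerToSubCluster);
  }
}

The two-pass shape keeps the stream free of side effects on its source map; removing entries from a plain HashMap while iterating over its entry set would throw a ConcurrentModificationException.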