diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 46414793e7..85a14f71bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; @@ -1475,6 +1476,7 @@ protected void mergeAllocateResponse(AllocateResponse homeResponse, private void cacheAllocatedContainers(List containers, SubClusterId subClusterId) { for (Container container : containers) { + SubClusterId chooseSubClusterId = SubClusterId.newInstance(subClusterId.toString()); LOG.debug("Adding container {}", container); if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) { @@ -1497,22 +1499,53 @@ private void cacheAllocatedContainers(List containers, + " from same sub-cluster: {}, so ignoring.", container.getId(), subClusterId); } else { + + LOG.info("Duplicate containerID found in the allocated containers. " + + "try to re-pick the sub-cluster."); + // The same container allocation from different sub-clusters, // something is wrong. - // TODO: YARN-6667 if some subcluster RM is configured wrong, we - // should not fail the entire heartbeat. - throw new YarnRuntimeException( - "Duplicate containerID found in the allocated containers. This" - + " can happen if the RM epoch is not configured properly." - + " ContainerId: " + container.getId().toString() - + " ApplicationId: " + this.attemptId + " From RM: " - + subClusterId - + " . Previous container was from sub-cluster: " - + existingSubClusterId); + try { + + boolean existAllocatedScHealth = isSCHealth(existingSubClusterId); + boolean newAllocatedScHealth = isSCHealth(subClusterId); + + if (existAllocatedScHealth) { + // If the previous RM which allocated Container is normal, + // the previous RM will be used first + LOG.info("Use Previous Allocated Container's subCluster. " + + "ContainerId: {} ApplicationId: {} From RM: {}.", this.attemptId, + container.getId(), existingSubClusterId); + chooseSubClusterId = existingSubClusterId; + } else if (newAllocatedScHealth) { + // If the previous RM which allocated Container is abnormal, + // but the RM of the newly allocated Container is normal, use the new RM + LOG.info("Use Newly Allocated Container's subCluster. " + + "ApplicationId: {} ContainerId: {} From RM: {}.", this.attemptId, + container.getId(), subClusterId); + chooseSubClusterId = subClusterId; + } else { + // There is a very small probability that an exception will be thrown. + // The RM of the previously allocated Container + // and the RM of the newly allocated Container are not normal. + throw new YarnRuntimeException( + " Can't use any subCluster because an exception occurred" + + " ContainerId: " + container.getId() + " ApplicationId: " + this.attemptId + + " From RM: " + subClusterId + ". " + + " Previous Container was From subCluster: " + existingSubClusterId); + } + } catch (Exception ex) { + // An exception occurred + throw new YarnRuntimeException( + " Can't use any subCluster because an exception occurred" + + " ContainerId: " + container.getId() + " ApplicationId: " + this.attemptId + + " From RM: " + subClusterId + ". " + + " Previous Container was From subCluster: " + existingSubClusterId, ex); + } } } - this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + this.containerIdToSubClusterIdMap.put(container.getId(), chooseSubClusterId); } } @@ -1761,4 +1794,25 @@ public static boolean isNullOrEmpty(Collection c) { public static boolean isNullOrEmpty(Map c) { return (c == null || c.size() == 0); } + + @VisibleForTesting + protected void cacheAllocatedContainersForSubClusterId( + List containers, SubClusterId subClusterId) { + cacheAllocatedContainers(containers, subClusterId); + } + + @VisibleForTesting + protected Map getContainerIdToSubClusterIdMap() { + return containerIdToSubClusterIdMap; + } + + private boolean isSCHealth(SubClusterId subClusterId) throws YarnException { + Set timeOutScs = getTimedOutSCs(true); + SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId); + if (timeOutScs.contains(subClusterId) || + subClusterInfo == null || subClusterInfo.getState().isUnusable()) { + return false; + } + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index f81eb69a4d..59bc467414 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -36,6 +36,7 @@ import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; @@ -970,6 +972,101 @@ private PreemptionMessage createDummyPreemptionMessage( return preemptionMessage; } + @Test + public void testSameContainerFromDiffRM() throws IOException, InterruptedException { + + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + + ugi.doAs((PrivilegedExceptionAction) () -> { + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 active + SubClusterId subClusterId1 = SubClusterId.newInstance("SC-1"); + registerSubCluster(subClusterId1); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers); + Assert.assertNotNull(containers); + Assert.assertEquals(3, containers.size()); + + // with sc2 active + SubClusterId subClusterId2 = SubClusterId.newInstance("SC-2"); + registerSubCluster(subClusterId2); + + // 1.Container has been registered to SubCluster1, try to register the same Container + // to SubCluster2. + // Because SubCluster1 is in normal state at this time, + // So the SubCluster corresponding to Container should be SubCluster1 + interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId2); + Map cIdToSCMap = interceptor.getContainerIdToSubClusterIdMap(); + for (SubClusterId subClusterId : cIdToSCMap.values()) { + Assert.assertNotNull(subClusterId); + Assert.assertEquals(subClusterId1, subClusterId); + } + + // 2.Deregister SubCluster1, Register the same Containers to SubCluster2 + // So the SubCluster corresponding to Container should be SubCluster2 + deRegisterSubCluster(subClusterId1); + interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId2); + Map cIdToSCMap2 = interceptor.getContainerIdToSubClusterIdMap(); + for (SubClusterId subClusterId : cIdToSCMap2.values()) { + Assert.assertNotNull(subClusterId); + Assert.assertEquals(subClusterId2, subClusterId); + } + + // 3.Deregister subClusterId2, Register the same Containers to SubCluster1 + // Because both SubCluster1 and SubCluster2 are abnormal at this time, + // an exception will be thrown when registering the first Container. + deRegisterSubCluster(subClusterId2); + Container container1 = containers.get(0); + Assert.assertNotNull(container1); + String errMsg = + " Can't use any subCluster because an exception occurred" + + " ContainerId: " + container1.getId() + + " ApplicationId: " + interceptor.getAttemptId() + + " From RM: " + subClusterId1 + ". " + + " Previous Container was From subCluster: " + subClusterId2; + + LambdaTestUtils.intercept(YarnRuntimeException.class, errMsg, + () -> interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId1)); + + // 4. register SubCluster1, re-register the Container, + // and try to finish application + registerSubCluster(subClusterId1); + interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId1); + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertTrue(finishResponse.getIsUnregistered()); + + return null; + }); + } + @Test public void testBatchFinishApplicationMaster() throws IOException, InterruptedException {