diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index d2596343a5..faffbf602b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -314,7 +314,7 @@ public GetNewApplicationResponse getNewApplication(
     // Try calling the getNewApplication method
     List<SubClusterId> blacklist = new ArrayList<>();
     int activeSubClustersCount = getActiveSubClustersCount();
-    int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
+    int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
 
     try {
       GetNewApplicationResponse response =
@@ -470,7 +470,7 @@ public SubmitApplicationResponse submitApplication(
     // but if the number of Active SubClusters is less than this number at this time,
     // we should provide a high number of retry according to the number of Active SubClusters.
     int activeSubClustersCount = getActiveSubClustersCount();
-    int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
+    int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
 
     // Try calling the SubmitApplication method
     SubmitApplicationResponse response =
@@ -484,7 +484,7 @@ public SubmitApplicationResponse submitApplication(
         return response;
       }
 
-    } catch (Exception e){
+    } catch (Exception e) {
       routerMetrics.incrAppsFailedSubmitted();
       RouterServerUtil.logAndThrowException(e.getMessage(), e);
     }
@@ -543,7 +543,7 @@ private SubmitApplicationResponse invokeSubmitApplication(
       ApplicationHomeSubCluster appHomeSubCluster =
           ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
-      if (exists || retryCount == 0) {
+      if (!exists || retryCount == 0) {
         addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
       } else {
         updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
       }
@@ -563,8 +563,8 @@ private SubmitApplicationResponse invokeSubmitApplication(
     } catch (Exception e) {
       RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
           TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId);
-      LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.",
-          applicationId, subClusterId, e);
+      LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.",
+          applicationId, retryCount, subClusterId, e);
       if (subClusterId != null) {
         blackList.add(subClusterId);
       }
@@ -1948,4 +1948,9 @@ private void updateReservationHomeSubCluster(SubClusterId subClusterId,
       }
     }
   }
+
+  @VisibleForTesting
+  public void setNumSubmitRetries(int numSubmitRetries) {
+    this.numSubmitRetries = numSubmitRetries;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
index f52c9acbd4..2d0bc6b350 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.router.clientrm;
 
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_POLICY_MANAGER;
+import static org.hamcrest.CoreMatchers.is;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -48,7 +51,11 @@
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,14 +71,22 @@
  * It tests the case with SubClusters down and the Router logic of retries. We
  * have 1 good SubCluster and 2 bad ones for all the tests.
  */
+@RunWith(Parameterized.class)
 public class TestFederationClientInterceptorRetry
     extends BaseRouterClientRMTest {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
 
+  @Parameters
+  public static Collection<String[]> getParameters() {
+    return Arrays.asList(new String[][] {{UniformBroadcastPolicyManager.class.getName()},
+        {TestSequentialBroadcastPolicyManager.class.getName()}});
+  }
+
   private TestableFederationClientInterceptor interceptor;
   private MemoryFederationStateStore stateStore;
   private FederationStateStoreTestUtil stateStoreUtil;
+  private String routerPolicyManagerName;
 
   private String user = "test-user";
@@ -84,6 +99,10 @@ public class TestFederationClientInterceptorRetry
 
   private static List<SubClusterId> scs = new ArrayList<>();
 
+  public TestFederationClientInterceptorRetry(String policyManagerName) {
+    this.routerPolicyManagerName = policyManagerName;
+  }
+
   @Override
   public void setUp() throws IOException {
     super.setUpConfig();
@@ -150,8 +169,7 @@ protected YarnConfiguration createConfiguration() {
         mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," +
             TestableFederationClientInterceptor.class.getName());
 
-    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
-        UniformBroadcastPolicyManager.class.getName());
+    conf.set(FEDERATION_POLICY_MANAGER, this.routerPolicyManagerName);
 
     // Disable StateStoreFacade cache
    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
@@ -283,4 +301,56 @@ public void testSubmitApplicationOneBadOneGood()
     SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
     Assert.assertEquals(good, respSubClusterId);
   }
+
+  @Test
+  public void testSubmitApplicationTwoBadOneGood() throws Exception {
+
+    LOG.info("Test submitApplication with two bad, one good SC.");
+
+    // This test only runs with the TestSequentialBroadcastPolicyManager policy manager.
+    Assume.assumeThat(routerPolicyManagerName,
+        is(TestSequentialBroadcastPolicyManager.class.getName()));
+
+    setupCluster(Arrays.asList(bad1, bad2, good));
+    final ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
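+    // Note: numSubmitRetries counts retries after the first attempt,
+    // so in this test setting it to N leads to N + 1 submission attempts.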
+    // Use the TestSequentialRouterPolicy strategy, which sorts the SubClusterIds
+    // in descending order. Since good=0, bad1=1, bad2=2, the selection order
+    // is 2, 1, 0 [bad2, bad1, good].
+    // Set the retryNum to 1:
+    // the 1st attempt uses bad2, the 2nd uses bad1,
+    // and bad1 ends up recorded in the StateStore.
+    interceptor.setNumSubmitRetries(1);
+    final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+    LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
+        () -> interceptor.submitApplication(request));
+
+    // We will get bad1
+    checkSubmitSubCluster(appId, bad1);
+
+    // Set the retryNum to 2:
+    // the 1st attempt uses bad2, the 2nd uses bad1, the 3rd uses good.
+    interceptor.setNumSubmitRetries(2);
+    SubmitApplicationResponse submitAppResponse = interceptor.submitApplication(request);
+    Assert.assertNotNull(submitAppResponse);
+
+    // We will get good
+    checkSubmitSubCluster(appId, good);
+  }
+
+  private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCluster)
+      throws YarnException {
+    GetApplicationHomeSubClusterRequest getAppRequest =
+        GetApplicationHomeSubClusterRequest.newInstance(appId);
+    GetApplicationHomeSubClusterResponse getAppResponse =
+        stateStore.getApplicationHomeSubCluster(getAppRequest);
+    Assert.assertNotNull(getAppResponse);
+    ApplicationHomeSubCluster responseHomeSubCluster =
+        getAppResponse.getApplicationHomeSubCluster();
+    Assert.assertNotNull(responseHomeSubCluster);
+    SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
+    Assert.assertEquals(expectSubCluster, respSubClusterId);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java
new file mode 100644
index 0000000000..dfa8c7136d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager;
+
+/**
+ * This PolicyManager is used for testing and will contain the
+ * {@link TestSequentialRouterPolicy} policy.
+ *
+ * When we test the FederationClientInterceptor retry logic,
+ * we want the SubClusters to be returned in a deterministic order rather than randomly.
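+ * The AMRMProxy side keeps the stock BroadcastAMRMProxyPolicy; only the
+ * Router-side ordering is customized for these tests.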
+ * See {@link TestSequentialRouterPolicy} for a description of the ordering.
+ */
+public class TestSequentialBroadcastPolicyManager extends AbstractPolicyManager {
+  public TestSequentialBroadcastPolicyManager() {
+    // this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = TestSequentialRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java
new file mode 100644
index 0000000000..e702b764fe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.AbstractRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a test-only strategy whose purpose is to return the subClusters in
+ * descending order of subClusterId.
+ *
+ * It is used to verify the retry behaviour of the FederationClientInterceptor.
+ * The conditions of use are as follows:
+ * 1. Every subClusterId must be an integer.
+ * 2. The larger the subClusterId, the earlier that subCluster is selected.
+ *
+ * For example, with 4 subClusters (2 good, 2 bad), where the bad subClusters
+ * should be selected before the good ones, the subClusters can be set up as
+ * good1 = [0], good2 = [1], bad1 = [2], bad2 = [3].
+ * This strategy will return [3, 2, 1, 0],
+ * i.e. the selection order is bad2, bad1, good2, good1.
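+ *
+ * In TestFederationClientInterceptorRetry the SubClusters are good = [0],
+ * bad1 = [1] and bad2 = [2], so this policy returns [2, 1, 0] and the bad
+ * SubClusters are tried before the good one.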
+ */
+public class TestSequentialRouterPolicy extends AbstractRouterPolicy {
+
+  @Override
+  public void reinitialize(FederationPolicyInitializationContext policyContext)
+      throws FederationPolicyInitializationException {
+    FederationPolicyInitializationContextValidator.validate(policyContext,
+        this.getClass().getCanonicalName());
+    setPolicyContext(policyContext);
+  }
+
+  @Override
+  protected SubClusterId chooseSubCluster(String queue,
+      Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException {
+    /*
+     * This strategy is only suitable for testing; it selects subClusters in a
+     * fixed order. In the retry tests we have 3 subClusters: 1 goodSubCluster
+     * (sc-id 0) and 2 badSubClusters (sc-id 1 and 2). Returning them in reverse
+     * order, i.e. 2, 1, 0, means the bad subClusters are selected first.
+     */
+    List<SubClusterId> subClusterIds = new ArrayList<>(preSelectSubClusters.keySet());
+    if (subClusterIds.size() > 1) {
+      subClusterIds.sort((o1, o2) -> Integer.parseInt(o2.getId()) - Integer.parseInt(o1.getId()));
+    }
+    if (CollectionUtils.isNotEmpty(subClusterIds)) {
+      return subClusterIds.get(0);
+    }
+    return null;
+  }
+}