From 7cb22eb72d587d4b721c3e072bb2811c767db9ab Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 23 Nov 2022 06:38:24 +0800 Subject: [PATCH] YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130) --- .../utils/FederationStateStoreFacade.java | 170 ++++++++- .../yarn/server/router/RouterServerUtil.java | 45 --- .../clientrm/FederationClientInterceptor.java | 96 +---- .../webapp/FederationInterceptorREST.java | 332 ++++++++---------- .../TestFederationInterceptorRESTRetry.java | 19 +- 5 files changed, 339 insertions(+), 323 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 47cb9e9e35..fc1e442ab9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -22,8 +22,10 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.Random; import javax.cache.Cache; import javax.cache.CacheManager; @@ -38,6 +40,8 @@ import javax.cache.integration.CacheLoaderException; import javax.cache.spi.CachingProvider; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicies; @@ -50,6 +54,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; @@ -110,6 +116,8 @@ public final class FederationStateStoreFacade { private static final FederationStateStoreFacade FACADE = new FederationStateStoreFacade(); + private static Random rand = new Random(System.currentTimeMillis()); + private FederationStateStore stateStore; private int cacheTimeToLive; private Configuration conf; @@ -496,6 +504,7 @@ public void deleteReservationHomeSubCluster(ReservationId reservationId) throws * @param defaultValue the default implementation for fallback * @param type the class for which a retry proxy is required * @param retryPolicy the policy for retrying method call failures + * @param The type of the instance * @return a retry proxy for the specified interface */ public static Object createRetryInstance(Configuration conf, @@ -731,7 +740,7 @@ public FederationStateStore getStateStore() { return stateStore; } - /* + /** * The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}). * * @param newKey Key used for generating and verifying delegation tokens @@ -849,4 +858,163 @@ public RouterRMTokenResponse getTokenByRouterStoreToken(RMDelegationTokenIdentif RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); return stateStore.getTokenByRouterStoreToken(request); } + + /** + * Get the number of active cluster nodes. + * + * @return number of active cluster nodes. + * @throws YarnException if the call to the state store is unsuccessful. + */ + public int getActiveSubClustersCount() throws YarnException { + Map activeSubClusters = getSubClusters(true); + if (activeSubClusters == null || activeSubClusters.isEmpty()) { + return 0; + } else { + return activeSubClusters.size(); + } + } + + /** + * Randomly pick ActiveSubCluster. + * During the selection process, we will exclude SubClusters from the blacklist. + * + * @param activeSubClusters List of active subClusters. + * @param blackList blacklist. + * @return Active SubClusterId. + * @throws YarnException When there is no Active SubCluster, + * an exception will be thrown (No active SubCluster available to submit the request.) + */ + public static SubClusterId getRandomActiveSubCluster( + Map activeSubClusters, List blackList) + throws YarnException { + + // Check if activeSubClusters is empty, if it is empty, we need to throw an exception + if (MapUtils.isEmpty(activeSubClusters)) { + throw new FederationPolicyException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); + } + + // Change activeSubClusters to List + List subClusterIds = new ArrayList<>(activeSubClusters.keySet()); + + // If the blacklist is not empty, we need to remove all the subClusters in the blacklist + if (CollectionUtils.isNotEmpty(blackList)) { + subClusterIds.removeAll(blackList); + } + + // Check there are still active subcluster after removing the blacklist + if (CollectionUtils.isEmpty(subClusterIds)) { + throw new FederationPolicyException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); + } + + // Randomly choose a SubCluster + return subClusterIds.get(rand.nextInt(subClusterIds.size())); + } + + /** + * Get the number of retries. + * + * @param configRetries User-configured number of retries. + * @return number of retries. + * @throws YarnException yarn exception. + */ + public int getRetryNumbers(int configRetries) throws YarnException { + int activeSubClustersCount = getActiveSubClustersCount(); + int actualRetryNums = Math.min(activeSubClustersCount, configRetries); + // Normally, we don't set a negative number for the number of retries, + // but if the user sets a negative number for the number of retries, + // we will return 0 + if (actualRetryNums < 0) { + return 0; + } + return actualRetryNums; + } + + /** + * Query SubClusterId By applicationId. + * + * If SubClusterId is not empty, it means it exists and returns true; + * if SubClusterId is empty, it means it does not exist and returns false. + * + * @param applicationId applicationId + * @return true, SubClusterId exists; false, SubClusterId not exists. + */ + public boolean existsApplicationHomeSubCluster(ApplicationId applicationId) { + try { + SubClusterId subClusterId = getApplicationHomeSubCluster(applicationId); + if (subClusterId != null) { + return true; + } + } catch (YarnException e) { + LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e); + } + return false; + } + + /** + * Add ApplicationHomeSubCluster to FederationStateStore. + * + * @param applicationId applicationId. + * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. + * @throws YarnException yarn exception. + */ + public void addApplicationHomeSubCluster(ApplicationId applicationId, + ApplicationHomeSubCluster homeSubCluster) throws YarnException { + try { + addApplicationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + String msg = String.format( + "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId); + throw new YarnException(msg, e); + } + } + + /** + * Update ApplicationHomeSubCluster to FederationStateStore. + * + * @param subClusterId homeSubClusterId + * @param applicationId applicationId. + * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. + * @throws YarnException yarn exception. + */ + public void updateApplicationHomeSubCluster(SubClusterId subClusterId, + ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException { + try { + updateApplicationHomeSubCluster(homeSubCluster); + } catch (YarnException e) { + SubClusterId subClusterIdInStateStore = getApplicationHomeSubCluster(applicationId); + if (subClusterId == subClusterIdInStateStore) { + LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId); + } else { + String msg = String.format( + "Unable to update the ApplicationId %s into the FederationStateStore.", applicationId); + throw new YarnException(msg, e); + } + } + } + + /** + * Add or Update ApplicationHomeSubCluster. + * + * @param applicationId applicationId, is the id of the application. + * @param subClusterId homeSubClusterId, this is selected by strategy. + * @param retryCount number of retries. + * @throws YarnException yarn exception. + */ + public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId, + SubClusterId subClusterId, int retryCount) throws YarnException { + Boolean exists = existsApplicationHomeSubCluster(applicationId); + ApplicationHomeSubCluster appHomeSubCluster = + ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); + if (!exists || retryCount == 0) { + // persist the mapping of applicationId and the subClusterId which has + // been selected as its home. + addApplicationHomeSubCluster(applicationId, appHomeSubCluster); + } else { + // update the mapping of applicationId and the home subClusterId to + // the new subClusterId we have selected. + updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 6dd49daa4e..93818229dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.router; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -30,9 +28,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +36,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Random; import java.io.IOException; /** @@ -61,8 +54,6 @@ public final class RouterServerUtil { private static final String EPOCH_PREFIX = "e"; - private static Random rand = new Random(System.currentTimeMillis()); - /** Disable constructor. */ private RouterServerUtil() { } @@ -479,42 +470,6 @@ public static void validateContainerId(String containerId) } } - /** - * Randomly pick ActiveSubCluster. - * During the selection process, we will exclude SubClusters from the blacklist. - * - * @param activeSubClusters List of active subClusters. - * @param blackList blacklist. - * @return Active SubClusterId. - * @throws YarnException When there is no Active SubCluster, - * an exception will be thrown (No active SubCluster available to submit the request.) - */ - public static SubClusterId getRandomActiveSubCluster( - Map activeSubClusters, List blackList) - throws YarnException { - - // Check if activeSubClusters is empty, if it is empty, we need to throw an exception - if (MapUtils.isEmpty(activeSubClusters)) { - logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); - } - - // Change activeSubClusters to List - List subClusterIds = new ArrayList<>(activeSubClusters.keySet()); - - // If the blacklist is not empty, we need to remove all the subClusters in the blacklist - if (CollectionUtils.isNotEmpty(blackList)) { - subClusterIds.removeAll(blackList); - } - - // Check there are still active subcluster after removing the blacklist - if (CollectionUtils.isEmpty(subClusterIds)) { - logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); - } - - // Randomly choose a SubCluster - return subClusterIds.get(rand.nextInt(subClusterIds.size())); - } - public static UserGroupInformation setupUser(final String userName) { UserGroupInformation user = null; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 9bfcb04ff8..cf457c7077 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -272,17 +272,6 @@ private SubClusterId getRandomActiveSubCluster( return list.get(rand.nextInt(list.size())); } - @VisibleForTesting - private int getActiveSubClustersCount() throws YarnException { - Map activeSubClusters = - federationFacade.getSubClusters(true); - if (activeSubClusters == null || activeSubClusters.isEmpty()) { - return 0; - } else { - return activeSubClusters.size(); - } - } - /** * YARN Router forwards every getNewApplication requests to any RM. During * this operation there will be no communication with the State Store. The @@ -318,7 +307,7 @@ public GetNewApplicationResponse getNewApplication( // Try calling the getNewApplication method List blacklist = new ArrayList<>(); - int activeSubClustersCount = getActiveSubClustersCount(); + int activeSubClustersCount = federationFacade.getActiveSubClustersCount(); int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); try { @@ -361,7 +350,7 @@ private GetNewApplicationResponse invokeGetNewApplication( List blackList, GetNewApplicationRequest request, int retryCount) throws YarnException, IOException { SubClusterId subClusterId = - RouterServerUtil.getRandomActiveSubCluster(subClustersActive, blackList); + federationFacade.getRandomActiveSubCluster(subClustersActive, blackList); LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId); ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); try { @@ -474,7 +463,7 @@ public SubmitApplicationResponse submitApplication( // the user will provide us with an expected submitRetries, // but if the number of Active SubClusters is less than this number at this time, // we should provide a high number of retry according to the number of Active SubClusters. - int activeSubClustersCount = getActiveSubClustersCount(); + int activeSubClustersCount = federationFacade.getActiveSubClustersCount(); int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); // Try calling the SubmitApplication method @@ -542,17 +531,10 @@ private SubmitApplicationResponse invokeSubmitApplication( LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, retryCount, subClusterId); - // Step2. Query homeSubCluster according to ApplicationId. - Boolean exists = existsApplicationHomeSubCluster(applicationId); - - ApplicationHomeSubCluster appHomeSubCluster = - ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); - - if (!exists || retryCount == 0) { - addApplicationHomeSubCluster(applicationId, appHomeSubCluster); - } else { - updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster); - } + // Step2. We Store the mapping relationship + // between Application and HomeSubCluster in stateStore. + federationFacade.addOrUpdateApplicationHomeSubCluster( + applicationId, subClusterId, retryCount); // Step3. SubmitApplication to the subCluster ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); @@ -581,70 +563,6 @@ private SubmitApplicationResponse invokeSubmitApplication( throw new YarnException(msg); } - /** - * Add ApplicationHomeSubCluster to FederationStateStore. - * - * @param applicationId applicationId. - * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. - * @throws YarnException yarn exception. - */ - private void addApplicationHomeSubCluster(ApplicationId applicationId, - ApplicationHomeSubCluster homeSubCluster) throws YarnException { - try { - federationFacade.addApplicationHomeSubCluster(homeSubCluster); - } catch (YarnException e) { - RouterServerUtil.logAndThrowException(e, - "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId); - } - } - - /** - * Update ApplicationHomeSubCluster to FederationStateStore. - * - * @param subClusterId homeSubClusterId - * @param applicationId applicationId. - * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy. - * @throws YarnException yarn exception. - */ - private void updateApplicationHomeSubCluster(SubClusterId subClusterId, - ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException { - try { - federationFacade.updateApplicationHomeSubCluster(homeSubCluster); - } catch (YarnException e) { - SubClusterId subClusterIdInStateStore = - federationFacade.getApplicationHomeSubCluster(applicationId); - if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application {} already submitted on SubCluster {}.", - applicationId, subClusterId); - } else { - RouterServerUtil.logAndThrowException(e, - "Unable to update the ApplicationId %s into the FederationStateStore.", - applicationId); - } - } - } - - /** - * Query SubClusterId By applicationId. - * - * If SubClusterId is not empty, it means it exists and returns true; - * if SubClusterId is empty, it means it does not exist and returns false. - * - * @param applicationId applicationId - * @return true, SubClusterId exists; false, SubClusterId not exists. - */ - private boolean existsApplicationHomeSubCluster(ApplicationId applicationId) { - try { - SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId); - if (subClusterId != null) { - return true; - } - } catch (YarnException e) { - LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e); - } - return false; - } - /** * The YARN Router will forward to the respective YARN RM in which the AM is * running. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index dc3508e0df..a21be7b4e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -27,14 +27,13 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.stream.Collectors; +import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequestWrapper; @@ -62,9 +61,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; -import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -134,13 +134,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor { private int numSubmitRetries; private FederationStateStoreFacade federationFacade; - private Random rand; private RouterPolicyFacade policyFacade; private RouterMetrics routerMetrics; private final Clock clock = new MonotonicClock(); private boolean returnPartialReport; private boolean appInfosCacheEnabled; private int appInfosCacheCount; + private long submitIntervalTime; private Map interceptors; private LRUCacheHashMap appInfosCaches; @@ -156,7 +156,6 @@ public void init(String user) { super.init(user); federationFacade = FederationStateStoreFacade.getInstance(); - rand = new Random(); final Configuration conf = this.getConf(); @@ -194,24 +193,10 @@ public void init(String user) { YarnConfiguration.DEFAULT_ROUTER_APPSINFO_CACHED_COUNT); appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true); } - } - private SubClusterId getRandomActiveSubCluster( - Map activeSubclusters, - List blackListSubClusters) throws YarnException { - - if (activeSubclusters == null || activeSubclusters.size() < 1) { - RouterServerUtil.logAndThrowException( - FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); - } - Collection keySet = activeSubclusters.keySet(); - FederationPolicyUtils.validateSubClusterAvailability(keySet, blackListSubClusters); - if (blackListSubClusters != null) { - keySet.removeAll(blackListSubClusters); - } - - List list = keySet.stream().collect(Collectors.toList()); - return list.get(rand.nextInt(list.size())); + submitIntervalTime = conf.getTimeDuration( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME, + YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS); } @VisibleForTesting @@ -301,62 +286,79 @@ public Response createNewApplication(HttpServletRequest hsr) long startTime = clock.getTime(); - Map subClustersActive; try { - subClustersActive = federationFacade.getSubClusters(true); - } catch (YarnException e) { - routerMetrics.incrAppsFailedCreated(); - return Response.status(Status.INTERNAL_SERVER_ERROR) - .entity(e.getLocalizedMessage()).build(); - } + Map subClustersActive = + federationFacade.getSubClusters(true); - List blacklist = new ArrayList<>(); - - for (int i = 0; i < numSubmitRetries; ++i) { - - SubClusterId subClusterId; - try { - subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist); - } catch (YarnException e) { - routerMetrics.incrAppsFailedCreated(); - return Response.status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()).build(); - } - - LOG.debug("getNewApplication try #{} on SubCluster {}.", i, subClusterId); - - DefaultRequestInterceptorREST interceptor = - getOrCreateInterceptorForSubCluster(subClusterId, - subClustersActive.get(subClusterId).getRMWebServiceAddress()); - Response response = null; - try { - response = interceptor.createNewApplication(hsr); - } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", - subClusterId.getId(), e); - } - - if (response != null && - response.getStatus() == HttpServletResponse.SC_OK) { + // We declare blackList and retries. + List blackList = new ArrayList<>(); + int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries); + Response response = ((FederationActionRetry) (retryCount) -> + invokeGetNewApplication(subClustersActive, blackList, hsr, retryCount)). + runWithRetries(actualRetryNums, submitIntervalTime); + // If the response is not empty and the status is SC_OK, + // this request can be returned directly. + if (response != null && response.getStatus() == HttpServletResponse.SC_OK) { long stopTime = clock.getTime(); routerMetrics.succeededAppsCreated(stopTime - startTime); - return response; - } else { - // Empty response from the ResourceManager. - // Blacklist this subcluster for this request. - blacklist.add(subClusterId); } + } catch (FederationPolicyException e) { + // If a FederationPolicyException is thrown, the service is unavailable. + routerMetrics.incrAppsFailedCreated(); + return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build(); + } catch (Exception e) { + routerMetrics.incrAppsFailedCreated(); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build(); } + // return error message directly. String errMsg = "Fail to create a new application."; LOG.error(errMsg); routerMetrics.incrAppsFailedCreated(); - return Response - .status(Status.INTERNAL_SERVER_ERROR) - .entity(errMsg) - .build(); + return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build(); + } + + /** + * Invoke GetNewApplication to different subClusters. + * + * @param subClustersActive Active SubClusters. + * @param blackList Blacklist avoid repeated calls to unavailable subCluster. + * @param hsr HttpServletRequest. + * @param retryCount number of retries. + * @return Get response, If the response is empty or status not equal SC_OK, the request fails, + * if the response is not empty and status equal SC_OK, the request is successful. + * @throws YarnException yarn exception. + * @throws IOException io error. + * @throws InterruptedException interrupted exception. + */ + private Response invokeGetNewApplication(Map subClustersActive, + List blackList, HttpServletRequest hsr, int retryCount) + throws YarnException, IOException, InterruptedException { + + SubClusterId subClusterId = + federationFacade.getRandomActiveSubCluster(subClustersActive, blackList); + + LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId); + + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId, + subClustersActive.get(subClusterId).getRMWebServiceAddress()); + + try { + Response response = interceptor.createNewApplication(hsr); + if (response != null && response.getStatus() == HttpServletResponse.SC_OK) { + return response; + } + } catch (Exception e) { + blackList.add(subClusterId); + RouterServerUtil.logAndThrowException(e.getMessage(), e); + } + + // We need to throw the exception directly. + String msg = String.format("Unable to create a new ApplicationId in SubCluster %s.", + subClusterId.getId()); + throw new YarnException(msg); } /** @@ -431,142 +433,106 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, long startTime = clock.getTime(); + // We verify the parameters to ensure that newApp is not empty and + // that the format of applicationId is correct. if (newApp == null || newApp.getApplicationId() == null) { routerMetrics.incrAppsFailedSubmitted(); String errMsg = "Missing ApplicationSubmissionContextInfo or " + "applicationSubmissionContext information."; - return Response - .status(Status.BAD_REQUEST) - .entity(errMsg) - .build(); + return Response.status(Status.BAD_REQUEST).entity(errMsg).build(); } - ApplicationId applicationId = null; try { - applicationId = ApplicationId.fromString(newApp.getApplicationId()); + String applicationId = newApp.getApplicationId(); + RouterServerUtil.validateApplicationId(applicationId); } catch (IllegalArgumentException e) { routerMetrics.incrAppsFailedSubmitted(); - return Response - .status(Status.BAD_REQUEST) - .entity(e.getLocalizedMessage()) - .build(); + return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()).build(); } - List blacklist = new ArrayList<>(); - - for (int i = 0; i < numSubmitRetries; ++i) { - - ApplicationSubmissionContext context = - RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf()); - - SubClusterId subClusterId = null; - try { - subClusterId = policyFacade.getHomeSubcluster(context, blacklist); - } catch (YarnException e) { - routerMetrics.incrAppsFailedSubmitted(); - return Response - .status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()) - .build(); - } - LOG.info("submitApplication appId {} try #{} on SubCluster {}.", - applicationId, i, subClusterId); - - ApplicationHomeSubCluster appHomeSubCluster = - ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); - - if (i == 0) { - try { - // persist the mapping of applicationId and the subClusterId which has - // been selected as its home - subClusterId = - federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); - } catch (YarnException e) { - routerMetrics.incrAppsFailedSubmitted(); - String errMsg = "Unable to insert the ApplicationId " + applicationId - + " into the FederationStateStore"; - return Response - .status(Status.SERVICE_UNAVAILABLE) - .entity(errMsg + " " + e.getLocalizedMessage()) - .build(); - } - } else { - try { - // update the mapping of applicationId and the home subClusterId to - // the new subClusterId we have selected - federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); - } catch (YarnException e) { - String errMsg = "Unable to update the ApplicationId " + applicationId - + " into the FederationStateStore"; - SubClusterId subClusterIdInStateStore; - try { - subClusterIdInStateStore = - federationFacade.getApplicationHomeSubCluster(applicationId); - } catch (YarnException e1) { - routerMetrics.incrAppsFailedSubmitted(); - return Response - .status(Status.SERVICE_UNAVAILABLE) - .entity(e1.getLocalizedMessage()) - .build(); - } - if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application {} already submitted on SubCluster {}.", - applicationId, subClusterId); - } else { - routerMetrics.incrAppsFailedSubmitted(); - return Response - .status(Status.SERVICE_UNAVAILABLE) - .entity(errMsg) - .build(); - } - } - } - - SubClusterInfo subClusterInfo; - try { - subClusterInfo = federationFacade.getSubCluster(subClusterId); - } catch (YarnException e) { - routerMetrics.incrAppsFailedSubmitted(); - return Response - .status(Status.SERVICE_UNAVAILABLE) - .entity(e.getLocalizedMessage()) - .build(); - } - - Response response = null; - try { - response = getOrCreateInterceptorForSubCluster(subClusterId, - subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp, - hsr); - } catch (Exception e) { - LOG.warn("Unable to submit the application {} to SubCluster {}", - applicationId, subClusterId.getId(), e); - } - - if (response != null && - response.getStatus() == HttpServletResponse.SC_ACCEPTED) { - LOG.info("Application {} with appId {} submitted on {}", - context.getApplicationName(), applicationId, subClusterId); - + List blackList = new ArrayList<>(); + try { + int activeSubClustersCount = federationFacade.getActiveSubClustersCount(); + int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries); + Response response = ((FederationActionRetry) (retryCount) -> + invokeSubmitApplication(newApp, blackList, hsr, retryCount)). + runWithRetries(actualRetryNums, submitIntervalTime); + if (response != null) { long stopTime = clock.getTime(); routerMetrics.succeededAppsSubmitted(stopTime - startTime); - return response; - } else { - // Empty response from the ResourceManager. - // Blacklist this subcluster for this request. - blacklist.add(subClusterId); } + } catch (Exception e) { + routerMetrics.incrAppsFailedSubmitted(); + return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build(); } routerMetrics.incrAppsFailedSubmitted(); - String errMsg = "Application " + newApp.getApplicationName() - + " with appId " + applicationId + " failed to be submitted."; + String errMsg = String.format("Application %s with appId %s failed to be submitted.", + newApp.getApplicationName(), newApp.getApplicationId()); LOG.error(errMsg); - return Response - .status(Status.SERVICE_UNAVAILABLE) - .entity(errMsg) - .build(); + return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build(); + } + + /** + * Invoke SubmitApplication to different subClusters. + * + * @param submissionContext application submission context. + * @param blackList Blacklist avoid repeated calls to unavailable subCluster. + * @param hsr HttpServletRequest. + * @param retryCount number of retries. + * @return Get response, If the response is empty or status not equal SC_ACCEPTED, + * the request fails, if the response is not empty and status equal SC_OK, + * the request is successful. + * @throws YarnException yarn exception. + * @throws IOException io error. + */ + private Response invokeSubmitApplication(ApplicationSubmissionContextInfo submissionContext, + List blackList, HttpServletRequest hsr, int retryCount) + throws YarnException, IOException, InterruptedException { + + // Step1. We convert ApplicationSubmissionContextInfo to ApplicationSubmissionContext + // and Prepare parameters. + ApplicationSubmissionContext context = + RMWebAppUtil.createAppSubmissionContext(submissionContext, this.getConf()); + ApplicationId applicationId = ApplicationId.fromString(submissionContext.getApplicationId()); + SubClusterId subClusterId = null; + + try { + // Get subClusterId from policy. + subClusterId = policyFacade.getHomeSubcluster(context, blackList); + + // Print the log of submitting the submitApplication. + LOG.info("submitApplication appId {} try #{} on SubCluster {}.", + applicationId, retryCount, subClusterId); + + // Step2. We Store the mapping relationship + // between Application and HomeSubCluster in stateStore. + federationFacade.addOrUpdateApplicationHomeSubCluster( + applicationId, subClusterId, retryCount); + + // Step3. We get subClusterInfo based on subClusterId. + SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId); + + // Step4. Submit the request, if the response is HttpServletResponse.SC_ACCEPTED, + // We return the response, otherwise we throw an exception. + Response response = getOrCreateInterceptorForSubCluster(subClusterId, + subClusterInfo.getRMWebServiceAddress()).submitApplication(submissionContext, hsr); + if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) { + LOG.info("Application {} with appId {} submitted on {}.", + context.getApplicationName(), applicationId, subClusterId); + return response; + } + String msg = String.format("application %s failed to be submitted.", applicationId); + throw new YarnException(msg); + } catch (Exception e) { + LOG.warn("Unable to submit the application {} to SubCluster {}.", applicationId, + subClusterId, e); + if (subClusterId != null) { + blackList.add(subClusterId); + } + throw e; + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java index e598a52f87..e2b2103c7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java @@ -180,6 +180,9 @@ public void testGetNewApplicationOneBadSC() @Test public void testGetNewApplicationTwoBadSCs() throws YarnException, IOException, InterruptedException { + + LOG.info("Test getNewApplication with two bad SCs."); + setupCluster(Arrays.asList(bad1, bad2)); Response response = interceptor.createNewApplication(null); @@ -195,17 +198,21 @@ public void testGetNewApplicationTwoBadSCs() @Test public void testGetNewApplicationOneBadOneGood() throws YarnException, IOException, InterruptedException { - System.out.println("Test getNewApplication with one bad, one good SC"); + + LOG.info("Test getNewApplication with one bad, one good SC."); + setupCluster(Arrays.asList(good, bad2)); Response response = interceptor.createNewApplication(null); - + Assert.assertNotNull(response); Assert.assertEquals(OK, response.getStatus()); NewApplication newApp = (NewApplication) response.getEntity(); - ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId()); + Assert.assertNotNull(newApp); - Assert.assertEquals(Integer.parseInt(good.getId()), - appId.getClusterTimestamp()); + ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId()); + Assert.assertNotNull(appId); + + Assert.assertEquals(Integer.parseInt(good.getId()), appId.getClusterTimestamp()); } /** @@ -216,6 +223,8 @@ public void testGetNewApplicationOneBadOneGood() public void testSubmitApplicationOneBadSC() throws YarnException, IOException, InterruptedException { + LOG.info("Test submitApplication with one bad SC."); + setupCluster(Arrays.asList(bad2)); ApplicationId appId =