YARN-11342. [Federation] Refactor getNewApplication, submitApplication Use FederationActionRetry. (#5005)

This commit is contained in:
slfan1989 2022-10-21 00:22:24 +08:00 committed by GitHub
parent c5c00f3d2c
commit 9adf0ca089
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 375 additions and 170 deletions

View File

@ -4117,6 +4117,14 @@ public static boolean isAclEnabled(Configuration conf) {
ROUTER_PREFIX + "submit.retry"; ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3; public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
/**
* GetNewApplication and SubmitApplication request retry interval time.
*/
public static final String ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME =
ROUTER_PREFIX + "submit.interval.time";
public static final long DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME =
TimeUnit.MILLISECONDS.toMillis(10);
/** /**
* The interceptor class used in FederationClientInterceptor should return * The interceptor class used in FederationClientInterceptor should return
* partial ApplicationReports. * partial ApplicationReports.

View File

@ -5047,4 +5047,13 @@
</description> </description>
</property> </property>
<property>
<name>yarn.router.submit.interval.time</name>
<value>10ms</value>
<description>
The interval time to wait between retries of a request against different subClusters.
Default is 10ms.
</description>
</property>
</configuration> </configuration>

View File

@ -24,13 +24,13 @@ public interface FederationActionRetry<T> {
Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class); Logger LOG = LoggerFactory.getLogger(FederationActionRetry.class);
T run() throws Exception; T run(int retry) throws Exception;
default T runWithRetries(int retryCount, long retrySleepTime) throws Exception { default T runWithRetries(int retryCount, long retrySleepTime) throws Exception {
int retry = 0; int retry = 0;
while (true) { while (true) {
try { try {
return run(); return run(retry);
} catch (Exception e) { } catch (Exception e) {
LOG.info("Exception while executing an Federation operation.", e); LOG.info("Exception while executing an Federation operation.", e);
if (++retry > retryCount) { if (++retry > retryCount) {

View File

@ -502,11 +502,11 @@ public boolean cleanUpFinishApplicationsWithRetries(ApplicationId appId, boolean
throws Exception { throws Exception {
// Generate a request to delete data // Generate a request to delete data
DeleteApplicationHomeSubClusterRequest request = DeleteApplicationHomeSubClusterRequest req =
DeleteApplicationHomeSubClusterRequest.newInstance(appId); DeleteApplicationHomeSubClusterRequest.newInstance(appId);
// CleanUp Finish App. // CleanUp Finish App.
return ((FederationActionRetry<Boolean>) () -> invokeCleanUpFinishApp(appId, isQuery, request)) return ((FederationActionRetry<Boolean>) (retry) -> invokeCleanUpFinishApp(appId, isQuery, req))
.runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime); .runWithRetries(cleanUpRetryCountNum, cleanUpRetrySleepTime);
} }

View File

@ -196,6 +196,24 @@ public static void logFailure(String user, String operation, String perm,
} }
} }
/**
 * Emit an audit record for a failed operation that was routed to a
 * specific subCluster.
 *
 * <p>Nothing is built or written unless INFO logging is enabled.
 *
 * @param user User who made the service request.
 * @param operation Operation requested by the user.
 * @param perm Target permissions.
 * @param target The target on which the operation is being performed.
 * @param description Additional information about why the operation failed.
 * @param subClusterId SubCluster Id in which the operation was performed.
 */
public static void logFailure(String user, String operation, String perm,
    String target, String description, SubClusterId subClusterId) {
  // Skip the string construction entirely when the record would be dropped.
  if (!LOG.isInfoEnabled()) {
    return;
  }
  // The sixth argument is passed as null here
  // (presumably the application id slot - confirm against createFailureLog).
  LOG.info(createFailureLog(user, operation, perm, target, description, null,
      subClusterId));
}
/** /**
* A helper api for creating an audit log for a failure event. * A helper api for creating an audit log for a failure event.
*/ */

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.router; package org.apache.hadoop.yarn.server.router;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -27,6 +29,9 @@
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -35,6 +40,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.IOException; import java.io.IOException;
/** /**
@ -53,6 +60,8 @@ public final class RouterServerUtil {
private static final String EPOCH_PREFIX = "e"; private static final String EPOCH_PREFIX = "e";
private static Random rand = new Random(System.currentTimeMillis());
/** Disable constructor. */ /** Disable constructor. */
private RouterServerUtil() { private RouterServerUtil() {
} }
@ -446,4 +455,40 @@ public static void validateContainerId(String containerId)
throw new IllegalArgumentException("Invalid ContainerId: " + containerId); throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
} }
} }
/**
 * Pick one active subCluster at random, skipping every blacklisted one.
 *
 * @param activeSubClusters Active subClusters, keyed by their id.
 * @param blackList SubClusters that must not be selected; may be null or empty.
 * @return the id of a randomly chosen, non-blacklisted active subCluster.
 * @throws YarnException if there is no active subCluster at all, or none is
 *         left once the blacklist has been applied
 *         (No active SubCluster available to submit the request.)
 */
public static SubClusterId getRandomActiveSubCluster(
    Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
    throws YarnException {

  // Nothing to choose from: report it right away.
  if (MapUtils.isEmpty(activeSubClusters)) {
    logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
  }

  // Work on a private copy of the ids so the caller's map is left untouched.
  List<SubClusterId> candidates = new ArrayList<>(activeSubClusters.keySet());

  // Drop every blacklisted subCluster from the candidate list.
  if (!CollectionUtils.isEmpty(blackList)) {
    candidates.removeAll(blackList);
  }

  // The blacklist may have consumed every candidate.
  if (candidates.isEmpty()) {
    logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
  }

  // Uniform random choice among the remaining candidates.
  int chosenIndex = rand.nextInt(candidates.size());
  return candidates.get(chosenIndex);
}
} }

View File

@ -116,6 +116,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -124,6 +125,7 @@
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -176,6 +178,7 @@ public class FederationClientInterceptor
private ThreadPoolExecutor executorService; private ThreadPoolExecutor executorService;
private final Clock clock = new MonotonicClock(); private final Clock clock = new MonotonicClock();
private boolean returnPartialReport; private boolean returnPartialReport;
private long submitIntervalTime;
@Override @Override
public void init(String userName) { public void init(String userName) {
@ -207,6 +210,10 @@ public void init(String userName) {
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
submitIntervalTime = conf.getTimeDuration(
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME,
YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
clientRMProxies = new ConcurrentHashMap<>(); clientRMProxies = new ConcurrentHashMap<>();
routerMetrics = RouterMetrics.getMetrics(); routerMetrics = RouterMetrics.getMetrics();
@ -260,6 +267,17 @@ private SubClusterId getRandomActiveSubCluster(
return list.get(rand.nextInt(list.size())); return list.get(rand.nextInt(list.size()));
} }
/**
 * Count the subClusters returned by the federation facade for
 * {@code getSubClusters(true)}.
 *
 * @return the number of returned subClusters, or 0 when the facade returns
 *         null or an empty map.
 * @throws YarnException if the facade lookup fails.
 */
@VisibleForTesting
private int getActiveSubClustersCount() throws YarnException {
  Map<SubClusterId, SubClusterInfo> active = federationFacade.getSubClusters(true);
  // An empty map already has size 0, so only null needs special handling.
  return (active == null) ? 0 : active.size();
}
/** /**
* YARN Router forwards every getNewApplication requests to any RM. During * YARN Router forwards every getNewApplication requests to any RM. During
* this operation there will be no communication with the State Store. The * this operation there will be no communication with the State Store. The
@ -293,25 +311,25 @@ public GetNewApplicationResponse getNewApplication(
Map<SubClusterId, SubClusterInfo> subClustersActive = Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true); federationFacade.getSubClusters(true);
for (int i = 0; i < numSubmitRetries; ++i) { // Try calling the getNewApplication method
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); List<SubClusterId> blacklist = new ArrayList<>();
LOG.info("getNewApplication try #{} on SubCluster {}.", i, subClusterId); int activeSubClustersCount = getActiveSubClustersCount();
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
GetNewApplicationResponse response = null;
try { try {
response = clientRMProxy.getNewApplication(request); GetNewApplicationResponse response =
} catch (Exception e) { ((FederationActionRetry<GetNewApplicationResponse>) (retryCount) ->
LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); invokeGetNewApplication(subClustersActive, blacklist, request, retryCount)).
subClustersActive.remove(subClusterId); runWithRetries(actualRetryNums, submitIntervalTime);
}
if (response != null) { if (response != null) {
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsCreated(stopTime - startTime); routerMetrics.succeededAppsCreated(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP,
TARGET_CLIENT_RM_SERVICE, response.getApplicationId());
return response; return response;
} }
} catch (Exception e) {
routerMetrics.incrAppsFailedCreated();
RouterServerUtil.logAndThrowException(e.getMessage(), e);
} }
routerMetrics.incrAppsFailedCreated(); routerMetrics.incrAppsFailedCreated();
@ -321,6 +339,46 @@ public GetNewApplicationResponse getNewApplication(
throw new YarnException(errMsg); throw new YarnException(errMsg);
} }
/**
 * Perform one getNewApplication attempt against a randomly selected
 * active subCluster.
 *
 * <p>A subCluster whose call throws is added to {@code blackList} before the
 * exception is rethrown, so that a later attempt does not pick it again.
 *
 * @param subClustersActive Active SubClusters.
 * @param blackList SubClusters to skip; failed subClusters are appended to it.
 * @param request getNewApplicationRequest.
 * @param retryCount number of the current attempt, used for logging only.
 * @return the non-null GetNewApplicationResponse of the selected subCluster.
 * @throws YarnException if no subCluster can be selected, the call fails,
 *         or the subCluster answers with a null response.
 * @throws IOException io error.
 */
private GetNewApplicationResponse invokeGetNewApplication(
    Map<SubClusterId, SubClusterInfo> subClustersActive,
    List<SubClusterId> blackList, GetNewApplicationRequest request, int retryCount)
    throws YarnException, IOException {
  SubClusterId targetSubCluster =
      RouterServerUtil.getRandomActiveSubCluster(subClustersActive, blackList);
  LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, targetSubCluster);
  ApplicationClientProtocol rmProxy = getClientRMProxyForSubCluster(targetSubCluster);

  GetNewApplicationResponse newAppResponse = null;
  try {
    newAppResponse = rmProxy.getNewApplication(request);
    if (newAppResponse != null) {
      RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP,
          TARGET_CLIENT_RM_SERVICE, newAppResponse.getApplicationId(), targetSubCluster);
    }
  } catch (Exception e) {
    RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN,
        TARGET_CLIENT_RM_SERVICE, e.getMessage(), targetSubCluster);
    LOG.warn("Unable to create a new ApplicationId in SubCluster {}.",
        targetSubCluster.getId(), e);
    // Remember the failing subCluster so the next attempt avoids it.
    blackList.add(targetSubCluster);
    throw e;
  }

  if (newAppResponse != null) {
    return newAppResponse;
  }

  // A null GetNewApplicationResponse means the request failed.
  throw new YarnException(String.format(
      "Unable to create a new ApplicationId in SubCluster %s.", targetSubCluster.getId()));
}
/** /**
* Today, in YARN there are no checks of any applicationId submitted. * Today, in YARN there are no checks of any applicationId submitted.
* *
@ -400,88 +458,35 @@ public SubmitApplicationResponse submitApplication(
RouterServerUtil.logAndThrowException(errMsg, null); RouterServerUtil.logAndThrowException(errMsg, null);
} }
SubmitApplicationResponse response = null;
long startTime = clock.getTime(); long startTime = clock.getTime();
ApplicationId applicationId = ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId(); request.getApplicationSubmissionContext().getApplicationId();
List<SubClusterId> blacklist = new ArrayList<>(); List<SubClusterId> blacklist = new ArrayList<>();
for (int i = 0; i < numSubmitRetries; ++i) { try {
SubClusterId subClusterId = policyFacade.getHomeSubcluster( // We need to handle this situation,
request.getApplicationSubmissionContext(), blacklist); // the user will provide us with an expected submitRetries,
// but if the number of Active SubClusters is less than this number at this time,
// we should provide a high number of retry according to the number of Active SubClusters.
int activeSubClustersCount = getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
LOG.info("submitApplication appId {} try #{} on SubCluster {}.", // Try calling the SubmitApplication method
applicationId, i, subClusterId); SubmitApplicationResponse response =
((FederationActionRetry<SubmitApplicationResponse>) (retryCount) ->
ApplicationHomeSubCluster appHomeSubCluster = invokeSubmitApplication(blacklist, request, retryCount)).
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); runWithRetries(actualRetryNums, submitIntervalTime);
if (i == 0) {
try {
// persist the mapping of applicationId and the subClusterId which has
// been selected as its home
subClusterId =
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted();
String message =
String.format("Unable to insert the ApplicationId %s into the FederationStateStore.",
applicationId);
RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
RouterServerUtil.logAndThrowException(message, e);
}
} else {
try {
// update the mapping of applicationId and the home subClusterId to
// the new subClusterId we have selected
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
String message =
String.format("Unable to update the ApplicationId %s into the FederationStateStore.",
applicationId);
SubClusterId subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application {} already submitted on SubCluster {}.",
applicationId, subClusterId);
} else {
routerMetrics.incrAppsFailedSubmitted();
RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
RouterServerUtil.logAndThrowException(message, e);
}
}
}
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
try {
response = clientRMProxy.submitApplication(request);
} catch (Exception e) {
LOG.warn("Unable to submit the application {} to SubCluster {} error = {}.",
applicationId, subClusterId.getId(), e);
}
if (response != null) { if (response != null) {
LOG.info("Application {} with appId {} submitted on {}.",
request.getApplicationSubmissionContext().getApplicationName(),
applicationId, subClusterId);
long stopTime = clock.getTime(); long stopTime = clock.getTime();
routerMetrics.succeededAppsSubmitted(stopTime - startTime); routerMetrics.succeededAppsSubmitted(stopTime - startTime);
RouterAuditLogger.logSuccess(user.getShortUserName(), SUBMIT_NEW_APP,
TARGET_CLIENT_RM_SERVICE, applicationId, subClusterId);
return response; return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
blacklist.add(subClusterId);
} }
} catch (Exception e){
routerMetrics.incrAppsFailedSubmitted();
RouterServerUtil.logAndThrowException(e.getMessage(), e);
} }
routerMetrics.incrAppsFailedSubmitted(); routerMetrics.incrAppsFailedSubmitted();
@ -492,6 +497,149 @@ public SubmitApplicationResponse submitApplication(
throw new YarnException(msg); throw new YarnException(msg);
} }
/**
 * Invoke SubmitApplication to different subClusters.
 *
 * Step1. Select homeSubCluster for Application according to Policy.
 *
 * Step2. Query homeSubCluster according to ApplicationId,
 * if homeSubCluster does not exist or first attempt(consider repeated submissions), write;
 * if homeSubCluster exists, update.
 *
 * Step3. Find the clientRMProxy of the corresponding cluster according to homeSubCluster,
 * and then call the SubmitApplication method.
 *
 * Step4. If SubmitApplicationResponse is empty, the request fails,
 * if SubmitApplicationResponse is not empty, the request is successful.
 *
 * @param blackList Blacklist avoid repeated calls to unavailable subCluster;
 *        a subCluster whose attempt throws is appended to it.
 * @param request submitApplicationRequest.
 * @param retryCount number of retries (0 for the first attempt).
 * @return submitApplication response; never null, because a null response
 *         from the subCluster is reported as a YarnException.
 * @throws YarnException yarn exception, including the case of a null response.
 * @throws IOException io error.
 */
private SubmitApplicationResponse invokeSubmitApplication(
    List<SubClusterId> blackList, SubmitApplicationRequest request, int retryCount)
    throws YarnException, IOException {

  // The request is not checked here,
  // because the request has been checked before the method is called.
  // We get applicationId and subClusterId from context.
  ApplicationSubmissionContext appSubmissionContext = request.getApplicationSubmissionContext();
  ApplicationId applicationId = appSubmissionContext.getApplicationId();
  SubClusterId subClusterId = null;

  try {

    // Step1. Select homeSubCluster for Application according to Policy.
    subClusterId = policyFacade.getHomeSubcluster(appSubmissionContext, blackList);
    LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
        applicationId, retryCount, subClusterId);

    // Step2. Query homeSubCluster according to ApplicationId.
    boolean exists = existsApplicationHomeSubCluster(applicationId);
    ApplicationHomeSubCluster appHomeSubCluster =
        ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
    if (!exists || retryCount == 0) {
      // No mapping stored yet, or this is the first attempt: write the mapping.
      addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
    } else {
      // A mapping exists and this is a retry: point it to the newly selected subCluster.
      updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
    }

    // Step3. SubmitApplication to the subCluster
    ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
    SubmitApplicationResponse response = clientRMProxy.submitApplication(request);

    // Step4. if SubmitApplicationResponse is not empty, the request is successful.
    if (response != null) {
      LOG.info("Application {} submitted on subCluster {}.", applicationId, subClusterId);
      RouterAuditLogger.logSuccess(user.getShortUserName(), SUBMIT_NEW_APP,
          TARGET_CLIENT_RM_SERVICE, applicationId, subClusterId);
      return response;
    }
  } catch (Exception e) {
    RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
        TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId);
    // One argument per placeholder; the exception is passed last so the logger
    // records it as the throwable.
    LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.",
        applicationId, retryCount, subClusterId, e);
    // subClusterId is still null when the policy lookup itself failed.
    if (subClusterId != null) {
      blackList.add(subClusterId);
    }
    throw e;
  }

  // If SubmitApplicationResponse is empty, the request fails.
  String msg = String.format("Application %s failed to be submitted.", applicationId);
  throw new YarnException(msg);
}
/**
 * Store the application-to-home-subCluster mapping in the FederationStateStore.
 *
 * @param applicationId applicationId, used in the error message on failure.
 * @param homeSubCluster mapping to store; the homeSubCluster was selected according to policy.
 * @throws YarnException if the state store rejects the insert.
 */
private void addApplicationHomeSubCluster(ApplicationId applicationId,
    ApplicationHomeSubCluster homeSubCluster) throws YarnException {
  final String failureFormat =
      "Unable to insert the ApplicationId %s into the FederationStateStore.";
  try {
    federationFacade.addApplicationHomeSubCluster(homeSubCluster);
  } catch (YarnException storeFailure) {
    // Hand the failure to the shared helper together with the formatted message.
    RouterServerUtil.logAndThrowException(storeFailure, failureFormat, applicationId);
  }
}
/**
 * Update ApplicationHomeSubCluster to FederationStateStore.
 *
 * <p>If the update fails, the mapping currently held by the state store is
 * read back. When it already points to {@code subClusterId}, the failure is
 * only logged; otherwise it is reported through
 * {@code RouterServerUtil.logAndThrowException}.
 *
 * @param subClusterId homeSubClusterId
 * @param applicationId applicationId.
 * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
 * @throws YarnException yarn exception.
 */
private void updateApplicationHomeSubCluster(SubClusterId subClusterId,
    ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
  try {
    federationFacade.updateApplicationHomeSubCluster(homeSubCluster);
  } catch (YarnException e) {
    SubClusterId subClusterIdInStateStore =
        federationFacade.getApplicationHomeSubCluster(applicationId);
    // Compare by value: the id read back from the state store need not be the
    // same object instance as the one selected by the policy.
    boolean alreadyHome = subClusterId == subClusterIdInStateStore
        || (subClusterId != null && subClusterId.equals(subClusterIdInStateStore));
    if (alreadyHome) {
      LOG.info("Application {} already submitted on SubCluster {}.",
          applicationId, subClusterId);
    } else {
      RouterServerUtil.logAndThrowException(e,
          "Unable to update the ApplicationId %s into the FederationStateStore.",
          applicationId);
    }
  }
}
/**
 * Check whether the federation facade returns a home subCluster for the
 * given application.
 *
 * <p>A failed lookup is logged as a warning and reported as "does not exist".
 *
 * @param applicationId applicationId
 * @return true if a non-null SubClusterId is returned for the application;
 *         false if it is null or the lookup threw a YarnException.
 */
private boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
  boolean found = false;
  try {
    found = federationFacade.getApplicationHomeSubCluster(applicationId) != null;
  } catch (YarnException e) {
    LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
  }
  return found;
}
/** /**
* The YARN Router will forward to the respective YARN RM in which the AM is * The YARN Router will forward to the respective YARN RM in which the AM is
* running. * running.

View File

@ -25,20 +25,23 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
@ -49,6 +52,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE;
/** /**
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
* use the {@code RouterClientRMService} pipeline test cases for testing the * use the {@code RouterClientRMService} pipeline test cases for testing the
@ -77,7 +82,7 @@ public class TestFederationClientInterceptorRetry
private static SubClusterId bad1; private static SubClusterId bad1;
private static SubClusterId bad2; private static SubClusterId bad2;
private static List<SubClusterId> scs = new ArrayList<SubClusterId>(); private static List<SubClusterId> scs = new ArrayList<>();
@Override @Override
public void setUp() throws IOException { public void setUp() throws IOException {
@ -114,8 +119,7 @@ public void tearDown() {
super.tearDown(); super.tearDown();
} }
private void setupCluster(List<SubClusterId> scsToRegister) private void setupCluster(List<SubClusterId> scsToRegister) throws YarnException {
throws YarnException {
try { try {
// Clean up the StateStore before every test // Clean up the StateStore before every test
@ -132,6 +136,7 @@ private void setupCluster(List<SubClusterId> scsToRegister)
@Override @Override
protected YarnConfiguration createConfiguration() { protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
String mockPassThroughInterceptorClass = String mockPassThroughInterceptorClass =
@ -159,20 +164,14 @@ protected YarnConfiguration createConfiguration() {
* cluster is composed of only 1 bad SubCluster. * cluster is composed of only 1 bad SubCluster.
*/ */
@Test @Test
public void testGetNewApplicationOneBadSC() public void testGetNewApplicationOneBadSC() throws Exception {
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with one bad SubCluster"); LOG.info("Test getNewApplication with one bad SubCluster");
setupCluster(Arrays.asList(bad2)); setupCluster(Arrays.asList(bad2));
try { GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE,
Assert.fail(); () -> interceptor.getNewApplication(request));
} catch (Exception e) {
System.out.println(e.toString());
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
} }
/** /**
@ -180,19 +179,14 @@ public void testGetNewApplicationOneBadSC()
* cluster is composed of only 2 bad SubClusters. * cluster is composed of only 2 bad SubClusters.
*/ */
@Test @Test
public void testGetNewApplicationTwoBadSCs() public void testGetNewApplicationTwoBadSCs() throws Exception {
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with two bad SubClusters"); LOG.info("Test getNewApplication with two bad SubClusters");
setupCluster(Arrays.asList(bad1, bad2)); setupCluster(Arrays.asList(bad1, bad2));
try { GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE,
Assert.fail(); () -> interceptor.getNewApplication(request));
} catch (Exception e) {
System.out.println(e.toString());
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
} }
/** /**
@ -200,17 +194,14 @@ public void testGetNewApplicationTwoBadSCs()
* cluster is composed of only 1 bad SubCluster and 1 good one. * cluster is composed of only 1 bad SubCluster and 1 good one.
*/ */
@Test @Test
public void testGetNewApplicationOneBadOneGood() public void testGetNewApplicationOneBadOneGood() throws YarnException, IOException {
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with one bad, one good SC"); LOG.info("Test getNewApplication with one bad, one good SC");
setupCluster(Arrays.asList(good, bad2)); setupCluster(Arrays.asList(good, bad2));
GetNewApplicationResponse response = null; GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
try { GetNewApplicationResponse response = interceptor.getNewApplication(request);
response =
interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); Assert.assertNotNull(response);
} catch (Exception e) {
Assert.fail();
}
Assert.assertEquals(ResourceManager.getClusterTimeStamp(), Assert.assertEquals(ResourceManager.getClusterTimeStamp(),
response.getApplicationId().getClusterTimestamp()); response.getApplicationId().getClusterTimestamp());
} }
@ -220,38 +211,27 @@ public void testGetNewApplicationOneBadOneGood()
* cluster is composed of only 1 bad SubCluster. * cluster is composed of only 1 bad SubCluster.
*/ */
@Test @Test
public void testSubmitApplicationOneBadSC() public void testSubmitApplicationOneBadSC() throws Exception {
throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with one bad SubCluster"); LOG.info("Test submitApplication with one bad SubCluster");
setupCluster(Arrays.asList(bad2)); setupCluster(Arrays.asList(bad2));
final ApplicationId appId = final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
final SubmitApplicationRequest request = mockSubmitApplicationRequest( final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
appId); LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE,
try { () -> interceptor.submitApplication(request));
interceptor.submitApplication(request);
Assert.fail();
} catch (Exception e) {
System.out.println(e);
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
} }
private SubmitApplicationRequest mockSubmitApplicationRequest( private SubmitApplicationRequest mockSubmitApplicationRequest(ApplicationId appId) {
ApplicationId appId) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
ApplicationSubmissionContext context = ApplicationSubmissionContext ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, MockApps.newAppName(), "q1", .newInstance(appId, MockApps.newAppName(), "q1",
Priority.newInstance(0), amContainerSpec, false, false, -1, Priority.newInstance(0), amContainerSpec, false, false, -1,
Resources.createResource( Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), "MockApp");
"MockApp"); SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context);
SubmitApplicationRequest request = SubmitApplicationRequest
.newInstance(context);
return request; return request;
} }
@ -260,24 +240,17 @@ private SubmitApplicationRequest mockSubmitApplicationRequest(
* cluster is composed of only 2 bad SubClusters. * cluster is composed of only 2 bad SubClusters.
*/ */
@Test @Test
public void testSubmitApplicationTwoBadSCs() public void testSubmitApplicationTwoBadSCs() throws Exception {
throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with two bad SubClusters"); LOG.info("Test submitApplication with two bad SubClusters.");
setupCluster(Arrays.asList(bad1, bad2)); setupCluster(Arrays.asList(bad1, bad2));
final ApplicationId appId = final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
final SubmitApplicationRequest request = mockSubmitApplicationRequest( final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
appId); LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE,
try { () -> interceptor.submitApplication(request));
interceptor.submitApplication(request);
Assert.fail();
} catch (Exception e) {
System.out.println(e.toString());
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
} }
/** /**
@ -287,24 +260,27 @@ public void testSubmitApplicationTwoBadSCs()
@Test @Test
public void testSubmitApplicationOneBadOneGood() public void testSubmitApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException { throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with one bad, one good SC");
LOG.info("Test submitApplication with one bad, one good SC.");
setupCluster(Arrays.asList(good, bad2)); setupCluster(Arrays.asList(good, bad2));
final ApplicationId appId = final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
final SubmitApplicationRequest request = mockSubmitApplicationRequest( final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
appId); SubmitApplicationResponse response = interceptor.submitApplication(request);
try { Assert.assertNotNull(response);
interceptor.submitApplication(request);
} catch (Exception e) {
Assert.fail();
}
Assert.assertEquals(good,
stateStore
.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId))
.getApplicationHomeSubCluster().getHomeSubCluster());
}
GetApplicationHomeSubClusterRequest getAppRequest =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse getAppResponse =
stateStore.getApplicationHomeSubCluster(getAppRequest);
Assert.assertNotNull(getAppResponse);
ApplicationHomeSubCluster responseHomeSubCluster =
getAppResponse.getApplicationHomeSubCluster();
Assert.assertNotNull(responseHomeSubCluster);
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
Assert.assertEquals(good, respSubClusterId);
}
} }

View File

@ -270,6 +270,7 @@ Optional:
|`yarn.router.admin.address` | `0.0.0.0:8052` | Admin address at the router. | |`yarn.router.admin.address` | `0.0.0.0:8052` | Admin address at the router. |
|`yarn.router.webapp.https.address` | `0.0.0.0:8091` | Secure webapp address at the router. | |`yarn.router.webapp.https.address` | `0.0.0.0:8091` | Secure webapp address at the router. |
|`yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. | |`yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. |
|`yarn.router.submit.interval.time` | `10ms` | The interval between two consecutive retries. The default value is 10ms. |
|`yarn.federation.statestore.max-connections` | `10` | This is the maximum number of parallel connections each Router makes to the state-store. | |`yarn.federation.statestore.max-connections` | `10` | This is the maximum number of parallel connections each Router makes to the state-store. |
|`yarn.federation.cache-ttl.secs` | `60` | The Router caches information, and this is the time to live before the cache is invalidated. | |`yarn.federation.cache-ttl.secs` | `60` | The Router caches information, and this is the time to live before the cache is invalidated. |
|`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. | |`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. |