From 2680f17eb49d7d6e117156108f9a013b38141999 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 29 Jul 2022 23:23:11 +0800 Subject: [PATCH] YARN-11180. Refactor some code of getNewApplication, submitApplication etc. (#4618) --- .../yarn/server/router/RouterAuditLogger.java | 2 + .../clientrm/FederationClientInterceptor.java | 299 +++++++-------- .../TestFederationClientInterceptor.java | 355 +++++++----------- 3 files changed, 282 insertions(+), 374 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java index cc82087733..a89d0e4462 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java @@ -49,6 +49,8 @@ public static class AuditConstants { public static final String SUBMIT_NEW_APP = "Submit New App"; public static final String FORCE_KILL_APP = "Force Kill App"; public static final String GET_APP_REPORT = "Get Application Report"; + public static final String TARGET_CLIENT_RM_SERVICE = "RouterClientRMService"; + public static final String UNKNOWN = "UNKNOWN"; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 7fd1003552..947e5f07dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -137,6 +137,13 @@ import org.apache.hadoop.classification.VisibleForTesting; +import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_NEW_APP; +import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP; +import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_REPORT; +import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.FORCE_KILL_APP; +import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.TARGET_CLIENT_RM_SERVICE; +import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UNKNOWN; + /** * Extends the {@code AbstractRequestInterceptorClient} class and provides an * implementation for federation of YARN RM and scaling an application across @@ -228,8 +235,8 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); UserGroupInformation realUser = user; if (serviceAuthEnabled) { - realUser = UserGroupInformation.createProxyUser(user.getShortUserName(), - UserGroupInformation.getLoginUser()); + realUser = UserGroupInformation.createProxyUser( + user.getShortUserName(), UserGroupInformation.getLoginUser()); } clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(), ApplicationClientProtocol.class, subClusterId, realUser); @@ -237,21 +244,17 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( RouterServerUtil.logAndThrowException( "Unable to create the interface to reach the SubCluster " + subClusterId, e); } - clientRMProxies.put(subClusterId, clientRMProxy); return clientRMProxy; } private SubClusterId getRandomActiveSubCluster( - Map activeSubclusters) - throws YarnException { - - if (activeSubclusters == null || activeSubclusters.size() < 1) { + Map activeSubClusters) throws YarnException { + if (activeSubClusters == null || activeSubClusters.isEmpty()) { RouterServerUtil.logAndThrowException( FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); } - List list = new ArrayList<>(activeSubclusters.keySet()); - + List list = new ArrayList<>(activeSubClusters.keySet()); return list.get(rand.nextInt(list.size())); } @@ -276,45 +279,43 @@ private SubClusterId getRandomActiveSubCluster( public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException { - long startTime = clock.getTime(); + if (request == null) { + routerMetrics.incrAppsFailedCreated(); + String errMsg = "Missing getNewApplication request."; + RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, errMsg); + RouterServerUtil.logAndThrowException(errMsg, null); + } + long startTime = clock.getTime(); Map subClustersActive = federationFacade.getSubClusters(true); for (int i = 0; i < numSubmitRetries; ++i) { SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); - LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); - ApplicationClientProtocol clientRMProxy = - getClientRMProxyForSubCluster(subClusterId); + LOG.info("getNewApplication try #{} on SubCluster {}.", i, subClusterId); + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); GetNewApplicationResponse response = null; try { response = clientRMProxy.getNewApplication(request); } catch (Exception e) { - LOG.warn("Unable to create a new ApplicationId in SubCluster " - + subClusterId.getId(), e); - } - - if (response != null) { - - long stopTime = clock.getTime(); - routerMetrics.succeededAppsCreated(stopTime - startTime); - RouterAuditLogger.logSuccess(user.getShortUserName(), - RouterAuditLogger.AuditConstants.GET_NEW_APP, - "RouterClientRMService", response.getApplicationId()); - return response; - } else { - // Empty response from the ResourceManager. - // Blacklist this subcluster for this request. + LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e); subClustersActive.remove(subClusterId); } + if (response != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededAppsCreated(stopTime - startTime); + RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP, + TARGET_CLIENT_RM_SERVICE, response.getApplicationId()); + return response; + } } routerMetrics.incrAppsFailedCreated(); - String errMsg = "Fail to create a new application."; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN", - "RouterClientRMService", errMsg); + String errMsg = "Failed to create a new application."; + RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, errMsg); throw new YarnException(errMsg); } @@ -387,21 +388,20 @@ public GetNewApplicationResponse getNewApplication( public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException { - long startTime = clock.getTime(); - if (request == null || request.getApplicationSubmissionContext() == null - || request.getApplicationSubmissionContext() - .getApplicationId() == null) { + || request.getApplicationSubmissionContext().getApplicationId() == null) { routerMetrics.incrAppsFailedSubmitted(); String errMsg = - "Missing submitApplication request or applicationSubmissionContext " - + "information."; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", - "RouterClientRMService", errMsg); - throw new YarnException(errMsg); + "Missing submitApplication request or applicationSubmissionContext information."; + RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, errMsg); + RouterServerUtil.logAndThrowException(errMsg, null); } + SubmitApplicationResponse response = null; + + long startTime = clock.getTime(); + ApplicationId applicationId = request.getApplicationSubmissionContext().getApplicationId(); @@ -411,8 +411,9 @@ public SubmitApplicationResponse submitApplication( SubClusterId subClusterId = policyFacade.getHomeSubcluster( request.getApplicationSubmissionContext(), blacklist); - LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i, - subClusterId); + + LOG.info("submitApplication appId {} try #{} on SubCluster {}.", + applicationId, i, subClusterId); ApplicationHomeSubCluster appHomeSubCluster = ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); @@ -425,12 +426,12 @@ public SubmitApplicationResponse submitApplication( federationFacade.addApplicationHomeSubCluster(appHomeSubCluster); } catch (YarnException e) { routerMetrics.incrAppsFailedSubmitted(); - String message = "Unable to insert the ApplicationId " + applicationId - + " into the FederationStateStore"; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", - "RouterClientRMService", message, applicationId, subClusterId); - throw new YarnException(message, e); + String message = + String.format("Unable to insert the ApplicationId %s into the FederationStateStore.", + applicationId); + RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId); + RouterServerUtil.logAndThrowException(message, e); } } else { try { @@ -438,19 +439,19 @@ public SubmitApplicationResponse submitApplication( // the new subClusterId we have selected federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster); } catch (YarnException e) { - String message = "Unable to update the ApplicationId " + applicationId - + " into the FederationStateStore"; + String message = + String.format("Unable to update the ApplicationId %s into the FederationStateStore.", + applicationId); SubClusterId subClusterIdInStateStore = federationFacade.getApplicationHomeSubCluster(applicationId); if (subClusterId == subClusterIdInStateStore) { - LOG.info("Application {} already submitted on SubCluster {}.", applicationId, - subClusterId); + LOG.info("Application {} already submitted on SubCluster {}.", + applicationId, subClusterId); } else { routerMetrics.incrAppsFailedSubmitted(); - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", - "RouterClientRMService", message, applicationId, subClusterId); - throw new YarnException(message, e); + RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId); + RouterServerUtil.logAndThrowException(message, e); } } } @@ -458,7 +459,6 @@ public SubmitApplicationResponse submitApplication( ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - SubmitApplicationResponse response = null; try { response = clientRMProxy.submitApplication(request); } catch (Exception e) { @@ -472,9 +472,8 @@ public SubmitApplicationResponse submitApplication( applicationId, subClusterId); long stopTime = clock.getTime(); routerMetrics.succeededAppsSubmitted(stopTime - startTime); - RouterAuditLogger.logSuccess(user.getShortUserName(), - RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, - "RouterClientRMService", applicationId, subClusterId); + RouterAuditLogger.logSuccess(user.getShortUserName(), SUBMIT_NEW_APP, + TARGET_CLIENT_RM_SERVICE, applicationId, subClusterId); return response; } else { // Empty response from the ResourceManager. @@ -484,13 +483,11 @@ public SubmitApplicationResponse submitApplication( } routerMetrics.incrAppsFailedSubmitted(); - String errMsg = "Application " - + request.getApplicationSubmissionContext().getApplicationName() - + " with appId " + applicationId + " failed to be submitted."; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN", - "RouterClientRMService", errMsg, applicationId); - throw new YarnException(errMsg); + String msg = String.format("Application %s with appId %s failed to be submitted.", + request.getApplicationSubmissionContext().getApplicationName(), applicationId); + RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, msg, applicationId); + throw new YarnException(msg); } /** @@ -513,16 +510,16 @@ public SubmitApplicationResponse submitApplication( public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnException, IOException { - long startTime = clock.getTime(); - if (request == null || request.getApplicationId() == null) { routerMetrics.incrAppsFailedKilled(); - String message = "Missing forceKillApplication request or ApplicationId."; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN", - "RouterClientRMService", message); - throw new YarnException(message); + String msg = "Missing forceKillApplication request or ApplicationId."; + RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, msg); + RouterServerUtil.logAndThrowException(msg, null); } + + long startTime = clock.getTime(); + ApplicationId applicationId = request.getApplicationId(); SubClusterId subClusterId = null; @@ -531,12 +528,11 @@ public KillApplicationResponse forceKillApplication( .getApplicationHomeSubCluster(request.getApplicationId()); } catch (YarnException e) { routerMetrics.incrAppsFailedKilled(); - String msg = "Application " + applicationId - + " does not exist in FederationStateStore"; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN", - "RouterClientRMService", msg, applicationId); - throw new YarnException(msg, e); + String msg = + String.format("Application %s does not exist in FederationStateStore.", applicationId); + RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, msg, applicationId); + RouterServerUtil.logAndThrowException(msg, e); } ApplicationClientProtocol clientRMProxy = @@ -548,11 +544,10 @@ public KillApplicationResponse forceKillApplication( response = clientRMProxy.forceKillApplication(request); } catch (Exception e) { routerMetrics.incrAppsFailedKilled(); - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN", - "RouterClientRMService", "Unable to kill the application report", - applicationId, subClusterId); - throw e; + String msg = "Unable to kill the application report."; + RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, msg, applicationId, subClusterId); + RouterServerUtil.logAndThrowException(msg, e); } if (response == null) { @@ -562,9 +557,8 @@ public KillApplicationResponse forceKillApplication( long stopTime = clock.getTime(); routerMetrics.succeededAppsKilled(stopTime - startTime); - RouterAuditLogger.logSuccess(user.getShortUserName(), - RouterAuditLogger.AuditConstants.FORCE_KILL_APP, - "RouterClientRMService", applicationId); + RouterAuditLogger.logSuccess(user.getShortUserName(), FORCE_KILL_APP, + TARGET_CLIENT_RM_SERVICE, applicationId); return response; } @@ -588,18 +582,15 @@ public KillApplicationResponse forceKillApplication( public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException { - long startTime = clock.getTime(); - if (request == null || request.getApplicationId() == null) { routerMetrics.incrAppsFailedRetrieved(); - String errMsg = - "Missing getApplicationReport request or applicationId information."; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN", - "RouterClientRMService", errMsg); - throw new YarnException(errMsg); + String errMsg = "Missing getApplicationReport request or applicationId information."; + RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, errMsg); + RouterServerUtil.logAndThrowException(errMsg, null); } + long startTime = clock.getTime(); SubClusterId subClusterId = null; try { @@ -607,29 +598,26 @@ public GetApplicationReportResponse getApplicationReport( .getApplicationHomeSubCluster(request.getApplicationId()); } catch (YarnException e) { routerMetrics.incrAppsFailedRetrieved(); - String errMsg = "Application " + request.getApplicationId() - + " does not exist in FederationStateStore"; - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN", - "RouterClientRMService", errMsg, request.getApplicationId()); - throw new YarnException(errMsg, e); + String errMsg = String.format("Application %s does not exist in FederationStateStore.", + request.getApplicationId()); + RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId()); + RouterServerUtil.logAndThrowException(errMsg, e); } ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - GetApplicationReportResponse response = null; + try { response = clientRMProxy.getApplicationReport(request); } catch (Exception e) { routerMetrics.incrAppsFailedRetrieved(); - String errMsg = "Unable to get the application report for " + request - .getApplicationId() + "to SubCluster " + subClusterId.getId(); - RouterAuditLogger.logFailure(user.getShortUserName(), - RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN", - "RouterClientRMService", errMsg, request.getApplicationId(), - subClusterId); - throw e; + String errMsg = String.format("Unable to get the application report for %s to SubCluster %s.", + request.getApplicationId(), subClusterId.getId()); + RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId(), subClusterId); + RouterServerUtil.logAndThrowException(errMsg, e); } if (response == null) { @@ -637,12 +625,10 @@ public GetApplicationReportResponse getApplicationReport( + "the application {} to SubCluster {}.", request.getApplicationId(), subClusterId.getId()); } - long stopTime = clock.getTime(); routerMetrics.succeededAppsRetrieved(stopTime - startTime); - RouterAuditLogger.logSuccess(user.getShortUserName(), - RouterAuditLogger.AuditConstants.GET_APP_REPORT, - "RouterClientRMService", request.getApplicationId()); + RouterAuditLogger.logSuccess(user.getShortUserName(), GET_APP_REPORT, + TARGET_CLIENT_RM_SERVICE, request.getApplicationId()); return response; } @@ -670,56 +656,48 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) throws YarnException, IOException { if (request == null) { routerMetrics.incrMultipleAppsFailedRetrieved(); - RouterServerUtil.logAndThrowException( - "Missing getApplications request.", - null); + RouterServerUtil.logAndThrowException("Missing getApplications request.", null); } long startTime = clock.getTime(); Map subclusters = federationFacade.getSubClusters(true); ClientMethod remoteMethod = new ClientMethod("getApplications", new Class[] {GetApplicationsRequest.class}, new Object[] {request}); - Map applications; - + Map applications = null; try { applications = invokeConcurrent(subclusters.keySet(), remoteMethod, GetApplicationsResponse.class); - } catch (Exception ex) { routerMetrics.incrMultipleAppsFailedRetrieved(); - LOG.error("Unable to get applications due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex); } long stopTime = clock.getTime(); routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime); // Merge the Application Reports - return RouterYarnClientUtils.mergeApplications(applications.values(), - returnPartialReport); + return RouterYarnClientUtils.mergeApplications(applications.values(), returnPartialReport); } @Override public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnException, IOException { + if (request == null) { + routerMetrics.incrGetClusterMetricsFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getClusterMetrics request.", null); + } long startTime = clock.getTime(); - Map subclusters = - federationFacade.getSubClusters(true); ClientMethod remoteMethod = new ClientMethod("getClusterMetrics", new Class[] {GetClusterMetricsRequest.class}, new Object[] {request}); - Map clusterMetrics; - + Collection clusterMetrics = null; try { - clusterMetrics = invokeConcurrent(subclusters.keySet(), remoteMethod, - GetClusterMetricsResponse.class); - + clusterMetrics = invokeAppClientProtocolMethod( + true, remoteMethod, GetClusterMetricsResponse.class); } catch (Exception ex) { routerMetrics.incrGetClusterMetricsFailedRetrieved(); - LOG.error("Unable to get cluster metrics due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException("Unable to get cluster metrics due to exception.", ex); } - long stopTime = clock.getTime(); routerMetrics.succeededGetClusterMetricsRetrieved(stopTime - startTime); - return RouterYarnClientUtils.merge(clusterMetrics.values()); + return RouterYarnClientUtils.merge(clusterMetrics); } Map invokeConcurrent(ArrayList clusterIds, @@ -804,8 +782,7 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) clusterNodes.put(subClusterId, response); } catch (Exception ex) { routerMetrics.incrClusterNodesFailedRetrieved(); - LOG.error("Unable to get cluster nodes due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex); } } long stopTime = clock.getTime(); @@ -850,14 +827,13 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls( long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getQueueUserAcls", new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request}); - Collection queueUserAcls; + Collection queueUserAcls = null; try { queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod, GetQueueUserAclsInfoResponse.class); } catch (Exception ex) { routerMetrics.incrQueueUserAclsFailedRetrieved(); - LOG.error("Unable to get queue user Acls due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException("Unable to get queue user Acls due to exception.", ex); } long stopTime = clock.getTime(); routerMetrics.succeededGetQueueUserAclsRetrieved(stopTime - startTime); @@ -931,14 +907,14 @@ public ReservationListResponse listReservations( long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("listReservations", new Class[] {ReservationListRequest.class}, new Object[] {request}); - Collection listResponses; + Collection listResponses = null; try { listResponses = invokeAppClientProtocolMethod(true, remoteMethod, ReservationListResponse.class); } catch (Exception ex) { routerMetrics.incrListReservationsFailedRetrieved(); - LOG.error("Unable to list reservations node due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException( + "Unable to list reservations node due to exception.", ex); } long stopTime = clock.getTime(); routerMetrics.succeededListReservationsRetrieved(stopTime - startTime); @@ -986,14 +962,13 @@ public GetNodesToLabelsResponse getNodeToLabels( long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getNodeToLabels", new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request}); - Collection clusterNodes; + Collection clusterNodes = null; try { clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod, GetNodesToLabelsResponse.class); } catch (Exception ex) { routerMetrics.incrNodeToLabelsFailedRetrieved(); - LOG.error("Unable to get label node due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException("Unable to get node label due to exception.", ex); } long stopTime = clock.getTime(); routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime); @@ -1010,15 +985,14 @@ public GetLabelsToNodesResponse getLabelsToNodes( } long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", - new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); - Collection labelNodes; + new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); + Collection labelNodes = null; try { labelNodes = invokeAppClientProtocolMethod(true, remoteMethod, GetLabelsToNodesResponse.class); } catch (Exception ex) { routerMetrics.incrLabelsToNodesFailedRetrieved(); - LOG.error("Unable to get label node due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException("Unable to get label node due to exception.", ex); } long stopTime = clock.getTime(); routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime); @@ -1035,15 +1009,15 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels( } long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels", - new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); - Collection nodeLabels; + new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); + Collection nodeLabels = null; try { nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod, GetClusterNodeLabelsResponse.class); } catch (Exception ex) { routerMetrics.incrClusterNodeLabelsFailedRetrieved(); - LOG.error("Unable to get cluster nodeLabels due to exception.", ex); - throw ex; + RouterServerUtil.logAndThrowException("Unable to get cluster nodeLabels due to exception.", + ex); } long stopTime = clock.getTime(); routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime); @@ -1096,15 +1070,15 @@ public GetApplicationAttemptReportResponse getApplicationAttemptReport( ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - GetApplicationAttemptReportResponse response; + GetApplicationAttemptReportResponse response = null; try { response = clientRMProxy.getApplicationAttemptReport(request); } catch (Exception e) { routerMetrics.incrAppAttemptsFailedRetrieved(); - LOG.error("Unable to get the applicationAttempt report for {} " - + "to SubCluster {}, error = {}.", - request.getApplicationAttemptId(), subClusterId.getId(), e); - throw e; + String msg = String.format( + "Unable to get the applicationAttempt report for %s to SubCluster %s.", + request.getApplicationAttemptId(), subClusterId.getId()); + RouterServerUtil.logAndThrowException(msg, e); } if (response == null) { @@ -1328,8 +1302,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( } catch (YarnException e) { routerMetrics.incrUpdateAppPriorityFailedRetrieved(); RouterServerUtil.logAndThrowException("Application " + - request.getApplicationId() + - " does not exist in FederationStateStore.", e); + request.getApplicationId() + " does not exist in FederationStateStore.", e); } ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index f0aa48082b..6a47f15757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -156,14 +156,13 @@ public void setUp() { stateStore = new MemoryFederationStateStore(); stateStore.init(this.getConf()); - FederationStateStoreFacade.getInstance().reinitialize(stateStore, - getConf()); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); stateStoreUtil = new FederationStateStoreTestUtil(stateStore); interceptor.setConf(this.getConf()); interceptor.init(user); - subClusters = new ArrayList(); + subClusters = new ArrayList<>(); try { for (int i = 0; i < NUM_SUBCLUSTER; i++) { @@ -197,7 +196,7 @@ protected YarnConfiguration createConfiguration() { // chain conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass - + "," + TestableFederationClientInterceptor.class.getName()); + + "," + TestableFederationClientInterceptor.class.getName()); conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, UniformBroadcastPolicyManager.class.getName()); @@ -212,17 +211,16 @@ protected YarnConfiguration createConfiguration() { * ApplicationId has to belong to one of the SubCluster in the cluster. */ @Test - public void testGetNewApplication() - throws YarnException, IOException, InterruptedException { - LOG.info("Test FederationClientInterceptor: Get New Application"); + public void testGetNewApplication() throws YarnException, IOException { + LOG.info("Test FederationClientInterceptor: Get New Application."); GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); GetNewApplicationResponse response = interceptor.getNewApplication(request); Assert.assertNotNull(response); Assert.assertNotNull(response.getApplicationId()); - Assert.assertTrue(response.getApplicationId() - .getClusterTimestamp() == ResourceManager.getClusterTimeStamp()); + Assert.assertEquals(response.getApplicationId().getClusterTimestamp(), + ResourceManager.getClusterTimeStamp()); } /** @@ -232,10 +230,9 @@ public void testGetNewApplication() @Test public void testSubmitApplication() throws YarnException, IOException { - LOG.info("Test FederationClientInterceptor: Submit Application"); + LOG.info("Test FederationClientInterceptor: Submit Application."); - ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), - 1); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -249,14 +246,12 @@ public void testSubmitApplication() private SubmitApplicationRequest mockSubmitApplicationRequest( ApplicationId appId) { ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); - ApplicationSubmissionContext context = ApplicationSubmissionContext - .newInstance(appId, MockApps.newAppName(), "default", - Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1, - Resources.createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), - "MockApp"); - SubmitApplicationRequest request = SubmitApplicationRequest - .newInstance(context); + ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance( + appId, MockApps.newAppName(), "default", + Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1, + Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), + "MockApp"); + SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context); return request; } @@ -297,37 +292,27 @@ public void testSubmitApplicationMultipleSubmission() */ @Test public void testSubmitApplicationEmptyRequest() - throws YarnException, IOException, InterruptedException { - LOG.info( - "Test FederationClientInterceptor: Submit Application - Empty"); - try { - interceptor.submitApplication(null); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Missing submitApplication request or " - + "applicationSubmissionContext information.")); - } - try { - interceptor.submitApplication(SubmitApplicationRequest.newInstance(null)); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Missing submitApplication request or " - + "applicationSubmissionContext information.")); - } - try { - ApplicationSubmissionContext context = ApplicationSubmissionContext - .newInstance(null, "", "", null, null, false, false, -1, null, null); - SubmitApplicationRequest request = - SubmitApplicationRequest.newInstance(context); - interceptor.submitApplication(request); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Missing submitApplication request or " - + "applicationSubmissionContext information.")); - } + throws Exception { + LOG.info("Test FederationClientInterceptor: Submit Application - Empty."); + + // null request1 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitApplication request or applicationSubmissionContext information.", + () -> interceptor.submitApplication(null)); + + // null request2 + LambdaTestUtils.intercept(YarnException.class, + "Missing submitApplication request or applicationSubmissionContext information.", + () -> interceptor.submitApplication(SubmitApplicationRequest.newInstance(null))); + + // null request3 + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(null, "", "", null, null, false, false, -1, null, null); + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + LambdaTestUtils.intercept(YarnException.class, + "Missing submitApplication request or applicationSubmissionContext information.", + () -> interceptor.submitApplication(request)); } /** @@ -337,7 +322,7 @@ public void testSubmitApplicationEmptyRequest() @Test public void testForceKillApplication() throws YarnException, IOException, InterruptedException { - LOG.info("Test FederationClientInterceptor: Force Kill Application"); + LOG.info("Test FederationClientInterceptor: Force Kill Application."); ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); @@ -349,10 +334,8 @@ public void testForceKillApplication() Assert.assertNotNull(response); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); - KillApplicationRequest requestKill = - KillApplicationRequest.newInstance(appId); - KillApplicationResponse responseKill = - interceptor.forceKillApplication(requestKill); + KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId); + KillApplicationResponse responseKill = interceptor.forceKillApplication(requestKill); Assert.assertNotNull(responseKill); } @@ -361,22 +344,17 @@ public void testForceKillApplication() * application does not exist in StateStore. */ @Test - public void testForceKillApplicationNotExists() - throws YarnException, IOException, InterruptedException { - LOG.info("Test FederationClientInterceptor: " - + "Force Kill Application - Not Exists"); + public void testForceKillApplicationNotExists() throws Exception { + LOG.info("Test FederationClientInterceptor: Force Kill Application - Not Exists"); ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId); - try { - interceptor.forceKillApplication(requestKill); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().equals( - "Application " + appId + " does not exist in FederationStateStore")); - } + + LambdaTestUtils.intercept(YarnException.class, + "Application " + appId + " does not exist in FederationStateStore.", + () -> interceptor.forceKillApplication(requestKill)); } /** @@ -385,24 +363,19 @@ public void testForceKillApplicationNotExists() */ @Test public void testForceKillApplicationEmptyRequest() - throws YarnException, IOException, InterruptedException { - LOG.info( - "Test FederationClientInterceptor: Force Kill Application - Empty"); - try { - interceptor.forceKillApplication(null); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().startsWith( - "Missing forceKillApplication request or ApplicationId.")); - } - try { - interceptor - .forceKillApplication(KillApplicationRequest.newInstance(null)); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().startsWith( - "Missing forceKillApplication request or ApplicationId.")); - } + throws Exception { + LOG.info("Test FederationClientInterceptor: Force Kill Application - Empty."); + + // null request1 + LambdaTestUtils.intercept(YarnException.class, + "Missing forceKillApplication request or ApplicationId.", + () -> interceptor.forceKillApplication(null)); + + // null request2 + KillApplicationRequest killRequest = KillApplicationRequest.newInstance(null); + LambdaTestUtils.intercept(YarnException.class, + "Missing forceKillApplication request or ApplicationId.", + () -> interceptor.forceKillApplication(killRequest)); } /** @@ -439,20 +412,14 @@ public void testGetApplicationReport() */ @Test public void testGetApplicationNotExists() - throws YarnException, IOException, InterruptedException { - LOG.info( - "Test ApplicationClientProtocol: Get Application Report - Not Exists"); - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - GetApplicationReportRequest requestGet = - GetApplicationReportRequest.newInstance(appId); - try { - interceptor.getApplicationReport(requestGet); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().equals( - "Application " + appId + " does not exist in FederationStateStore")); - } + throws Exception { + LOG.info("Test ApplicationClientProtocol: Get Application Report - Not Exists."); + + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId); + LambdaTestUtils.intercept(YarnException.class, + "Application " + appId + " does not exist in FederationStateStore.", + () -> interceptor.getApplicationReport(requestGet)); } /** @@ -461,31 +428,23 @@ public void testGetApplicationNotExists() */ @Test public void testGetApplicationEmptyRequest() - throws YarnException, IOException, InterruptedException { - LOG.info( - "Test FederationClientInterceptor: Get Application Report - Empty"); - try { - interceptor.getApplicationReport(null); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Missing getApplicationReport request or " - + "applicationId information.")); - } - try { - interceptor - .getApplicationReport(GetApplicationReportRequest.newInstance(null)); - Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Missing getApplicationReport request or " - + "applicationId information.")); - } + throws Exception { + LOG.info("Test FederationClientInterceptor: Get Application Report - Empty."); + + // null request1 + LambdaTestUtils.intercept(YarnException.class, + "Missing getApplicationReport request or applicationId information.", + () -> interceptor.getApplicationReport(null)); + + // null request2 + GetApplicationReportRequest reportRequest = GetApplicationReportRequest.newInstance(null); + LambdaTestUtils.intercept(YarnException.class, + "Missing getApplicationReport request or applicationId information.", + () -> interceptor.getApplicationReport(reportRequest)); } /** - * This test validates the correctness of - * GetApplicationAttemptReport in case the + * This test validates the correctness of GetApplicationAttemptReport in case the * application exists in the cluster. */ @Test @@ -529,77 +488,68 @@ public void testGetApplicationAttemptReport() } /** - * This test validates the correctness of - * GetApplicationAttemptReport in case the + * This test validates the correctness of GetApplicationAttemptReport in case the * application does not exist in StateStore. */ @Test - public void testGetApplicationAttemptNotExists() - throws Exception { - LOG.info( - "Test ApplicationClientProtocol: " + - "Get ApplicationAttempt Report - Not Exists"); + public void testGetApplicationAttemptNotExists() throws Exception { + LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Not Exists."); + ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationAttemptId appAttemptID = - ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptId.newInstance(appId, 1); GetApplicationAttemptReportRequest requestGet = - GetApplicationAttemptReportRequest.newInstance(appAttemptID); + GetApplicationAttemptReportRequest.newInstance(appAttemptID); LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " + - appAttemptID + " belongs to Application " + - appId + " does not exist in FederationStateStore.", + appAttemptID + " belongs to Application " + + appId + " does not exist in FederationStateStore.", () -> interceptor.getApplicationAttemptReport(requestGet)); } /** - * This test validates - * the correctness of GetApplicationAttemptReport in case of + * This test validates the correctness of GetApplicationAttemptReport in case of * empty request. */ @Test public void testGetApplicationAttemptEmptyRequest() - throws Exception { - LOG.info("Test FederationClientInterceptor: " + - "Get ApplicationAttempt Report - Empty"); + throws Exception { + LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Empty."); + // null request1 LambdaTestUtils.intercept(YarnException.class, - "Missing getApplicationAttemptReport " + - "request or applicationId " + - "or applicationAttemptId information.", + "Missing getApplicationAttemptReport request or applicationId " + + "or applicationAttemptId information.", () -> interceptor.getApplicationAttemptReport(null)); + // null request2 LambdaTestUtils.intercept(YarnException.class, - "Missing getApplicationAttemptReport " + - "request or applicationId " + - "or applicationAttemptId information.", - () -> interceptor - .getApplicationAttemptReport( - GetApplicationAttemptReportRequest - .newInstance(null))); + "Missing getApplicationAttemptReport request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor.getApplicationAttemptReport( + GetApplicationAttemptReportRequest.newInstance(null))); + // null request3 LambdaTestUtils.intercept(YarnException.class, - "Missing getApplicationAttemptReport " + - "request or applicationId " + - "or applicationAttemptId information.", - () -> interceptor - .getApplicationAttemptReport( - GetApplicationAttemptReportRequest.newInstance( - ApplicationAttemptId - .newInstance(null, 1) - ))); + "Missing getApplicationAttemptReport request or applicationId " + + "or applicationAttemptId information.", + () -> interceptor.getApplicationAttemptReport( + GetApplicationAttemptReportRequest.newInstance( + ApplicationAttemptId.newInstance(null, 1)))); } @Test - public void testGetClusterMetricsRequest() throws YarnException, IOException { - LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request"); + public void testGetClusterMetricsRequest() throws Exception { + LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request."); + // null request - GetClusterMetricsResponse response = interceptor.getClusterMetrics(null); - Assert.assertEquals(subClusters.size(), - response.getClusterMetrics().getNumNodeManagers()); + LambdaTestUtils.intercept(YarnException.class, "Missing getClusterMetrics request.", + () -> interceptor.getClusterMetrics(null)); + // normal request. - response = + GetClusterMetricsResponse response = interceptor.getClusterMetrics(GetClusterMetricsRequest.newInstance()); Assert.assertEquals(subClusters.size(), response.getClusterMetrics().getNumNodeManagers()); @@ -607,23 +557,20 @@ public void testGetClusterMetricsRequest() throws YarnException, IOException { ClientMethod remoteMethod = new ClientMethod("getClusterMetrics", new Class[] {GetClusterMetricsRequest.class}, new Object[] {GetClusterMetricsRequest.newInstance()}); - Map clusterMetrics =interceptor. - invokeConcurrent(new ArrayList<>(), remoteMethod, - GetClusterMetricsResponse.class); - Assert.assertEquals(true, clusterMetrics.isEmpty()); + Map clusterMetrics = interceptor. + invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class); + Assert.assertTrue(clusterMetrics.isEmpty()); } /** - * This test validates the correctness of - * GetApplicationsResponse in case the + * This test validates the correctness of GetApplicationsResponse in case the * application exists in the cluster. */ @Test public void testGetApplicationsResponse() throws YarnException, IOException, InterruptedException { - LOG.info("Test FederationClientInterceptor: Get Applications Response"); - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); + LOG.info("Test FederationClientInterceptor: Get Applications Response."); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -632,40 +579,32 @@ public void testGetApplicationsResponse() Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); Set appTypes = Collections.singleton("MockApp"); - GetApplicationsRequest requestGet = - GetApplicationsRequest.newInstance(appTypes); - - GetApplicationsResponse responseGet = - interceptor.getApplications(requestGet); + GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes); + GetApplicationsResponse responseGet = interceptor.getApplications(requestGet); Assert.assertNotNull(responseGet); } /** - * This test validates - * the correctness of GetApplicationsResponse in case of + * This test validates the correctness of GetApplicationsResponse in case of * empty request. */ @Test public void testGetApplicationsNullRequest() throws Exception { - LOG.info("Test FederationClientInterceptor: Get Applications request"); - LambdaTestUtils.intercept(YarnException.class, - "Missing getApplications request.", + LOG.info("Test FederationClientInterceptor: Get Applications request."); + LambdaTestUtils.intercept(YarnException.class, "Missing getApplications request.", () -> interceptor.getApplications(null)); } /** - * This test validates - * the correctness of GetApplicationsResponse in case applications + * This test validates the correctness of GetApplicationsResponse in case applications * with given type does not exist. */ @Test public void testGetApplicationsApplicationTypeNotExists() throws Exception{ - LOG.info("Test FederationClientInterceptor: Application with type does " - + "not exist"); + LOG.info("Test FederationClientInterceptor: Application with type does not exist."); - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -675,25 +614,20 @@ public void testGetApplicationsApplicationTypeNotExists() throws Exception{ Set appTypes = Collections.singleton("SPARK"); - GetApplicationsRequest requestGet = - GetApplicationsRequest.newInstance(appTypes); - - GetApplicationsResponse responseGet = - interceptor.getApplications(requestGet); + GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes); + GetApplicationsResponse responseGet = interceptor.getApplications(requestGet); Assert.assertNotNull(responseGet); Assert.assertTrue(responseGet.getApplicationList().isEmpty()); } /** - * This test validates - * the correctness of GetApplicationsResponse in case applications + * This test validates the correctness of GetApplicationsResponse in case applications * with given YarnApplicationState does not exist. */ @Test public void testGetApplicationsApplicationStateNotExists() throws Exception { - LOG.info("Test FederationClientInterceptor:" + - " Application with state does not exist"); + LOG.info("Test FederationClientInterceptor: Application with state does not exist."); ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); @@ -711,8 +645,7 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception { GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(applicationStates); - GetApplicationsResponse responseGet = - interceptor.getApplications(requestGet); + GetApplicationsResponse responseGet = interceptor.getApplications(requestGet); Assert.assertNotNull(responseGet); Assert.assertTrue(responseGet.getApplicationList().isEmpty()); @@ -720,7 +653,7 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception { @Test public void testGetClusterNodesRequest() throws Exception { - LOG.info("Test FederationClientInterceptor : Get Cluster Nodeds request"); + LOG.info("Test FederationClientInterceptor : Get Cluster Nodes request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.", () -> interceptor.getClusterNodes(null)); @@ -732,7 +665,7 @@ public void testGetClusterNodesRequest() throws Exception { @Test public void testGetNodeToLabelsRequest() throws Exception { - LOG.info("Test FederationClientInterceptor : Get Node To Labels request"); + LOG.info("Test FederationClientInterceptor : Get Node To Labels request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToLabels request.", () -> interceptor.getNodeToLabels(null)); @@ -744,7 +677,7 @@ public void testGetNodeToLabelsRequest() throws Exception { @Test public void testGetLabelsToNodesRequest() throws Exception { - LOG.info("Test FederationClientInterceptor : Get Labels To Node request"); + LOG.info("Test FederationClientInterceptor : Get Labels To Node request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing getLabelsToNodes request.", () -> interceptor.getLabelsToNodes(null)); @@ -756,7 +689,7 @@ public void testGetLabelsToNodesRequest() throws Exception { @Test public void testClusterNodeLabelsRequest() throws Exception { - LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request"); + LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeLabels request.", () -> interceptor.getClusterNodeLabels(null)); @@ -768,13 +701,13 @@ public void testClusterNodeLabelsRequest() throws Exception { @Test public void testGetQueueUserAcls() throws Exception { - LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request"); + LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.", () -> interceptor.getQueueUserAcls(null)); - // noraml request + // normal request GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls( GetQueueUserAclsInfoRequest.newInstance()); @@ -796,7 +729,7 @@ public void testGetQueueUserAcls() throws Exception { @Test public void testListReservations() throws Exception { - LOG.info("Test FederationClientInterceptor : Get ListReservations request"); + LOG.info("Test FederationClientInterceptor : Get ListReservations request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.", @@ -812,7 +745,7 @@ public void testListReservations() throws Exception { @Test public void testGetContainersRequest() throws Exception { - LOG.info("Test FederationClientInterceptor : Get Containers request"); + LOG.info("Test FederationClientInterceptor : Get Containers request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " + @@ -928,7 +861,7 @@ public void getApplicationAttempts() throws Exception { @Test public void testGetResourceTypeInfoRequest() throws Exception { - LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request"); + LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request."); // null request LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.", () -> interceptor.getResourceTypeInfo(null)); @@ -1109,7 +1042,7 @@ public void testSignalContainer() throws Exception { RMAppAttemptState.SCHEDULED); MockNM nm = interceptor.getMockNMs().get(subClusterId); nm.nodeHeartbeat(true); - mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); + MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId()); ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId(); @@ -1147,10 +1080,10 @@ public void testMoveApplicationAcrossQueues() throws Exception { mockRM.waitForState(appId, RMAppState.ACCEPTED); RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), - RMAppAttemptState.SCHEDULED); + RMAppAttemptState.SCHEDULED); MockNM nm = interceptor.getMockNMs().get(subClusterId); nm.nodeHeartbeat(true); - mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); + MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED); mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId()); MoveApplicationAcrossQueuesRequest acrossQueuesRequest =