From f8b9dd911cdc6b154fa5ea06259b8e0c8c37923e Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 27 Aug 2022 07:01:17 +0800 Subject: [PATCH] YARN-11219. [Federation] Add getAppActivities, getAppStatistics REST APIs for Router. (#4757) --- .../webapp/dao/ApplicationStatisticsInfo.java | 5 + .../webapp/dao/StatisticsItemInfo.java | 9 ++ .../webapp/FederationInterceptorREST.java | 41 +++++- .../router/webapp/RouterWebServiceUtil.java | 36 +++++ .../MockDefaultRequestInterceptorREST.java | 131 +++++++++++++++++- .../webapp/TestFederationInterceptorREST.java | 79 +++++++++++ .../webapp/TestRouterWebServiceUtil.java | 86 ++++++++++++ 7 files changed, 383 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ApplicationStatisticsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ApplicationStatisticsInfo.java index b77ffb4ed6..7b7066a731 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ApplicationStatisticsInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ApplicationStatisticsInfo.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; import java.util.ArrayList; +import java.util.Collection; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; @@ -33,6 +34,10 @@ public class ApplicationStatisticsInfo { public ApplicationStatisticsInfo() { } // JAXB needs this + public ApplicationStatisticsInfo(Collection items) { + statItem.addAll(items); + } + public void add(StatisticsItemInfo statItem) { this.statItem.add(statItem); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java index e12dd5fb63..8f79fef0ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/StatisticsItemInfo.java @@ -41,6 +41,12 @@ public StatisticsItemInfo( this.count = count; } + public StatisticsItemInfo(StatisticsItemInfo info) { + this.state = info.state; + this.type = info.type; + this.count = info.count; + } + public YarnApplicationState getState() { return state; } @@ -53,4 +59,7 @@ public long getCount() { return count; } + public void setCount(long count) { + this.count = count; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 31a841cfb9..3819f1095e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -1129,13 +1129,50 @@ public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, Set allocationRequestIds, String groupBy, String limit, Set actions, boolean summarize) { - throw new NotImplementedException("Code is not implemented"); + + // Only verify the app_id, + // because the specific subCluster needs to be found according to the app_id, + // and other verifications are directly handed over to the corresponding subCluster RM + if (appId == null || appId.isEmpty()) { + throw new IllegalArgumentException("Parameter error, the appId is empty or null."); + } + + try { + SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + + final HttpServletRequest hsrCopy = clone(hsr); + return interceptor.getAppActivities(hsrCopy, appId, time, requestPriorities, + allocationRequestIds, groupBy, limit, actions, summarize); + } catch (IllegalArgumentException e) { + RouterServerUtil.logAndThrowRunTimeException(e, "Unable to get subCluster by appId: %s.", + appId); + } catch (YarnException e) { + RouterServerUtil.logAndThrowRunTimeException("getAppActivities Failed.", e); + } + + return null; } @Override public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr, Set stateQueries, Set typeQueries) { - throw new NotImplementedException("Code is not implemented"); + try { + Map subClustersActive = getActiveSubclusters(); + final HttpServletRequest hsrCopy = clone(hsr); + Class[] argsClasses = new Class[]{HttpServletRequest.class, Set.class, Set.class}; + Object[] args = new Object[]{hsrCopy, stateQueries, typeQueries}; + ClientMethod remoteMethod = new ClientMethod("getAppStatistics", argsClasses, args); + Map appStatisticsMap = invokeConcurrent( + subClustersActive.values(), remoteMethod, ApplicationStatisticsInfo.class); + return RouterWebServiceUtil.mergeApplicationStatisticsInfo(appStatisticsMap.values()); + } catch (IOException e) { + RouterServerUtil.logAndThrowRunTimeException(e, "Get all active sub cluster(s) error."); + } catch (YarnException e) { + RouterServerUtil.logAndThrowRunTimeException(e, "getAppStatistics error."); + } + return null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index 336e772bef..7423c8c907 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.ForbiddenException; @@ -540,4 +542,38 @@ public static NodeToLabelsInfo mergeNodeToLabels( return new NodeToLabelsInfo(nodeToLabels); } + + public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo( + Collection appStatistics) { + ApplicationStatisticsInfo result = new ApplicationStatisticsInfo(); + Map statisticsItemMap = new HashMap<>(); + + appStatistics.stream().forEach(appStatistic -> { + List statisticsItemInfos = appStatistic.getStatItems(); + for (StatisticsItemInfo statisticsItemInfo : statisticsItemInfos) { + + String statisticsItemKey = + statisticsItemInfo.getType() + "_" + statisticsItemInfo.getState().toString(); + + StatisticsItemInfo statisticsItemValue; + if (statisticsItemMap.containsKey(statisticsItemKey)) { + statisticsItemValue = statisticsItemMap.get(statisticsItemKey); + long statisticsItemValueCount = statisticsItemValue.getCount(); + long statisticsItemInfoCount = statisticsItemInfo.getCount(); + long newCount = statisticsItemValueCount + statisticsItemInfoCount; + statisticsItemValue.setCount(newCount); + } else { + statisticsItemValue = new StatisticsItemInfo(statisticsItemInfo); + } + + statisticsItemMap.put(statisticsItemKey, statisticsItemValue); + } + }); + + if (!statisticsItemMap.isEmpty()) { + result.getStatItems().addAll(statisticsItemMap.values()); + } + + return result; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index f42ffd5961..67edc53161 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -27,6 +27,8 @@ import java.util.Collections; import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -58,6 +60,20 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo; @@ -78,13 +94,22 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.NotFoundException; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.mockito.Mockito.mock; + /** * This class mocks the RESTRequestInterceptor. */ @@ -132,8 +157,9 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp, // Initialize appReport ApplicationReport appReport = ApplicationReport.newInstance( appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0, - null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null, - false, Priority.newInstance(newApp.getPriority()), null, null); + null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, + newApp.getApplicationType(), null, null, false, Priority.newInstance(newApp.getPriority()), + null, null); // Initialize appTimeoutsMap HashMap appTimeoutsMap = new HashMap<>(); @@ -661,4 +687,105 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, Str AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue()); return Response.status(Status.OK).entity(targetAppQueue).build(); } + + public void updateApplicationState(YarnApplicationState appState, String appId) + throws AuthorizationException, YarnException, InterruptedException, IOException { + validateRunning(); + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + ApplicationReport appReport = applicationMap.get(applicationId); + appReport.setYarnApplicationState(appState); + } + + @Override + public ApplicationStatisticsInfo getAppStatistics( + HttpServletRequest hsr, Set stateQueries, Set typeQueries) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + Map itemInfoMap = new HashMap<>(); + + for (ApplicationReport appReport : applicationMap.values()) { + + YarnApplicationState appState = appReport.getYarnApplicationState(); + String appType = appReport.getApplicationType(); + + if (stateQueries.contains(appState.name()) && typeQueries.contains(appType)) { + String itemInfoMapKey = appState.toString() + "_" + appType; + StatisticsItemInfo itemInfo = itemInfoMap.getOrDefault(itemInfoMapKey, null); + if (itemInfo == null) { + itemInfo = new StatisticsItemInfo(appState, appType, 1); + } else { + long newCount = itemInfo.getCount() + 1; + itemInfo.setCount(newCount); + } + itemInfoMap.put(itemInfoMapKey, itemInfo); + } + } + + return new ApplicationStatisticsInfo(itemInfoMap.values()); + } + + @Override + public AppActivitiesInfo getAppActivities( + HttpServletRequest hsr, String appId, String time, Set requestPriorities, + Set allocationRequestIds, String groupBy, String limit, Set actions, + boolean summarize) { + if (!isRunning) { + throw new RuntimeException("RM is stopped"); + } + + ApplicationId applicationId = ApplicationId.fromString(appId); + if (!applicationMap.containsKey(applicationId)) { + throw new NotFoundException("app with id: " + appId + " not found"); + } + + SchedulerNode schedulerNode = TestUtils.getMockNode("host0", "rack", 1, 10240); + + RMContext rmContext = Mockito.mock(RMContext.class); + Mockito.when(rmContext.getYarnConfiguration()).thenReturn(this.getConf()); + ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class); + Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none()); + Mockito.when(rmContext.getScheduler()).thenReturn(scheduler); + LeafQueue mockQueue = Mockito.mock(LeafQueue.class); + Map rmApps = new ConcurrentHashMap<>(); + Mockito.doReturn(rmApps).when(rmContext).getRMApps(); + + FiCaSchedulerNode node = (FiCaSchedulerNode) schedulerNode; + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 0); + RMApp mockApp = Mockito.mock(RMApp.class); + Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId(); + Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus(); + rmApps.put(appAttemptId.getApplicationId(), mockApp); + FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue, + mock(ActiveUsersManager.class), rmContext); + + ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext); + newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3); + + int numActivities = 10; + for (int i = 0; i < numActivities; i++) { + ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node, + SystemClock.getInstance().getTime(), app); + ActivitiesLogger.APP.recordAppActivityWithoutAllocation(newActivitiesManager, node, app, + new SchedulerRequestKey(Priority.newInstance(0), 0, null), + ActivityDiagnosticConstant.NODE_IS_BLACKLISTED, ActivityState.REJECTED, + ActivityLevel.NODE); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager, + app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); + } + + Set prioritiesInt = + requestPriorities.stream().map(pri -> Integer.parseInt(pri)).collect(Collectors.toSet()); + Set allocationReqIds = + allocationRequestIds.stream().map(id -> Long.parseLong(id)).collect(Collectors.toSet()); + AppActivitiesInfo appActivitiesInfo = newActivitiesManager. + getAppActivitiesInfo(app.getApplicationId(), prioritiesInt, allocationReqIds, null, + Integer.parseInt(limit), summarize, 3); + + return appActivitiesInfo; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index a8cafa0140..e3e97159e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -23,6 +23,9 @@ import java.util.List; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.Collections; import javax.ws.rs.core.Response; @@ -33,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -41,6 +45,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; @@ -61,9 +68,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.util.MonotonicClock; @@ -949,4 +959,73 @@ public void testGetAppQueue() throws IOException, InterruptedException, YarnExce Assert.assertNotNull(queue); Assert.assertEquals(queueName, queue.getQueue()); } + + @Test + public void testGetAppStatistics() throws IOException, InterruptedException, YarnException { + AppState appStateRUNNING = new AppState(YarnApplicationState.RUNNING.name()); + + // Submit application to multiSubCluster + ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); + ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setApplicationType("MapReduce"); + context.setQueue("queue"); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + + GetApplicationHomeSubClusterRequest request = + GetApplicationHomeSubClusterRequest.newInstance(appId); + GetApplicationHomeSubClusterResponse response = + stateStore.getApplicationHomeSubCluster(request); + + Assert.assertNotNull(response); + ApplicationHomeSubCluster homeSubCluster = response.getApplicationHomeSubCluster(); + + DefaultRequestInterceptorREST interceptorREST = + interceptor.getInterceptorForSubCluster(homeSubCluster.getHomeSubCluster()); + + MockDefaultRequestInterceptorREST mockInterceptorREST = + (MockDefaultRequestInterceptorREST) interceptorREST; + mockInterceptorREST.updateApplicationState(YarnApplicationState.RUNNING, + appId.toString()); + + Set stateQueries = new HashSet<>(); + stateQueries.add(YarnApplicationState.RUNNING.name()); + + Set typeQueries = new HashSet<>(); + typeQueries.add("MapReduce"); + + ApplicationStatisticsInfo response2 = + interceptor.getAppStatistics(null, stateQueries, typeQueries); + + Assert.assertNotNull(response2); + Assert.assertFalse(response2.getStatItems().isEmpty()); + + StatisticsItemInfo result = response2.getStatItems().get(0); + Assert.assertEquals(1, result.getCount()); + Assert.assertEquals(YarnApplicationState.RUNNING, result.getState()); + Assert.assertEquals("MapReduce", result.getType()); + } + + @Test + public void testGetAppActivities() throws IOException, InterruptedException { + // Submit application to multiSubCluster + ApplicationId appId = ApplicationId.newInstance(Time.now(), 1); + ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo(); + context.setApplicationId(appId.toString()); + context.setApplicationType("MapReduce"); + context.setQueue("queue"); + + Assert.assertNotNull(interceptor.submitApplication(context, null)); + Set prioritiesSet = Collections.singleton("0"); + Set allocationRequestIdsSet = Collections.singleton("0"); + + AppActivitiesInfo appActivitiesInfo = + interceptor.getAppActivities(null, appId.toString(), String.valueOf(Time.now()), + prioritiesSet, allocationRequestIdsSet, null, "-1", null, false); + + Assert.assertNotNull(appActivitiesInfo); + Assert.assertEquals(appId.toString(), appActivitiesInfo.getApplicationId()); + Assert.assertEquals(10, appActivitiesInfo.getAllocations().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java index 565074b61f..96a6881adc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.junit.Assert; import org.junit.Test; @@ -592,4 +594,88 @@ public static AppAttemptInfo generateAppAttemptInfo(int attemptId) { when(appAttemptInfo.getLogsLink()).thenReturn("LogLink_" + attemptId); return appAttemptInfo; } + + @Test + public void testMergeApplicationStatisticsInfo() { + ApplicationStatisticsInfo infoA = new ApplicationStatisticsInfo(); + ApplicationStatisticsInfo infoB = new ApplicationStatisticsInfo(); + + StatisticsItemInfo item1 = new StatisticsItemInfo(YarnApplicationState.ACCEPTED, "*", 10); + StatisticsItemInfo item2 = new StatisticsItemInfo(YarnApplicationState.ACCEPTED, "*", 20); + + infoA.add(item1); + infoB.add(item2); + + List lists = new ArrayList<>(); + lists.add(infoA); + lists.add(infoB); + + ApplicationStatisticsInfo mergeInfo = + RouterWebServiceUtil.mergeApplicationStatisticsInfo(lists); + ArrayList statItem = mergeInfo.getStatItems(); + + Assert.assertNotNull(statItem); + Assert.assertEquals(1, statItem.size()); + + StatisticsItemInfo first = statItem.get(0); + + Assert.assertEquals(item1.getCount() + item2.getCount(), first.getCount()); + Assert.assertEquals(item1.getType(), first.getType()); + Assert.assertEquals(item1.getState(), first.getState()); + } + + @Test + public void testMergeDiffApplicationStatisticsInfo() { + ApplicationStatisticsInfo infoA = new ApplicationStatisticsInfo(); + StatisticsItemInfo item1 = new StatisticsItemInfo(YarnApplicationState.ACCEPTED, "*", 10); + StatisticsItemInfo item2 = + new StatisticsItemInfo(YarnApplicationState.NEW_SAVING, "test1", 20); + infoA.add(item1); + infoA.add(item2); + + ApplicationStatisticsInfo infoB = new ApplicationStatisticsInfo(); + StatisticsItemInfo item3 = + new StatisticsItemInfo(YarnApplicationState.NEW_SAVING, "test1", 30); + StatisticsItemInfo item4 = new StatisticsItemInfo(YarnApplicationState.FINISHED, "test3", 40); + infoB.add(item3); + infoB.add(item4); + + List lists = new ArrayList<>(); + lists.add(infoA); + lists.add(infoB); + + ApplicationStatisticsInfo mergeInfo = + RouterWebServiceUtil.mergeApplicationStatisticsInfo(lists); + + Assert.assertEquals(3, mergeInfo.getStatItems().size()); + List mergeInfoStatItems = mergeInfo.getStatItems(); + + StatisticsItemInfo item1Result = null; + StatisticsItemInfo item2Result = null; + StatisticsItemInfo item3Result = null; + + for (StatisticsItemInfo item : mergeInfoStatItems) { + // ACCEPTED + if (item.getState() == YarnApplicationState.ACCEPTED) { + item1Result = item; + } + + // NEW_SAVING + if (item.getState() == YarnApplicationState.NEW_SAVING) { + item2Result = item; + } + + // FINISHED + if (item.getState() == YarnApplicationState.FINISHED) { + item3Result = item; + } + } + + Assert.assertEquals(YarnApplicationState.ACCEPTED, item1Result.getState()); + Assert.assertEquals(item1.getCount(), item1Result.getCount()); + Assert.assertEquals(YarnApplicationState.NEW_SAVING, item2Result.getState()); + Assert.assertEquals((item2.getCount() + item3.getCount()), item2Result.getCount()); + Assert.assertEquals(YarnApplicationState.FINISHED, item3Result.getState()); + Assert.assertEquals(item4.getCount(), item3Result.getCount()); + } } \ No newline at end of file