From 468135a4d99ad334a545127bd144c46c23e0ac3b Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 27 Jan 2023 03:14:05 +0800 Subject: [PATCH] YARN-11218. [Federation] Add getActivities, getBulkActivities REST APIs for Router. (#5284) --- .../webapp/dao/BulkActivitiesInfo.java | 10 ++ .../yarn/server/router/RouterMetrics.java | 62 +++++++++++ .../webapp/FederationInterceptorREST.java | 100 +++++++++++++++++- .../dao/FederationBulkActivitiesInfo.java | 49 +++++++++ .../yarn/server/router/TestRouterMetrics.java | 66 ++++++++++++ .../MockDefaultRequestInterceptorREST.java | 63 +++++++++++ .../webapp/TestFederationInterceptorREST.java | 86 +++++++++++++++ 7 files changed, 434 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationBulkActivitiesInfo.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java index ad360cc6fc..462003f946 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java @@ -34,6 +34,8 @@ public class BulkActivitiesInfo { private ArrayList activities = new ArrayList<>(); + private String subClusterId; + public BulkActivitiesInfo() { // JAXB needs this } @@ -49,4 +51,12 @@ public ArrayList getActivities() { public void addAll(List 
activitiesInfoList) { activities.addAll(activitiesInfoList); } + + public String getSubClusterId() { + return subClusterId; + } + + public void setSubClusterId(String subClusterId) { + this.subClusterId = subClusterId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 97d4c42497..033aa07665 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -135,6 +135,10 @@ public final class RouterMetrics { private MutableGaugeInt numRenewDelegationTokenFailedRetrieved; @Metric("# of renewDelegationToken failed to be retrieved") private MutableGaugeInt numCancelDelegationTokenFailedRetrieved; + @Metric("# of getActivities failed to be retrieved") + private MutableGaugeInt numGetActivitiesFailedRetrieved; + @Metric("# of getBulkActivities failed to be retrieved") + private MutableGaugeInt numGetBulkActivitiesFailedRetrieved; @Metric("# of getSchedulerInfo failed to be retrieved") private MutableGaugeInt numGetSchedulerInfoFailedRetrieved; @Metric("# of refreshSuperUserGroupsConfiguration failed to be retrieved") @@ -237,6 +241,10 @@ public final class RouterMetrics { private MutableRate totalSucceededRenewDelegationTokenRetrieved; @Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)") private MutableRate totalSucceededCancelDelegationTokenRetrieved; + @Metric("Total number of successful Retrieved GetActivities and latency(ms)") + private MutableRate totalSucceededGetActivitiesRetrieved; + @Metric("Total number of successful 
Retrieved GetBulkActivities and latency(ms)") + private MutableRate totalSucceededGetBulkActivitiesRetrieved; @Metric("Total number of successful Retrieved RefreshSuperUserGroupsConfig and latency(ms)") private MutableRate totalSucceededRefreshSuperUserGroupsConfigurationRetrieved; @Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)") @@ -295,6 +303,8 @@ public final class RouterMetrics { private MutableQuantiles getDelegationTokenLatency; private MutableQuantiles renewDelegationTokenLatency; private MutableQuantiles cancelDelegationTokenLatency; + private MutableQuantiles getActivitiesLatency; + private MutableQuantiles getBulkActivitiesLatency; private MutableQuantiles getSchedulerInfoRetrievedLatency; private MutableQuantiles refreshSuperUserGroupsConfLatency; private MutableQuantiles refreshUserToGroupsMappingsLatency; @@ -472,6 +482,12 @@ private RouterMetrics() { cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency", "latency of cancel delegation token timeouts", "ops", "latency", 10); + getActivitiesLatency = registry.newQuantiles("getActivitiesLatency", + "latency of get activities timeouts", "ops", "latency", 10); + + getBulkActivitiesLatency = registry.newQuantiles("getBulkActivitiesLatency", + "latency of get bulk activities timeouts", "ops", "latency", 10); + getSchedulerInfoRetrievedLatency = registry.newQuantiles("getSchedulerInfoRetrievedLatency", "latency of get scheduler info timeouts", "ops", "latency", 10); @@ -736,6 +752,16 @@ public long getNumSucceededCancelDelegationTokenRetrieved() { return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededGetActivitiesRetrieved() { + return totalSucceededGetActivitiesRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetBulkActivitiesRetrieved() { + return totalSucceededGetBulkActivitiesRetrieved.lastStat().numSamples(); + } + 
@VisibleForTesting public long getNumSucceededGetSchedulerInfoRetrieved() { return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples(); @@ -981,6 +1007,16 @@ public double getLatencySucceededCancelDelegationTokenRetrieved() { return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededGetActivitiesRetrieved() { + return totalSucceededGetActivitiesRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetBulkActivitiesRetrieved() { + return totalSucceededGetBulkActivitiesRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededGetSchedulerInfoRetrieved() { return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean(); @@ -1209,6 +1245,14 @@ public int getCancelDelegationTokenFailedRetrieved() { return numCancelDelegationTokenFailedRetrieved.value(); } + public int getActivitiesFailedRetrieved() { + return numGetActivitiesFailedRetrieved.value(); + } + + public int getBulkActivitiesFailedRetrieved(){ + return numGetBulkActivitiesFailedRetrieved.value(); + } + public int getSchedulerInfoFailedRetrieved() { return numGetSchedulerInfoFailedRetrieved.value(); } @@ -1448,6 +1492,16 @@ public void succeededCancelDelegationTokenRetrieved(long duration) { cancelDelegationTokenLatency.add(duration); } + public void succeededGetActivitiesLatencyRetrieved(long duration) { + totalSucceededGetActivitiesRetrieved.add(duration); + getActivitiesLatency.add(duration); + } + + public void succeededGetBulkActivitiesRetrieved(long duration) { + totalSucceededGetBulkActivitiesRetrieved.add(duration); + getBulkActivitiesLatency.add(duration); + } + public void succeededGetSchedulerInfoRetrieved(long duration) { totalSucceededGetSchedulerInfoRetrieved.add(duration); getSchedulerInfoRetrievedLatency.add(duration); @@ -1659,6 +1713,14 @@ public void incrCancelDelegationTokenFailedRetrieved() { numCancelDelegationTokenFailedRetrieved.incr(); } + 
public void incrGetActivitiesFailedRetrieved() { + numGetActivitiesFailedRetrieved.incr(); + } + + public void incrGetBulkActivitiesFailedRetrieved() { + numGetBulkActivitiesFailedRetrieved.incr(); + } + public void incrGetSchedulerInfoFailedRetrieved() { numGetSchedulerInfoFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 73b0c5f2af..69dba5b07e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -46,6 +46,7 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.prefetch.Validate; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; @@ -121,6 +122,7 @@ import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo; @@ 
-1187,16 +1189,110 @@ public String dumpSchedulerLogs(String time, HttpServletRequest hsr) throw new NotImplementedException("Code is not implemented"); } + /** + * This method retrieves all the activities in a specific node, and it is + * reachable by using {@link RMWSConsts#SCHEDULER_ACTIVITIES}. + * + * @param hsr the servlet request + * @param nodeId the node whose activities we want to retrieve. It is a + * QueryParam. + * @param groupBy the groupBy type by which the activities should be + * aggregated. It is a QueryParam. + * @return all the activities in the specific node + */ @Override public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, String groupBy) { - throw new NotImplementedException("Code is not implemented"); + try { + // Check the parameters to ensure that the parameters are not empty + Validate.checkNotNullAndNotEmpty(nodeId, "nodeId"); + Validate.checkNotNullAndNotEmpty(groupBy, "groupBy"); + + // Query SubClusterInfo according to id, + // if the nodeId cannot get SubClusterInfo, an exception will be thrown directly. + SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId); + + // Call the corresponding subCluster to get ActivitiesInfo. 
+ long startTime = clock.getTime(); + DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster( + subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress()); + final HttpServletRequest hsrCopy = clone(hsr); + ActivitiesInfo activitiesInfo = interceptor.getActivities(hsrCopy, nodeId, groupBy); + if (activitiesInfo != null) { + long stopTime = clock.getTime(); + routerMetrics.succeededGetActivitiesLatencyRetrieved(stopTime - startTime); + return activitiesInfo; + } + } catch (IllegalArgumentException e) { + routerMetrics.incrGetActivitiesFailedRetrieved(); + throw e; + } catch (NotFoundException e) { + routerMetrics.incrGetActivitiesFailedRetrieved(); + throw e; + } + + routerMetrics.incrGetActivitiesFailedRetrieved(); + throw new RuntimeException("getActivities Failed."); } + /** + * This method retrieves the last n activities inside scheduler, and it is + * reachable by using {@link RMWSConsts#SCHEDULER_BULK_ACTIVITIES}. + * + * @param hsr the servlet request + * @param groupBy the groupBy type by which the activities should be + * aggregated. It is a QueryParam. + * @param activitiesCount number of activities + * @return last n activities + */ @Override public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr, String groupBy, int activitiesCount) throws InterruptedException { - throw new NotImplementedException("Code is not implemented"); + try { + // Step1. Check the parameters to ensure that the parameters are not empty + Validate.checkNotNullAndNotEmpty(groupBy, "groupBy"); + Validate.checkNotNegative(activitiesCount, "activitiesCount"); + + // Step2. Start the timer, then call the subClusters concurrently and get the results. 
+ long startTime = clock.getTime(); + Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters(); + final HttpServletRequest hsrCopy = clone(hsr); + Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class, int.class}; + Object[] args = new Object[]{hsrCopy, groupBy, activitiesCount}; + ClientMethod remoteMethod = new ClientMethod("getBulkActivities", argsClasses, args); + Map<SubClusterInfo, BulkActivitiesInfo> appStatisticsMap = invokeConcurrent( + subClustersActive.values(), remoteMethod, BulkActivitiesInfo.class); + + // Step3. Generate Federation objects and set subCluster information. + FederationBulkActivitiesInfo fedBulkActivitiesInfo = new FederationBulkActivitiesInfo(); + appStatisticsMap.forEach((subClusterInfo, bulkActivitiesInfo) -> { + SubClusterId subClusterId = subClusterInfo.getSubClusterId(); + bulkActivitiesInfo.setSubClusterId(subClusterId.getId()); + fedBulkActivitiesInfo.getList().add(bulkActivitiesInfo); + }); + long stopTime = clock.getTime(); + routerMetrics.succeededGetBulkActivitiesRetrieved(stopTime - startTime); + return fedBulkActivitiesInfo; + } catch (IllegalArgumentException e) { + routerMetrics.incrGetBulkActivitiesFailedRetrieved(); + throw e; + } catch (NotFoundException e) { + routerMetrics.incrGetBulkActivitiesFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e); + } catch (IOException e) { + routerMetrics.incrGetBulkActivitiesFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException(e, + "getBulkActivities by groupBy = %s, activitiesCount = %s with io error.", + groupBy, String.valueOf(activitiesCount)); + } catch (YarnException e) { + routerMetrics.incrGetBulkActivitiesFailedRetrieved(); + RouterServerUtil.logAndThrowRunTimeException(e, + "getBulkActivities by groupBy = %s, activitiesCount = %s with yarn error.", + groupBy, String.valueOf(activitiesCount)); + } + + routerMetrics.incrGetBulkActivitiesFailedRetrieved(); + throw new RuntimeException("getBulkActivities Failed."); } @Override diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationBulkActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationBulkActivitiesInfo.java new file mode 100644 index 0000000000..87d11ad0fe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationBulkActivitiesInfo.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.webapp.dao; + +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; + +@XmlRootElement +@XmlAccessorType(XmlAccessType.FIELD) +public class FederationBulkActivitiesInfo extends BulkActivitiesInfo { + + @XmlElement(name = "subCluster") + private ArrayList list = new ArrayList<>(); + + public FederationBulkActivitiesInfo() { + } // JAXB needs this + + public FederationBulkActivitiesInfo(ArrayList list) { + this.list = list; + } + + public ArrayList getList() { + return list; + } + + public void setList(ArrayList list) { + this.list = list; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 9d5aeab5c6..c26df63c95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -533,6 +533,16 @@ public void getRenewDelegationTokenFailed() { LOG.info("Mocked: failed renewDelegationToken call"); 
metrics.incrRenewDelegationTokenFailedRetrieved(); } + + public void getActivitiesFailed() { + LOG.info("Mocked: failed getActivities call"); + metrics.incrGetActivitiesFailedRetrieved(); + } + + public void getBulkActivitiesFailed() { + LOG.info("Mocked: failed getBulkActivities call"); + metrics.incrGetBulkActivitiesFailedRetrieved(); + } } // Records successes for all calls @@ -763,6 +773,16 @@ public void getRenewDelegationTokenRetrieved(long duration) { LOG.info("Mocked: successful RenewDelegationToken call with duration {}", duration); metrics.succeededRenewDelegationTokenRetrieved(duration); } + + public void getActivitiesRetrieved(long duration) { + LOG.info("Mocked: successful GetActivities call with duration {}", duration); + metrics.succeededGetActivitiesLatencyRetrieved(duration); + } + + public void getBulkActivitiesRetrieved(long duration) { + LOG.info("Mocked: successful GetBulkActivities call with duration {}", duration); + metrics.succeededGetBulkActivitiesRetrieved(duration); + } } @Test @@ -1597,4 +1617,50 @@ public void testRenewDelegationTokenRetrievedFailed() { Assert.assertEquals(totalBadBefore + 1, metrics.getRenewDelegationTokenFailedRetrieved()); } + + @Test + public void testGetActivitiesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetActivitiesRetrieved(); + goodSubCluster.getActivitiesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetActivitiesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetActivitiesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getActivitiesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetActivitiesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetActivitiesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetActivitiesRetrievedFailed() { + long totalBadBefore = metrics.getActivitiesFailedRetrieved(); + badSubCluster.getActivitiesFailed(); + 
Assert.assertEquals(totalBadBefore + 1, + metrics.getActivitiesFailedRetrieved()); + } + + @Test + public void testGetBulkActivitiesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetBulkActivitiesRetrieved(); + goodSubCluster.getBulkActivitiesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetBulkActivitiesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetBulkActivitiesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getBulkActivitiesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetBulkActivitiesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetBulkActivitiesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetBulkActivitiesRetrievedFailed() { + long totalBadBefore = metrics.getBulkActivitiesFailedRetrieved(); + badSubCluster.getBulkActivitiesFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getBulkActivitiesFailedRetrieved()); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java index ddd15138fd..2e118d172c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java @@ -137,6 +137,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo; import 
org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo; import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo; @@ -1213,6 +1217,65 @@ public RMQueueAclInfo checkUserAccessToQueue( } @Override + public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, String groupBy) { + if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, groupBy.toUpperCase())) { + String errMessage = "Got invalid groupBy: " + groupBy + ", valid groupBy types: " + + Arrays.asList(RMWSConsts.ActivitiesGroupBy.values()); + throw new IllegalArgumentException(errMessage); + } + + SubClusterId subClusterId = getSubClusterId(); + ActivitiesInfo activitiesInfo = mock(ActivitiesInfo.class); + Mockito.when(activitiesInfo.getNodeId()).thenReturn(nodeId); + Mockito.when(activitiesInfo.getTimestamp()).thenReturn(1673081972L); + Mockito.when(activitiesInfo.getDiagnostic()).thenReturn("Diagnostic:" + subClusterId.getId()); + + List allocationInfos = new ArrayList<>(); + NodeAllocationInfo nodeAllocationInfo = mock(NodeAllocationInfo.class); + Mockito.when(nodeAllocationInfo.getPartition()).thenReturn("p" + subClusterId.getId()); + Mockito.when(nodeAllocationInfo.getFinalAllocationState()).thenReturn("ALLOCATED"); + + allocationInfos.add(nodeAllocationInfo); + Mockito.when(activitiesInfo.getAllocations()).thenReturn(allocationInfos); + return activitiesInfo; + } + + @Override + 
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr, + String groupBy, int activitiesCount) { + + if (activitiesCount <= 0) { + throw new IllegalArgumentException("activitiesCount needs to be greater than 0."); + } + + if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, groupBy.toUpperCase())) { + String errMessage = "Got invalid groupBy: " + groupBy + ", valid groupBy types: " + + Arrays.asList(RMWSConsts.ActivitiesGroupBy.values()); + throw new IllegalArgumentException(errMessage); + } + + BulkActivitiesInfo bulkActivitiesInfo = new BulkActivitiesInfo(); + + for (int i = 0; i < activitiesCount; i++) { + SubClusterId subClusterId = getSubClusterId(); + ActivitiesInfo activitiesInfo = mock(ActivitiesInfo.class); + Mockito.when(activitiesInfo.getNodeId()).thenReturn(subClusterId + "-nodeId-" + i); + Mockito.when(activitiesInfo.getTimestamp()).thenReturn(1673081972L); + Mockito.when(activitiesInfo.getDiagnostic()).thenReturn("Diagnostic:" + subClusterId.getId()); + + List allocationInfos = new ArrayList<>(); + NodeAllocationInfo nodeAllocationInfo = mock(NodeAllocationInfo.class); + Mockito.when(nodeAllocationInfo.getPartition()).thenReturn("p" + subClusterId.getId()); + Mockito.when(nodeAllocationInfo.getFinalAllocationState()).thenReturn("ALLOCATED"); + + allocationInfos.add(nodeAllocationInfo); + Mockito.when(activitiesInfo.getAllocations()).thenReturn(allocationInfos); + bulkActivitiesInfo.getActivities().add(activitiesInfo); + } + + return bulkActivitiesInfo; + } + public SchedulerTypeInfo getSchedulerInfo() { try { ResourceManager resourceManager = CapacitySchedulerTestUtilities.createResourceManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 070c883615..edaa1e26e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -34,6 +34,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.conf.Configuration; @@ -115,9 +116,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo; import org.apache.hadoop.yarn.util.LRUCacheHashMap; import org.apache.hadoop.yarn.util.MonotonicClock; @@ -1779,4 +1784,85 @@ public void testCancelDelegationToken() throws Exception { Assert.assertNotNull(cancelResponse); 
Assert.assertEquals(response.getStatus(), Status.OK.getStatusCode()); } + + @Test + public void testGetActivitiesNormal() { + ActivitiesInfo activitiesInfo = interceptor.getActivities(null, "1", "DIAGNOSTIC"); + Assert.assertNotNull(activitiesInfo); + + String nodeId = activitiesInfo.getNodeId(); + Assert.assertNotNull(nodeId); + Assert.assertEquals("1", nodeId); + + String diagnostic = activitiesInfo.getDiagnostic(); + Assert.assertNotNull(diagnostic); + Assert.assertTrue(StringUtils.contains(diagnostic, "Diagnostic")); + + long timestamp = activitiesInfo.getTimestamp(); + Assert.assertEquals(1673081972L, timestamp); + + List allocationInfos = activitiesInfo.getAllocations(); + Assert.assertNotNull(allocationInfos); + Assert.assertEquals(1, allocationInfos.size()); + } + + @Test + public void testGetActivitiesError() throws Exception { + // nodeId is empty + LambdaTestUtils.intercept(IllegalArgumentException.class, + "'nodeId' must not be empty.", + () -> interceptor.getActivities(null, "", "DIAGNOSTIC")); + + // groupBy is empty + LambdaTestUtils.intercept(IllegalArgumentException.class, + "'groupBy' must not be empty.", + () -> interceptor.getActivities(null, "1", "")); + + // groupBy value is wrong + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Got invalid groupBy: TEST1, valid groupBy types: [DIAGNOSTIC]", + () -> interceptor.getActivities(null, "1", "TEST1")); + } + + @Test + public void testGetBulkActivitiesNormal() throws InterruptedException { + BulkActivitiesInfo bulkActivitiesInfo = + interceptor.getBulkActivities(null, "DIAGNOSTIC", 5); + Assert.assertNotNull(bulkActivitiesInfo); + + Assert.assertTrue(bulkActivitiesInfo instanceof FederationBulkActivitiesInfo); + + FederationBulkActivitiesInfo federationBulkActivitiesInfo = + FederationBulkActivitiesInfo.class.cast(bulkActivitiesInfo); + Assert.assertNotNull(federationBulkActivitiesInfo); + + List activitiesInfos = federationBulkActivitiesInfo.getList(); + 
Assert.assertNotNull(activitiesInfos); + Assert.assertEquals(4, activitiesInfos.size()); + + for (BulkActivitiesInfo activitiesInfo : activitiesInfos) { + Assert.assertNotNull(activitiesInfo); + List activitiesInfoList = activitiesInfo.getActivities(); + Assert.assertNotNull(activitiesInfoList); + Assert.assertEquals(5, activitiesInfoList.size()); + } + } + + @Test + public void testGetBulkActivitiesError() throws Exception { + // activitiesCount < 0 + LambdaTestUtils.intercept(IllegalArgumentException.class, + "'activitiesCount' must not be negative.", + () -> interceptor.getBulkActivities(null, "DIAGNOSTIC", -1)); + + // groupBy value is wrong + LambdaTestUtils.intercept(YarnRuntimeException.class, + "Got invalid groupBy: TEST1, valid groupBy types: [DIAGNOSTIC]", + () -> interceptor.getBulkActivities(null, "TEST1", 1)); + + // groupBy is empty + LambdaTestUtils.intercept(IllegalArgumentException.class, + "'groupBy' must not be empty.", + () -> interceptor.getBulkActivities(null, "", 1)); + } } \ No newline at end of file