diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 0d8371bade..920b8e8912 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -249,8 +249,7 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - return GetApplicationHomeSubClusterResponse.newInstance( - ApplicationHomeSubCluster.newInstance(appId, applications.get(appId))); + return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId)); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 241224aa2f..dffcfa6a10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -720,8 +720,7 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( FederationStateStoreUtils.returnToPool(LOG, cstmt); } return GetApplicationHomeSubClusterResponse - .newInstance(ApplicationHomeSubCluster - .newInstance(request.getApplicationId(), homeRM)); + .newInstance(request.getApplicationId(), homeRM); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java new file mode 100644 index 0000000000..6ce5e2ef46 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZKFederationStateStoreOpDurations.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +@Metrics(context="ZKFederationStateStore-op-durations") +public final class ZKFederationStateStoreOpDurations implements MetricsSource { + + @Metric("Duration for a add application homeSubcluster call") + private MutableRate addAppHomeSubCluster; + + @Metric("Duration for a update application homeSubcluster call") + private MutableRate updateAppHomeSubCluster; + + @Metric("Duration for a get application homeSubcluster call") + private MutableRate getAppHomeSubCluster; + + @Metric("Duration for a get applications homeSubcluster call") + private MutableRate getAppsHomeSubCluster; + + @Metric("Duration for a delete applications homeSubcluster call") + private MutableRate deleteAppHomeSubCluster; + + @Metric("Duration for a register subCluster call") + private MutableRate registerSubCluster; + + @Metric("Duration for a deregister subCluster call") + private MutableRate deregisterSubCluster; + + @Metric("Duration for a subCluster Heartbeat call") + private MutableRate subClusterHeartbeat; + + @Metric("Duration for a get SubCluster call") + private MutableRate getSubCluster; + + @Metric("Duration for a get SubClusters call") + private MutableRate getSubClusters; + + @Metric("Duration for a get PolicyConfiguration call") + private MutableRate getPolicyConfiguration; + + @Metric("Duration for a set PolicyConfiguration call") + private MutableRate setPolicyConfiguration; + + @Metric("Duration for a get PolicyConfigurations call") + private MutableRate getPoliciesConfigurations; + + protected static final MetricsInfo RECORD_INFO = + info("ZKFederationStateStoreOpDurations", "Durations of ZKFederationStateStore calls"); + + private final MetricsRegistry registry; + + private static final ZKFederationStateStoreOpDurations INSTANCE = + new ZKFederationStateStoreOpDurations(); + + public static ZKFederationStateStoreOpDurations getInstance() { + return INSTANCE; + } + + private ZKFederationStateStoreOpDurations() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ZKFederationStateStoreOpDurations"); + + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this); + } + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void addAppHomeSubClusterDuration(long startTime, long endTime) { + addAppHomeSubCluster.add(endTime - startTime); + } + + public void addUpdateAppHomeSubClusterDuration(long startTime, long endTime) { + updateAppHomeSubCluster.add(endTime - startTime); + } + + public void addGetAppHomeSubClusterDuration(long startTime, long endTime) { + getAppHomeSubCluster.add(endTime - startTime); + } + + public void addGetAppsHomeSubClusterDuration(long startTime, long endTime) { + getAppsHomeSubCluster.add(endTime - startTime); + } + + public void addDeleteAppHomeSubClusterDuration(long startTime, long endTime) { + deleteAppHomeSubCluster.add(endTime - startTime); + } + + public void addRegisterSubClusterDuration(long startTime, long endTime) { + registerSubCluster.add(endTime - startTime); + } + + public void addDeregisterSubClusterDuration(long startTime, long endTime) { + deregisterSubCluster.add(endTime - startTime); + } + + public void addSubClusterHeartbeatDuration(long startTime, long endTime) { + subClusterHeartbeat.add(endTime - startTime); + } + + public void addGetSubClusterDuration(long startTime, long endTime) { + getSubCluster.add(endTime - startTime); + } + + public void addGetSubClustersDuration(long startTime, long endTime) { + getSubClusters.add(endTime - startTime); + } + + public void addGetPolicyConfigurationDuration(long startTime, long endTime) { + getPolicyConfiguration.add(endTime - startTime); + } + + public void addSetPolicyConfigurationDuration(long startTime, long endTime) { + setPolicyConfiguration.add(endTime - startTime); + } + + public void addGetPoliciesConfigurationsDuration(long startTime, long endTime) { + getPoliciesConfigurations.add(endTime - startTime); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 888d7aa3d3..5d9b948e5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.TimeZone; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.curator.ZKCuratorManager; @@ -84,6 +85,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,6 +127,12 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private String membershipZNode; private String policiesZNode; + private volatile Clock clock = SystemClock.getInstance(); + + @VisibleForTesting + private ZKFederationStateStoreOpDurations opDurations = + ZKFederationStateStoreOpDurations.getInstance(); + @Override public void init(Configuration conf) throws YarnException { LOG.info("Initializing ZooKeeper connection"); @@ -153,7 +162,6 @@ public void init(Configuration conf) throws YarnException { String errMsg = "Cannot create base directories: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - } @Override @@ -167,6 +175,7 @@ public void close() throws Exception { public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest request) throws YarnException { + long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); ApplicationId appId = app.getApplicationId(); @@ -187,7 +196,8 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( String errMsg = "Cannot check app home subcluster for " + appId; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - + long end = clock.getTime(); + opDurations.addAppHomeSubClusterDuration(start, end); return AddApplicationHomeSubClusterResponse .newInstance(homeSubCluster); } @@ -198,6 +208,7 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( UpdateApplicationHomeSubClusterRequest request) throws YarnException { + long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster(); ApplicationId appId = app.getApplicationId(); @@ -209,6 +220,9 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( SubClusterId newSubClusterId = request.getApplicationHomeSubCluster().getHomeSubCluster(); putApp(appId, newSubClusterId, true); + + long end = clock.getTime(); + opDurations.addUpdateAppHomeSubClusterDuration(start, end); return UpdateApplicationHomeSubClusterResponse.newInstance(); } @@ -216,6 +230,7 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( GetApplicationHomeSubClusterRequest request) throws YarnException { + long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); SubClusterId homeSubCluster = getApp(appId); @@ -223,13 +238,15 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( String errMsg = "Application " + appId + " does not exist"; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - return GetApplicationHomeSubClusterResponse.newInstance( - ApplicationHomeSubCluster.newInstance(appId, homeSubCluster)); + long end = clock.getTime(); + opDurations.addGetAppHomeSubClusterDuration(start, end); + return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubCluster); } @Override public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { + long start = clock.getTime(); List result = new ArrayList<>(); try { @@ -244,7 +261,8 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( String errMsg = "Cannot get apps: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - + long end = clock.getTime(); + opDurations.addGetAppsHomeSubClusterDuration(start, end); return GetApplicationsHomeSubClusterResponse.newInstance(result); } @@ -253,7 +271,7 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( deleteApplicationHomeSubCluster( DeleteApplicationHomeSubClusterRequest request) throws YarnException { - + long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); String appZNode = getNodePath(appsZNode, appId.toString()); @@ -276,13 +294,15 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( String errMsg = "Cannot delete app: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } - + long end = clock.getTime(); + opDurations.addDeleteAppHomeSubClusterDuration(start, end); return DeleteApplicationHomeSubClusterResponse.newInstance(); } @Override public SubClusterRegisterResponse registerSubCluster( SubClusterRegisterRequest request) throws YarnException { + long start = clock.getTime(); FederationMembershipStateStoreInputValidator.validate(request); SubClusterInfo subClusterInfo = request.getSubClusterInfo(); SubClusterId subclusterId = subClusterInfo.getSubClusterId(); @@ -297,12 +317,15 @@ public SubClusterRegisterResponse registerSubCluster( String errMsg = "Cannot register subcluster: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + long end = clock.getTime(); + opDurations.addRegisterSubClusterDuration(start, end); return SubClusterRegisterResponse.newInstance(); } @Override public SubClusterDeregisterResponse deregisterSubCluster( SubClusterDeregisterRequest request) throws YarnException { + long start = clock.getTime(); FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); SubClusterState state = request.getState(); @@ -316,14 +339,15 @@ public SubClusterDeregisterResponse deregisterSubCluster( subClusterInfo.setState(state); putSubclusterInfo(subClusterId, subClusterInfo, true); } - + long end = clock.getTime(); + opDurations.addDeregisterSubClusterDuration(start, end); return SubClusterDeregisterResponse.newInstance(); } @Override public SubClusterHeartbeatResponse subClusterHeartbeat( SubClusterHeartbeatRequest request) throws YarnException { - + long start = clock.getTime(); FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); @@ -340,14 +364,15 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( subClusterInfo.setCapability(request.getCapability()); putSubclusterInfo(subClusterId, subClusterInfo, true); - + long end = clock.getTime(); + opDurations.addSubClusterHeartbeatDuration(start, end); return SubClusterHeartbeatResponse.newInstance(); } @Override public GetSubClusterInfoResponse getSubCluster( GetSubClusterInfoRequest request) throws YarnException { - + long start = clock.getTime(); FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); SubClusterInfo subClusterInfo = null; @@ -361,12 +386,15 @@ public GetSubClusterInfoResponse getSubCluster( String errMsg = "Cannot get subcluster: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + long end = clock.getTime(); + opDurations.addGetSubClusterDuration(start, end); return GetSubClusterInfoResponse.newInstance(subClusterInfo); } @Override public GetSubClustersInfoResponse getSubClusters( GetSubClustersInfoRequest request) throws YarnException { + long start = clock.getTime(); List result = new ArrayList<>(); try { @@ -382,6 +410,8 @@ public GetSubClustersInfoResponse getSubClusters( String errMsg = "Cannot get subclusters: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + long end = clock.getTime(); + opDurations.addGetSubClustersDuration(start, end); return GetSubClustersInfoResponse.newInstance(result); } @@ -389,7 +419,7 @@ public GetSubClustersInfoResponse getSubClusters( @Override public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( GetSubClusterPolicyConfigurationRequest request) throws YarnException { - + long start = clock.getTime(); FederationPolicyStoreInputValidator.validate(request); String queue = request.getQueue(); SubClusterPolicyConfiguration policy = null; @@ -404,6 +434,8 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( LOG.warn("Policy for queue: {} does not exist.", queue); return null; } + long end = clock.getTime(); + opDurations.addGetPolicyConfigurationDuration(start, end); return GetSubClusterPolicyConfigurationResponse .newInstance(policy); } @@ -411,7 +443,7 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( @Override public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( SetSubClusterPolicyConfigurationRequest request) throws YarnException { - + long start = clock.getTime(); FederationPolicyStoreInputValidator.validate(request); SubClusterPolicyConfiguration policy = request.getPolicyConfiguration(); @@ -422,12 +454,15 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( String errMsg = "Cannot set policy: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + long end = clock.getTime(); + opDurations.addSetPolicyConfigurationDuration(start, end); return SetSubClusterPolicyConfigurationResponse.newInstance(); } @Override public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + long start = clock.getTime(); List result = new ArrayList<>(); try { @@ -443,6 +478,8 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( String errMsg = "Cannot get policies: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + long end = clock.getTime(); + opDurations.addGetPoliciesConfigurationsDuration(start, end); return GetSubClusterPoliciesConfigurationsResponse.newInstance(result); } @@ -649,6 +686,11 @@ private static long getCurrentTime() { return cal.getTimeInMillis(); } + @VisibleForTesting + public ZKFederationStateStoreOpDurations getOpDurations() { + return opDurations; + } + @Override public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( AddReservationHomeSubClusterRequest request) throws YarnException { @@ -678,4 +720,4 @@ public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( UpdateReservationHomeSubClusterRequest request) throws YarnException { throw new NotImplementedException("Code is not implemented"); } -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java index 60735b382f..6144b01e86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Records; /** @@ -42,7 +43,9 @@ public abstract class GetApplicationHomeSubClusterResponse { @Private @Unstable public static GetApplicationHomeSubClusterResponse newInstance( - ApplicationHomeSubCluster applicationHomeSubCluster) { + ApplicationId appId, SubClusterId homeSubCluster) { + ApplicationHomeSubCluster applicationHomeSubCluster = + ApplicationHomeSubCluster.newInstance(appId, homeSubCluster); GetApplicationHomeSubClusterResponse mapResponse = Records.newRecord(GetApplicationHomeSubClusterResponse.class); mapResponse.setApplicationHomeSubCluster(applicationHomeSubCluster); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java index fe28641eb2..584f3355ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java @@ -25,14 +25,21 @@ import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.metrics2.impl.MetricsRecords; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.junit.After; import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertEquals; + /** * Unit tests for ZookeeperFederationStateStore. */ @@ -84,4 +91,66 @@ protected FederationStateStore createStateStore() { super.setConf(getConf()); return new ZookeeperFederationStateStore(); } + + @Test + public void testMetricsInited() throws Exception { + ZookeeperFederationStateStore zkStateStore = (ZookeeperFederationStateStore) createStateStore(); + ZKFederationStateStoreOpDurations zkStateStoreOpDurations = zkStateStore.getOpDurations(); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + + long anyDuration = 10; + long start = Time.now(); + long end = start + anyDuration; + + zkStateStoreOpDurations.addAppHomeSubClusterDuration(start, end); + zkStateStoreOpDurations.addUpdateAppHomeSubClusterDuration(start, end); + zkStateStoreOpDurations.addGetAppHomeSubClusterDuration(start, end); + zkStateStoreOpDurations.addGetAppsHomeSubClusterDuration(start, end); + zkStateStoreOpDurations.addDeleteAppHomeSubClusterDuration(start, end); + zkStateStoreOpDurations.addRegisterSubClusterDuration(start, end); + zkStateStoreOpDurations.addDeregisterSubClusterDuration(start, end); + zkStateStoreOpDurations.addSubClusterHeartbeatDuration(start, end); + zkStateStoreOpDurations.addGetSubClusterDuration(start, end); + zkStateStoreOpDurations.addGetSubClustersDuration(start, end); + zkStateStoreOpDurations.addGetPolicyConfigurationDuration(start, end); + zkStateStoreOpDurations.addSetPolicyConfigurationDuration(start, end); + zkStateStoreOpDurations.addGetPoliciesConfigurationsDuration(start, end); + + zkStateStoreOpDurations.getMetrics(collector, true); + assertEquals("Incorrect number of perf metrics", 1, collector.getRecords().size()); + + MetricsRecord record = collector.getRecords().get(0); + MetricsRecords.assertTag(record, ZKFederationStateStoreOpDurations.RECORD_INFO.name(), + "ZKFederationStateStoreOpDurations"); + + double expectAvgTime = anyDuration; + MetricsRecords.assertMetric(record, "AddAppHomeSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "UpdateAppHomeSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "GetAppHomeSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "GetAppsHomeSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "DeleteAppHomeSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "RegisterSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "DeregisterSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "SubClusterHeartbeatAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "GetSubClusterAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "GetSubClustersAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "GetPolicyConfigurationAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "SetPolicyConfigurationAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, "GetPoliciesConfigurationsAvgTime", expectAvgTime); + + long expectOps = 1; + MetricsRecords.assertMetric(record, "AddAppHomeSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "UpdateAppHomeSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "GetAppHomeSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "GetAppsHomeSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "DeleteAppHomeSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "RegisterSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "DeregisterSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "SubClusterHeartbeatNumOps", expectOps); + MetricsRecords.assertMetric(record, "GetSubClusterNumOps", expectOps); + MetricsRecords.assertMetric(record, "GetSubClustersNumOps", expectOps); + MetricsRecords.assertMetric(record, "GetPolicyConfigurationNumOps", expectOps); + MetricsRecords.assertMetric(record, "SetPolicyConfigurationNumOps", expectOps); + MetricsRecords.assertMetric(record, "GetPoliciesConfigurationsNumOps", expectOps); + } } \ No newline at end of file