diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index 63d8e42460..533f9c82ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; @@ -72,6 +73,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +140,7 @@ public class SQLFederationStateStore implements FederationStateStore { private String url; private int maximumPoolSize; private HikariDataSource dataSource = null; + private final Clock clock = new MonotonicClock(); @Override public void init(Configuration conf) throws YarnException { @@ -203,7 +207,9 @@ public SubClusterRegisterResponse registerSubCluster( cstmt.registerOutParameter(9, java.sql.Types.INTEGER); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not add a new subcluster into FederationStateStore @@ -222,8 +228,11 @@ public SubClusterRegisterResponse registerSubCluster( LOG.info( "Registered the SubCluster " + subClusterId + " into the StateStore"); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to register the SubCluster " + subClusterId + " into the StateStore", @@ -260,7 +269,9 @@ public SubClusterDeregisterResponse deregisterSubCluster( cstmt.registerOutParameter(3, java.sql.Types.INTEGER); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not deregister the subcluster into FederationStateStore @@ -278,8 +289,11 @@ public SubClusterDeregisterResponse deregisterSubCluster( LOG.info("Deregistered the SubCluster " + subClusterId + " state to " + state.toString()); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to deregister the sub-cluster " + subClusterId + " state to " + state.toString(), @@ -317,7 +331,9 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( cstmt.registerOutParameter(4, java.sql.Types.INTEGER); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not update the subcluster into FederationStateStore @@ -336,8 +352,11 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( LOG.info("Heartbeated the StateStore for the specified SubCluster " + subClusterId); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to heartbeat the StateStore for the specified SubCluster " + subClusterId, @@ -378,7 +397,9 @@ public GetSubClusterInfoResponse getSubCluster( cstmt.registerOutParameter(9, java.sql.Types.VARCHAR); // Execute the query + long startTime = clock.getTime(); cstmt.execute(); + long stopTime = clock.getTime(); String amRMAddress = cstmt.getString(2); String clientRMAddress = cstmt.getString(3); @@ -403,6 +424,9 @@ public GetSubClusterInfoResponse getSubCluster( clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, lastStartTime, capability); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); + // Check if the output it is a valid subcluster try { FederationMembershipStateStoreInputValidator @@ -417,6 +441,7 @@ public GetSubClusterInfoResponse getSubCluster( + subClusterInfo.toString()); } } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the SubCluster information for " + subClusterId, e); } finally { @@ -439,7 +464,9 @@ public GetSubClustersInfoResponse getSubClusters( cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS); // Execute the query + long startTime = clock.getTime(); rs = cstmt.executeQuery(); + long stopTime = clock.getTime(); while (rs.next()) { @@ -459,6 +486,10 @@ public GetSubClustersInfoResponse getSubClusters( amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, lastStartTime, capability); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); + + // Check if the output it is a valid subcluster try { FederationMembershipStateStoreInputValidator @@ -477,6 +508,7 @@ public GetSubClustersInfoResponse getSubClusters( } } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the SubClusters ", e); } finally { @@ -513,11 +545,16 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( cstmt.registerOutParameter(4, java.sql.Types.INTEGER); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); subClusterHome = cstmt.getString(3); SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); + // For failover reason, we check the returned SubClusterId. // If it is equal to the subclusterId we sent, the call added the new // application into FederationStateStore. If the call returns a different @@ -554,6 +591,7 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( } } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils .logAndThrowRetriableException(LOG, "Unable to insert the newly generated application " @@ -592,7 +630,9 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( cstmt.registerOutParameter(3, java.sql.Types.INTEGER); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not update the application into FederationStateStore @@ -611,8 +651,11 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( LOG.info( "Update the SubCluster to {} for application {} in the StateStore", subClusterId, appId); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils .logAndThrowRetriableException(LOG, "Unable to update the application " @@ -645,7 +688,9 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); // Execute the query + long startTime = clock.getTime(); cstmt.execute(); + long stopTime = clock.getTime(); if (cstmt.getString(2) != null) { homeRM = SubClusterId.newInstance(cstmt.getString(2)); @@ -659,7 +704,12 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( LOG.debug("Got the information about the specified application " + request.getApplicationId() + ". The AM is running in " + homeRM); } + + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the application information " + "for the specified application " + request.getApplicationId(), @@ -688,7 +738,9 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); // Execute the query + long startTime = clock.getTime(); rs = cstmt.executeQuery(); + long stopTime = clock.getTime(); while (rs.next()) { @@ -701,7 +753,11 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( SubClusterId.newInstance(homeSubCluster))); } + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the information for all the applications ", e); } finally { @@ -731,7 +787,9 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( cstmt.registerOutParameter(2, java.sql.Types.INTEGER); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not delete the application from FederationStateStore @@ -750,8 +808,11 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( LOG.info("Delete from the StateStore the application: {}", request.getApplicationId()); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to delete the application " + request.getApplicationId(), e); } finally { @@ -782,7 +843,9 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( cstmt.registerOutParameter(3, java.sql.Types.VARBINARY); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); // Check if the output it is a valid policy if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) { @@ -798,7 +861,11 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( return null; } + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to select the policy for the queue :" + request.getQueue(), e); @@ -833,7 +900,9 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( cstmt.registerOutParameter(4, java.sql.Types.INTEGER); // Execute the query + long startTime = clock.getTime(); cstmt.executeUpdate(); + long stopTime = clock.getTime(); // Check the ROWCOUNT value, if it is equal to 0 it means the call // did not add a new policy into FederationStateStore @@ -852,8 +921,11 @@ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( LOG.info("Insert into the state store the policy for the queue: " + policyConf.getQueue()); + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to insert the newly generated policy for the queue :" + policyConf.getQueue(), @@ -880,7 +952,9 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS); // Execute the query + long startTime = clock.getTime(); rs = cstmt.executeQuery(); + long stopTime = clock.getTime(); while (rs.next()) { @@ -894,7 +968,12 @@ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( ByteBuffer.wrap(policyInfo)); policyConfigurations.add(subClusterPolicyConfiguration); } + + FederationStateStoreClientMetrics + .succeededStateStoreCall(stopTime - startTime); + } catch (SQLException e) { + FederationStateStoreClientMetrics.failedStateStoreCall(); FederationStateStoreUtils.logAndThrowRetriableException(LOG, "Unable to obtain the policy information for all the queues.", e); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java new file mode 100644 index 0000000000..27b46cde8e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.metrics; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; +import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Performance metrics for FederationStateStore implementations. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +@Metrics(about = "Performance and usage metrics for Federation StateStore", + context = "fedr") +public final class FederationStateStoreClientMetrics implements MetricsSource { + public static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreClientMetrics.class); + + private static final MetricsRegistry REGISTRY = + new MetricsRegistry("FederationStateStoreClientMetrics"); + private final static Method[] STATESTORE_API_METHODS = + FederationStateStore.class.getMethods(); + + // Map method names to counter objects + private static final Map API_TO_FAILED_CALLS = + new HashMap(); + private static final Map API_TO_SUCCESSFUL_CALLS = + new HashMap(); + + // Provide quantile latency for each api call. + private static final Map API_TO_QUANTILE_METRICS = + new HashMap(); + + // Error string templates for logging calls from methods not in + // FederationStateStore API + private static final String UNKOWN_FAIL_ERROR_MSG = + "Not recording failed call for unknown FederationStateStore method {}"; + private static final String UNKNOWN_SUCCESS_ERROR_MSG = + "Not recording successful call for unknown " + + "FederationStateStore method {}"; + + // Aggregate metrics are shared, and don't have to be looked up per call + @Metric("Total number of successful calls and latency(ms)") + private static MutableRate totalSucceededCalls; + + @Metric("Total number of failed StateStore calls") + private static MutableCounterLong totalFailedCalls; + + // This after the static members are initialized, or the constructor will + // throw a NullPointerException + private static final FederationStateStoreClientMetrics S_INSTANCE = + DefaultMetricsSystem.instance() + .register(new FederationStateStoreClientMetrics()); + + synchronized public static FederationStateStoreClientMetrics getInstance() { + return S_INSTANCE; + } + + private FederationStateStoreClientMetrics() { + // Create the metrics for each method and put them into the map + for (Method m : STATESTORE_API_METHODS) { + String methodName = m.getName(); + LOG.debug("Registering Federation StateStore Client metrics for {}", + methodName); + + // This metric only records the number of failed calls; it does not + // capture latency information + API_TO_FAILED_CALLS.put(methodName, + REGISTRY.newCounter(methodName + "_numFailedCalls", + "# failed calls to " + methodName, 0L)); + + // This metric records both the number and average latency of successful + // calls. + API_TO_SUCCESSFUL_CALLS.put(methodName, + REGISTRY.newRate(methodName + "_successfulCalls", + "# successful calls and latency(ms) for" + methodName)); + + // This metric records the quantile-based latency of each successful call, + // re-sampled every 10 seconds. + API_TO_QUANTILE_METRICS.put(methodName, + REGISTRY.newQuantiles(methodName + "Latency", + "Quantile latency (ms) for " + methodName, "ops", "latency", 10)); + } + } + + public static void failedStateStoreCall() { + String methodName = + Thread.currentThread().getStackTrace()[2].getMethodName(); + MutableCounterLong methodMetric = API_TO_FAILED_CALLS.get(methodName); + if (methodMetric == null) { + LOG.error(UNKOWN_FAIL_ERROR_MSG, methodName); + return; + } + + totalFailedCalls.incr(); + methodMetric.incr(); + } + + public static void succeededStateStoreCall(long duration) { + String methodName = + Thread.currentThread().getStackTrace()[2].getMethodName(); + MutableRate methodMetric = API_TO_SUCCESSFUL_CALLS.get(methodName); + MutableQuantiles methodQuantileMetric = + API_TO_QUANTILE_METRICS.get(methodName); + if (methodMetric == null || methodQuantileMetric == null) { + LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName); + return; + } + + totalSucceededCalls.add(duration); + methodMetric.add(duration); + methodQuantileMetric.add(duration); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all); + } + + // Getters for unit testing + @VisibleForTesting + static long getNumFailedCallsForMethod(String methodName) { + return API_TO_FAILED_CALLS.get(methodName).value(); + } + + @VisibleForTesting + static long getNumSucceessfulCallsForMethod(String methodName) { + return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().numSamples(); + } + + @VisibleForTesting + static double getLatencySucceessfulCallsForMethod(String methodName) { + return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().mean(); + } + + @VisibleForTesting + static long getNumFailedCalls() { + return totalFailedCalls.value(); + } + + @VisibleForTesting + static long getNumSucceededCalls() { + return totalSucceededCalls.lastStat().numSamples(); + } + + @VisibleForTesting + static double getLatencySucceededCalls() { + return totalSucceededCalls.lastStat().mean(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java new file mode 100644 index 0000000000..eb548f4027 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.store.metrics; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java new file mode 100644 index 0000000000..241d5e2501 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.metrics; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unittests for {@link FederationStateStoreClientMetrics}. + * + */ +public class TestFederationStateStoreClientMetrics { + public static final Logger LOG = + LoggerFactory.getLogger(TestFederationStateStoreClientMetrics.class); + + private MockBadFederationStateStore badStateStore = + new MockBadFederationStateStore(); + private MockGoodFederationStateStore goodStateStore = + new MockGoodFederationStateStore(); + + @Test + public void testAggregateMetricInit() { + LOG.info("Test: aggregate metrics are initialized correctly"); + + Assert.assertEquals(0, + FederationStateStoreClientMetrics.getNumSucceededCalls()); + Assert.assertEquals(0, + FederationStateStoreClientMetrics.getNumFailedCalls()); + + LOG.info("Test: aggregate metrics are updated correctly"); + } + + @Test + public void testSuccessfulCalls() { + LOG.info("Test: Aggregate and method successful calls updated correctly"); + + long totalGoodBefore = + FederationStateStoreClientMetrics.getNumSucceededCalls(); + long apiGoodBefore = FederationStateStoreClientMetrics + .getNumSucceessfulCallsForMethod("registerSubCluster"); + + goodStateStore.registerSubCluster(100); + + Assert.assertEquals(totalGoodBefore + 1, + FederationStateStoreClientMetrics.getNumSucceededCalls()); + Assert.assertEquals(100, + FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0); + Assert.assertEquals(apiGoodBefore + 1, + FederationStateStoreClientMetrics.getNumSucceededCalls()); + Assert.assertEquals(100, FederationStateStoreClientMetrics + .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0); + + LOG.info("Test: Running stats correctly calculated for 2 metrics"); + + goodStateStore.registerSubCluster(200); + + Assert.assertEquals(totalGoodBefore + 2, + FederationStateStoreClientMetrics.getNumSucceededCalls()); + Assert.assertEquals(150, + FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0); + Assert.assertEquals(apiGoodBefore + 2, + FederationStateStoreClientMetrics.getNumSucceededCalls()); + Assert.assertEquals(150, FederationStateStoreClientMetrics + .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0); + + } + + @Test + public void testFailedCalls() { + + long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls(); + long apiBadBefore = FederationStateStoreClientMetrics + .getNumFailedCallsForMethod("registerSubCluster"); + + badStateStore.registerSubCluster(); + + LOG.info("Test: Aggregate and method failed calls updated correctly"); + Assert.assertEquals(totalBadbefore + 1, + FederationStateStoreClientMetrics.getNumFailedCalls()); + Assert.assertEquals(apiBadBefore + 1, FederationStateStoreClientMetrics + .getNumFailedCallsForMethod("registerSubCluster")); + + } + + @Test + public void testCallsUnknownMethod() { + + long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls(); + long apiBadBefore = FederationStateStoreClientMetrics + .getNumFailedCallsForMethod("registerSubCluster"); + long totalGoodBefore = + FederationStateStoreClientMetrics.getNumSucceededCalls(); + long apiGoodBefore = FederationStateStoreClientMetrics + .getNumSucceessfulCallsForMethod("registerSubCluster"); + + LOG.info("Calling Metrics class directly"); + FederationStateStoreClientMetrics.failedStateStoreCall(); + FederationStateStoreClientMetrics.succeededStateStoreCall(100); + + LOG.info("Test: Aggregate and method calls did not update"); + Assert.assertEquals(totalBadbefore, + FederationStateStoreClientMetrics.getNumFailedCalls()); + Assert.assertEquals(apiBadBefore, FederationStateStoreClientMetrics + .getNumFailedCallsForMethod("registerSubCluster")); + + Assert.assertEquals(totalGoodBefore, + FederationStateStoreClientMetrics.getNumSucceededCalls()); + Assert.assertEquals(apiGoodBefore, FederationStateStoreClientMetrics + .getNumSucceessfulCallsForMethod("registerSubCluster")); + + } + + // Records failures for all calls + private class MockBadFederationStateStore { + public void registerSubCluster() { + LOG.info("Mocked: failed registerSubCluster call"); + FederationStateStoreClientMetrics.failedStateStoreCall(); + } + } + + // Records successes for all calls + private class MockGoodFederationStateStore { + public void registerSubCluster(long duration) { + LOG.info("Mocked: successful registerSubCluster call with duration {}", + duration); + FederationStateStoreClientMetrics.succeededStateStoreCall(duration); + } + } +}