diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationClientMethod.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationClientMethod.java new file mode 100644 index 0000000000..4faa9812f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationClientMethod.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; + +/** + * Holds the name, parameter types and arguments of one reflective FederationStateStore call. + * + * @param <R> type that the result of the invoked method is cast to. 
+ */ +public class FederationClientMethod<R> { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationClientMethod.class); + + /** + * List of parameters: static and dynamic values, matching types. + */ + private final Object[] params; + + /** + * List of method parameter types, matches parameters. + */ + private final Class[] types; + + /** + * String name of the method. + */ + private final String methodName; + + private FederationStateStore stateStoreClient = null; + + private Clock clock = null; + + private Class<R> clazz; + + public FederationClientMethod(String method, Class[] pTypes, Object... pParams) + throws YarnException { + if (pParams.length != pTypes.length) { + throw new YarnException("Invalid parameters for method " + method); + } + + this.params = pParams; + this.types = Arrays.copyOf(pTypes, pTypes.length); + this.methodName = method; + } + + public FederationClientMethod(String method, Class pTypes, Object pParams) + throws YarnException { + this(method, new Class[]{pTypes}, new Object[]{pParams}); + } + + public FederationClientMethod(String method, Class pTypes, Object pParams, Class<R> rTypes, + FederationStateStore fedStateStore, Clock fedClock) throws YarnException { + this(method, pTypes, pParams); + this.stateStoreClient = fedStateStore; + this.clock = fedClock; + this.clazz = rTypes; + } + + public Object[] getParams() { + return Arrays.copyOf(this.params, this.params.length); + } + + public String getMethodName() { + return methodName; + } + + /** + * Get the calling types for this method. + * + * @return An array of calling types. + */ + public Class[] getTypes() { + return Arrays.copyOf(this.types, this.types.length); + } + + /** + * Reflectively call the named FederationStateStore method and record its success/failure metric. + * + * @return The result returned after calling the interface. + * @throws YarnException the YarnException raised by the state store, or a wrapper otherwise. 
+ */ + protected R invoke() throws YarnException { + try { + long startTime = clock.getTime(); + Method method = FederationStateStore.class.getMethod(methodName, types); + R result = clazz.cast(method.invoke(stateStoreClient, params)); + + long stopTime = clock.getTime(); + FederationStateStoreServiceMetrics.succeededStateStoreServiceCall( + methodName, stopTime - startTime); + return result; + } catch (InvocationTargetException e) { + // Surface the failure raised by the state store itself, not the reflection wrapper. + Throwable cause = e.getCause() == null ? e : e.getCause(); + LOG.error("stateStoreClient call method {} error.", methodName, cause); + FederationStateStoreServiceMetrics.failedStateStoreServiceCall(methodName); + if (cause instanceof YarnException) { + throw (YarnException) cause; + } + throw new YarnException(cause); + } catch (Exception e) { + LOG.error("stateStoreClient call method {} error.", methodName, e); + FederationStateStoreServiceMetrics.failedStateStoreServiceCall(methodName); + throw new YarnException(e); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 90dcadb721..d71a7f45e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.slf4j.Logger; @@ -110,6 +112,8 @@ public class FederationStateStoreService extends AbstractService 
private long heartbeatInterval; private long heartbeatInitialDelay; private RMContext rmContext; + private final Clock clock = new MonotonicClock(); + private FederationStateStoreServiceMetrics metrics; private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread"; private int cleanUpRetryCountNum; private long cleanUpRetrySleepTime; @@ -171,6 +175,9 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("Initialized federation membership service."); + this.metrics = FederationStateStoreServiceMetrics.getMetrics(); + LOG.info("Initialized federation statestore service metrics."); + super.serviceInit(conf); } @@ -283,154 +290,251 @@ public void checkVersion() throws Exception { @Override public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( GetSubClusterPolicyConfigurationRequest request) throws YarnException { - return stateStoreClient.getPolicyConfiguration(request); + FederationClientMethod<GetSubClusterPolicyConfigurationResponse> clientMethod = + new FederationClientMethod<>("getPolicyConfiguration", + GetSubClusterPolicyConfigurationRequest.class, request, + GetSubClusterPolicyConfigurationResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( SetSubClusterPolicyConfigurationRequest request) throws YarnException { - return stateStoreClient.setPolicyConfiguration(request); + FederationClientMethod<SetSubClusterPolicyConfigurationResponse> clientMethod = + new FederationClientMethod<>("setPolicyConfiguration", + SetSubClusterPolicyConfigurationRequest.class, request, + SetSubClusterPolicyConfigurationResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { - return stateStoreClient.getPoliciesConfigurations(request); + FederationClientMethod<GetSubClusterPoliciesConfigurationsResponse> clientMethod = + new 
FederationClientMethod<>("getPoliciesConfigurations", + GetSubClusterPoliciesConfigurationsRequest.class, request, + GetSubClusterPoliciesConfigurationsResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override - public SubClusterRegisterResponse registerSubCluster( - SubClusterRegisterRequest registerSubClusterRequest) + public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request) throws YarnException { - return stateStoreClient.registerSubCluster(registerSubClusterRequest); + FederationClientMethod<SubClusterRegisterResponse> clientMethod = + new FederationClientMethod<>("registerSubCluster", + SubClusterRegisterRequest.class, request, + SubClusterRegisterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override - public SubClusterDeregisterResponse deregisterSubCluster( - SubClusterDeregisterRequest subClusterDeregisterRequest) + public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request) throws YarnException { - return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest); + FederationClientMethod<SubClusterDeregisterResponse> clientMethod = + new FederationClientMethod<>("deregisterSubCluster", + SubClusterDeregisterRequest.class, request, + SubClusterDeregisterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override - public SubClusterHeartbeatResponse subClusterHeartbeat( - SubClusterHeartbeatRequest subClusterHeartbeatRequest) + public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request) throws YarnException { - return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest); + FederationClientMethod<SubClusterHeartbeatResponse> clientMethod = + new FederationClientMethod<>("subClusterHeartbeat", + SubClusterHeartbeatRequest.class, request, + SubClusterHeartbeatResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override - public GetSubClusterInfoResponse getSubCluster( - GetSubClusterInfoRequest subClusterRequest) throws 
YarnException { - return stateStoreClient.getSubCluster(subClusterRequest); + public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request) + throws YarnException { + FederationClientMethod<GetSubClusterInfoResponse> clientMethod = + new FederationClientMethod<>("getSubCluster", + GetSubClusterInfoRequest.class, request, + GetSubClusterInfoResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override - public GetSubClustersInfoResponse getSubClusters( - GetSubClustersInfoRequest subClustersRequest) throws YarnException { - return stateStoreClient.getSubClusters(subClustersRequest); + public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request) + throws YarnException { + FederationClientMethod<GetSubClustersInfoResponse> clientMethod = + new FederationClientMethod<>("getSubClusters", + GetSubClustersInfoRequest.class, request, + GetSubClustersInfoResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.addApplicationHomeSubCluster(request); + FederationClientMethod<AddApplicationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("addApplicationHomeSubCluster", + AddApplicationHomeSubClusterRequest.class, request, + AddApplicationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( UpdateApplicationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.updateApplicationHomeSubCluster(request); + FederationClientMethod<UpdateApplicationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("updateApplicationHomeSubCluster", + UpdateApplicationHomeSubClusterRequest.class, request, + UpdateApplicationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public GetApplicationHomeSubClusterResponse 
getApplicationHomeSubCluster( GetApplicationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.getApplicationHomeSubCluster(request); + FederationClientMethod<GetApplicationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("getApplicationHomeSubCluster", + GetApplicationHomeSubClusterRequest.class, request, + GetApplicationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.getApplicationsHomeSubCluster(request); + FederationClientMethod<GetApplicationsHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("getApplicationsHomeSubCluster", + GetApplicationsHomeSubClusterRequest.class, request, + GetApplicationsHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( DeleteApplicationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.deleteApplicationHomeSubCluster(request); + FederationClientMethod<DeleteApplicationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("deleteApplicationHomeSubCluster", + DeleteApplicationHomeSubClusterRequest.class, request, + DeleteApplicationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public AddReservationHomeSubClusterResponse addReservationHomeSubCluster( AddReservationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.addReservationHomeSubCluster(request); + FederationClientMethod<AddReservationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("addReservationHomeSubCluster", + AddReservationHomeSubClusterRequest.class, request, + AddReservationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public GetReservationHomeSubClusterResponse getReservationHomeSubCluster( 
GetReservationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.getReservationHomeSubCluster(request); + FederationClientMethod<GetReservationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("getReservationHomeSubCluster", + GetReservationHomeSubClusterRequest.class, request, + GetReservationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster( GetReservationsHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.getReservationsHomeSubCluster(request); + FederationClientMethod<GetReservationsHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("getReservationsHomeSubCluster", + GetReservationsHomeSubClusterRequest.class, request, + GetReservationsHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster( UpdateReservationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.updateReservationHomeSubCluster(request); + FederationClientMethod<UpdateReservationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("updateReservationHomeSubCluster", + UpdateReservationHomeSubClusterRequest.class, request, + UpdateReservationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster( DeleteReservationHomeSubClusterRequest request) throws YarnException { - return stateStoreClient.deleteReservationHomeSubCluster(request); + FederationClientMethod<DeleteReservationHomeSubClusterResponse> clientMethod = + new FederationClientMethod<>("deleteReservationHomeSubCluster", + DeleteReservationHomeSubClusterRequest.class, request, + DeleteReservationHomeSubClusterResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request) throws YarnException, 
IOException { - return stateStoreClient.storeNewMasterKey(request); + FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>( + "storeNewMasterKey", + RouterMasterKeyRequest.class, request, + RouterMasterKeyResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request) throws YarnException, IOException { - return stateStoreClient.removeStoredMasterKey(request); + FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>( + "removeStoredMasterKey", + RouterMasterKeyRequest.class, request, + RouterMasterKeyResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request) throws YarnException, IOException { - return stateStoreClient.getMasterKeyByDelegationKey(request); + FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>( + "getMasterKeyByDelegationKey", + RouterMasterKeyRequest.class, request, + RouterMasterKeyResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) throws YarnException, IOException { - return stateStoreClient.storeNewToken(request); + FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>( + "storeNewToken", + RouterRMTokenRequest.class, request, + RouterRMTokenResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - return stateStoreClient.updateStoredToken(request); + FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>( + "updateStoredToken", + RouterRMTokenRequest.class, request, + RouterRMTokenResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public RouterRMTokenResponse 
removeStoredToken(RouterRMTokenRequest request) throws YarnException, IOException { - return stateStoreClient.removeStoredToken(request); + FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>( + "removeStoredToken", + RouterRMTokenRequest.class, request, + RouterRMTokenResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request) throws YarnException, IOException { - return stateStoreClient.getTokenByRouterStoreToken(request); + FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>( + "getTokenByRouterStoreToken", + RouterRMTokenRequest.class, request, + RouterRMTokenResponse.class, stateStoreClient, clock); + return clientMethod.invoke(); } @Override @@ -612,5 +716,4 @@ private boolean isApplicationNeedClean(ApplicationId applicationId) { } return true; } - -} +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreServiceMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreServiceMetrics.java new file mode 100644 index 0000000000..f2312fd5e1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreServiceMetrics.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; +import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +@Metrics(about = "Metrics for FederationStateStoreService", context = "fedr") +public final class FederationStateStoreServiceMetrics { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreServiceMetrics.class); + + private static final MetricsInfo RECORD_INFO = + info("FederationStateStoreServiceMetrics", "Metrics for the RM FederationStateStoreService"); + + private static volatile FederationStateStoreServiceMetrics instance = null; + private MetricsRegistry registry; 
+ + private static final Method[] STATESTORE_API_METHODS = FederationStateStore.class.getMethods(); + + // Map method names to counter objects + private static final Map<String, MutableCounterLong> FAILED_CALLS = new HashMap<>(); + private static final Map<String, MutableRate> SUCCESSFUL_CALLS = new HashMap<>(); + // Provide quantile latency for each api call. + private static final Map<String, MutableQuantiles> QUANTILE_METRICS = new HashMap<>(); + + // Error string templates for logging calls from methods not in + // FederationStateStore API + private static final String UNKNOWN_FAIL_ERROR_MSG = + "Not recording failed call for unknown FederationStateStore method {}"; + private static final String UNKNOWN_SUCCESS_ERROR_MSG = + "Not recording successful call for unknown FederationStateStore method {}"; + + /** + * Get the singleton, registering it with the metrics system on first call. + * + * @return the singleton + */ + public static FederationStateStoreServiceMetrics getMetrics() { + synchronized (FederationStateStoreServiceMetrics.class) { + if (instance == null) { + instance = DefaultMetricsSystem.instance() + .register(new FederationStateStoreServiceMetrics()); + } + } + return instance; + } + + private FederationStateStoreServiceMetrics() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "FederationStateStoreServiceMetrics"); + + // Create the metrics for each method and put them into the map + for (Method m : STATESTORE_API_METHODS) { + String methodName = m.getName(); + LOG.debug("Registering Federation StateStore Service metrics for {}", methodName); + + // This metric only records the number of failed calls; it does not + // capture latency information + FAILED_CALLS.put(methodName, registry.newCounter(methodName + "NumFailedCalls", + "# failed calls to " + methodName, 0L)); + + // This metric records both the number and average latency of successful + // calls. 
+ SUCCESSFUL_CALLS.put(methodName, registry.newRate(methodName + "SuccessfulCalls", + "# successful calls and latency(ms) for " + methodName)); + + // This metric records the quantile-based latency of each successful call, + // re-sampled every 10 seconds. + QUANTILE_METRICS.put(methodName, registry.newQuantiles(methodName + "Latency", + "Quantile latency (ms) for " + methodName, "ops", "latency", 10)); + } + } + + // Aggregate metrics are shared, and don't have to be looked up per call + @Metric("Total number of successful calls and latency(ms)") + private static MutableRate totalSucceededCalls; + + @Metric("Total number of failed StateStore calls") + private static MutableCounterLong totalFailedCalls; + + public static void failedStateStoreServiceCall() { + String methodName = Thread.currentThread().getStackTrace()[2].getMethodName(); + MutableCounterLong methodMetric = FAILED_CALLS.get(methodName); + + if (methodMetric == null) { + LOG.error(UNKNOWN_FAIL_ERROR_MSG, methodName); + return; + } + + totalFailedCalls.incr(); + methodMetric.incr(); + } + + public static void failedStateStoreServiceCall(String methodName) { + MutableCounterLong methodMetric = FAILED_CALLS.get(methodName); + if (methodMetric == null) { + LOG.error(UNKNOWN_FAIL_ERROR_MSG, methodName); + return; + } + totalFailedCalls.incr(); + methodMetric.incr(); + } + + public static void succeededStateStoreServiceCall(long duration) { + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + if (ArrayUtils.isNotEmpty(stackTraceElements) && stackTraceElements.length > 2) { + String methodName = stackTraceElements[2].getMethodName(); + if (SUCCESSFUL_CALLS.containsKey(methodName)) { + succeededStateStoreServiceCall(methodName, duration); + } else { + LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName); + } + } else { + LOG.error("stackTraceElements is empty or length <= 2."); + } + } + + public static void succeededStateStoreServiceCall(String methodName, long 
duration) { + if (SUCCESSFUL_CALLS.containsKey(methodName)) { + MutableRate methodMetric = SUCCESSFUL_CALLS.get(methodName); + MutableQuantiles methodQuantileMetric = QUANTILE_METRICS.get(methodName); + if (methodMetric == null || methodQuantileMetric == null) { + LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName); + return; + } + totalSucceededCalls.add(duration); + methodMetric.add(duration); + methodQuantileMetric.add(duration); + } + } + + // Getters for unit testing + @VisibleForTesting + public static long getNumFailedCallsForMethod(String methodName) { + return FAILED_CALLS.get(methodName).value(); + } + + @VisibleForTesting + public static long getNumSucceessfulCallsForMethod(String methodName) { + return SUCCESSFUL_CALLS.get(methodName).lastStat().numSamples(); + } + + @VisibleForTesting + public static double getLatencySucceessfulCallsForMethod(String methodName) { + return SUCCESSFUL_CALLS.get(methodName).lastStat().mean(); + } + + @VisibleForTesting + public static long getNumFailedCalls() { + return totalFailedCalls.value(); + } + + @VisibleForTesting + public static long getNumSucceededCalls() { + return totalSucceededCalls.lastStat().numSamples(); + } + + @VisibleForTesting + public static double getLatencySucceededCalls() { + return totalSucceededCalls.lastStat().mean(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index b8e2ce6ef3..9a85315628 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; @@ -52,6 +54,17 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; 
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; @@ -89,6 +102,7 @@ public class TestFederationRMStateStoreService { private long lastHearbeatTS = 0; private JSONJAXBContext jc; private JSONUnmarshaller unmarshaller; + private MockRM mockRM; @Before public void setUp() throws IOException, YarnException, JAXBException { @@ -97,12 +111,23 @@ public void setUp() throws IOException, YarnException, JAXBException { JSONConfiguration.mapped().rootUnwrapping(false).build(), ClusterMetricsInfo.class); unmarshaller = jc.createJSONUnmarshaller(); + + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + + // set up MockRM + mockRM = new MockRM(conf); + mockRM.init(conf); + mockRM.start(); } @After public void tearDown() throws Exception { unmarshaller = null; jc = null; + mockRM.stop(); + mockRM = null; } @Test @@ -250,10 +275,8 @@ public void testCleanUpApplication() throws Exception { // init subCluster Heartbeat, // and check that the subCluster is in a running state - FederationStateStoreService stateStoreService = - rm.getFederationStateStoreService(); - FederationStateStoreHeartbeat storeHeartbeat = - stateStoreService.getStateStoreHeartbeatThread(); + FederationStateStoreService stateStoreService = rm.getFederationStateStoreService(); + FederationStateStoreHeartbeat storeHeartbeat = stateStoreService.getStateStoreHeartbeatThread(); storeHeartbeat.run(); 
checkSubClusterInfo(SubClusterState.SC_RUNNING); @@ -482,4 +505,149 @@ private void addApplication2RMAppManager(MockRM rm, ApplicationId appId) { rmAppMaps.putIfAbsent(application.getApplicationId(), application); } + + + @Test + public void testPolicyConfigurationMethod() throws YarnException { + + // This test case tests 3 methods. + // 1.setPolicyConfiguration + // 2.getPolicyConfiguration + // 3.getPoliciesConfigurations + FederationStateStoreService stateStoreService = mockRM.getFederationStateStoreService(); + + // set queue basic information (queue1) + String queue1 = "queue1"; + SubClusterPolicyConfiguration requestPolicyConf1 = getUniformPolicy(queue1); + SetSubClusterPolicyConfigurationRequest configurationRequest1 = + SetSubClusterPolicyConfigurationRequest.newInstance(requestPolicyConf1); + // store policy configuration (queue1) + stateStoreService.setPolicyConfiguration(configurationRequest1); + + // set queue basic information (queue2) + String queue2 = "queue2"; + SubClusterPolicyConfiguration requestPolicyConf2 = getUniformPolicy(queue2); + SetSubClusterPolicyConfigurationRequest configurationRequest2 = + SetSubClusterPolicyConfigurationRequest.newInstance(requestPolicyConf2); + // store policy configuration (queue2) + stateStoreService.setPolicyConfiguration(configurationRequest2); + + // get policy configuration + GetSubClusterPolicyConfigurationRequest request1 = + GetSubClusterPolicyConfigurationRequest.newInstance(queue1); + GetSubClusterPolicyConfigurationResponse response = + stateStoreService.getPolicyConfiguration(request1); + Assert.assertNotNull(response); + + SubClusterPolicyConfiguration responsePolicyConf = + response.getPolicyConfiguration(); + Assert.assertNotNull(responsePolicyConf); + Assert.assertEquals(requestPolicyConf1, responsePolicyConf); + + // get policy configurations + GetSubClusterPoliciesConfigurationsRequest policiesRequest1 = + GetSubClusterPoliciesConfigurationsRequest.newInstance(); + 
GetSubClusterPoliciesConfigurationsResponse policiesResponse1 = + stateStoreService.getPoliciesConfigurations(policiesRequest1); + Assert.assertNotNull(policiesResponse1); + + List policiesConfigs = policiesResponse1.getPoliciesConfigs(); + Assert.assertNotNull(policiesConfigs); + Assert.assertEquals(2, policiesConfigs.size()); + Assert.assertTrue(policiesConfigs.contains(requestPolicyConf1)); + Assert.assertTrue(policiesConfigs.contains(requestPolicyConf2)); + } + + public SubClusterPolicyConfiguration getUniformPolicy(String queue) + throws FederationPolicyInitializationException { + UniformBroadcastPolicyManager wfp = new UniformBroadcastPolicyManager(); + wfp.setQueue(queue); + SubClusterPolicyConfiguration fpc = wfp.serializeConf(); + return fpc; + } + + @Test + public void testSubClusterMethod() throws YarnException { + + // This test case tests 5 methods. + // 1.registerSubCluster + // 2.deregisterSubCluster + // 3.subClusterHeartbeat + // 4.getSubCluster + // 5.getSubClusters + + FederationStateStoreService stateStoreService = + mockRM.getFederationStateStoreService(); + + // registerSubCluster subCluster1 + SubClusterId subClusterId1 = SubClusterId.newInstance("SC1"); + SubClusterInfo subClusterInfo1 = createSubClusterInfo(subClusterId1); + + SubClusterRegisterRequest registerRequest1 = + SubClusterRegisterRequest.newInstance(subClusterInfo1); + stateStoreService.registerSubCluster(registerRequest1); + + // registerSubCluster subCluster2 + SubClusterId subClusterId2 = SubClusterId.newInstance("SC2"); + SubClusterInfo subClusterInfo2 = createSubClusterInfo(subClusterId2); + + SubClusterRegisterRequest registerRequest2 = + SubClusterRegisterRequest.newInstance(subClusterInfo2); + stateStoreService.registerSubCluster(registerRequest2); + + // getSubCluster subCluster1 + GetSubClusterInfoRequest subClusterRequest = + GetSubClusterInfoRequest.newInstance(subClusterId1); + GetSubClusterInfoResponse subClusterResponse = + 
stateStoreService.getSubCluster(subClusterRequest); + Assert.assertNotNull(subClusterResponse); + + // We query subCluster1, we want to get SubClusterInfo of subCluster1 + SubClusterInfo subClusterInfo1Resp = subClusterResponse.getSubClusterInfo(); + Assert.assertNotNull(subClusterInfo1Resp); + Assert.assertEquals(subClusterInfo1, subClusterInfo1Resp); + + // We call the getSubClusters method and filter the Active SubCluster + // subCluster1 and subCluster2 are just registered, they are in NEW state, + // so we will get 0 active subclusters + GetSubClustersInfoRequest subClustersInfoRequest = + GetSubClustersInfoRequest.newInstance(true); + GetSubClustersInfoResponse subClustersInfoResp = + stateStoreService.getSubClusters(subClustersInfoRequest); + Assert.assertNotNull(subClustersInfoResp); + List subClusterInfos = subClustersInfoResp.getSubClusters(); + Assert.assertNotNull(subClusterInfos); + Assert.assertEquals(0, subClusterInfos.size()); + + // We let subCluster1 heartbeat and set subCluster1 to Running state + SubClusterHeartbeatRequest heartbeatRequest = + SubClusterHeartbeatRequest.newInstance(subClusterId1, SubClusterState.SC_RUNNING, + "capability"); + SubClusterHeartbeatResponse heartbeatResponse = + stateStoreService.subClusterHeartbeat(heartbeatRequest); + Assert.assertNotNull(heartbeatResponse); + + // We call the getSubClusters method again and filter the Active SubCluster + // We want to get 1 active SubCluster + GetSubClustersInfoRequest subClustersInfoRequest1 = + GetSubClustersInfoRequest.newInstance(true); + GetSubClustersInfoResponse subClustersInfoResp1 = + stateStoreService.getSubClusters(subClustersInfoRequest1); + Assert.assertNotNull(subClustersInfoResp1); + List subClusterInfos1 = subClustersInfoResp1.getSubClusters(); + Assert.assertNotNull(subClusterInfos1); + Assert.assertEquals(1, subClusterInfos1.size()); + } + + private SubClusterInfo createSubClusterInfo(SubClusterId clusterId) { + + String amRMAddress = "1.2.3.4:1"; + String 
clientRMAddress = "1.2.3.4:2"; + String rmAdminAddress = "1.2.3.4:3"; + String webAppAddress = "1.2.3.4:4"; + + return SubClusterInfo.newInstance(clusterId, amRMAddress, + clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW, + Time.now(), "capability"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationStateStoreServiceMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationStateStoreServiceMetrics.java new file mode 100644 index 0000000000..e7a79b843e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationStateStoreServiceMetrics.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.federation; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for TestFederationStateStoreServiceMetrics. + */ +public class TestFederationStateStoreServiceMetrics { + + public static final Logger LOG = + LoggerFactory.getLogger(TestFederationStateStoreServiceMetrics.class); + + private static FederationStateStoreServiceMetrics metrics = + FederationStateStoreServiceMetrics.getMetrics(); + + private MockBadFederationStateStoreService badStateStore = + new MockBadFederationStateStoreService(); + private MockGoodFederationStateStoreService goodStateStore = + new MockGoodFederationStateStoreService(); + + // Records failures for all calls + private class MockBadFederationStateStoreService { + public void registerSubCluster() { + LOG.info("Mocked: failed registerSubCluster call"); + FederationStateStoreServiceMetrics.failedStateStoreServiceCall(); + } + } + + // Records successes for all calls + private class MockGoodFederationStateStoreService { + public void registerSubCluster(long duration) { + LOG.info("Mocked: successful registerSubCluster call with duration {}", duration); + FederationStateStoreServiceMetrics.succeededStateStoreServiceCall(duration); + } + } + + @Test + public void testFederationStateStoreServiceMetricInit() { + LOG.info("Test: aggregate metrics are initialized correctly"); + assertEquals(0, FederationStateStoreServiceMetrics.getNumSucceededCalls()); + assertEquals(0, FederationStateStoreServiceMetrics.getNumFailedCalls()); + LOG.info("Test: aggregate metrics are 
updated correctly"); + } + + @Test + public void testRegisterSubClusterSuccessfulCalls() { + LOG.info("Test: Aggregate and method successful calls updated correctly."); + + long totalGoodBefore = FederationStateStoreServiceMetrics.getNumSucceededCalls(); + long apiGoodBefore = FederationStateStoreServiceMetrics. + getNumSucceessfulCallsForMethod("registerSubCluster"); + + // Call the registerSubCluster method + goodStateStore.registerSubCluster(100); + + assertEquals(totalGoodBefore + 1, + FederationStateStoreServiceMetrics.getNumSucceededCalls()); + assertEquals(100, FederationStateStoreServiceMetrics.getLatencySucceededCalls(), 0); + assertEquals(apiGoodBefore + 1, + FederationStateStoreServiceMetrics.getNumSucceededCalls()); + double latencySucceessfulCalls = + FederationStateStoreServiceMetrics.getLatencySucceessfulCallsForMethod( + "registerSubCluster"); + assertEquals(100, latencySucceessfulCalls, 0); + + LOG.info("Test: Running stats correctly calculated for 2 metrics"); + + // Call the registerSubCluster method + goodStateStore.registerSubCluster(200); + + assertEquals(totalGoodBefore + 2, + FederationStateStoreServiceMetrics.getNumSucceededCalls()); + assertEquals(150, FederationStateStoreServiceMetrics.getLatencySucceededCalls(), 0); + assertEquals(apiGoodBefore + 2, + FederationStateStoreServiceMetrics.getNumSucceededCalls()); + double latencySucceessfulCalls2 = + FederationStateStoreServiceMetrics.getLatencySucceessfulCallsForMethod( + "registerSubCluster"); + assertEquals(150, latencySucceessfulCalls2, 0); + } +}