diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 7f39fe14b9..55c9d9eeae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.curator.framework.CuratorFramework; @@ -234,6 +236,10 @@ public class ZKRMStateStore extends RMStateStore { /** Manager for the ZooKeeper connection. */ private ZKCuratorManager zkManager; + private volatile Clock clock = SystemClock.getInstance(); + @VisibleForTesting + protected ZKRMStateStoreOpDurations opDurations; + /* * Indicates different app attempt state store operations. */ @@ -329,6 +335,8 @@ public synchronized void initInternal(Configuration conf) appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX; } + opDurations = ZKRMStateStoreOpDurations.getInstance(); + zkAcl = ZKCuratorManager.getZKAcls(conf); if (HAUtil.isHAEnabled(conf)) { @@ -518,6 +526,7 @@ public synchronized long getAndIncrementEpoch() throws Exception { @Override public synchronized RMState loadState() throws Exception { + long start = clock.getTime(); RMState rmState = new RMState(); // recover DelegationTokenSecretManager loadRMDTSecretManagerState(rmState); @@ -529,6 +538,7 @@ public synchronized RMState loadState() throws Exception { loadReservationSystemState(rmState); // recover ProxyCAManager state loadProxyCAManagerState(rmState); + opDurations.addLoadStateCallDuration(clock.getTime() - start); return rmState; } @@ -834,6 +844,7 @@ private void loadProxyCAManagerState(RMState rmState) throws Exception { @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { + long start = clock.getTime(); String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true); LOG.debug("Storing info for app: {} at: {}", appId, nodeCreatePath); @@ -850,12 +861,14 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, + " exceeds the maximum allowed size for application data. " + "See yarn.resourcemanager.zk-max-znode-size.bytes."); } + opDurations.addStoreApplicationStateCallDuration(clock.getTime() - start); } @Override protected synchronized void updateApplicationStateInternal( ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { + long start = clock.getTime(); String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false); boolean pathExists = true; // Look for paths based on other split indices if path as per split index @@ -892,6 +905,7 @@ protected synchronized void updateApplicationStateInternal( LOG.debug("Path {} for {} didn't exist. Creating a new znode to update" + " the application state.", nodeUpdatePath, appId); } + opDurations.addUpdateApplicationStateCallDuration(clock.getTime() - start); } /* @@ -976,8 +990,10 @@ protected synchronized void removeApplicationAttemptInternal( @Override protected synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { + long start = clock.getTime(); removeApp(appState.getApplicationSubmissionContext(). getApplicationId().toString(), true, appState.attempts.keySet()); + opDurations.addRemoveApplicationStateCallDuration(clock.getTime() - start); } private void removeApp(String removeAppId) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStoreOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStoreOpDurations.java new file mode 100644 index 0000000000..f1ec2419f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStoreOpDurations.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + +import static org.apache.hadoop.metrics2.lib.Interns.info; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * Class to capture the performance metrics of ZKRMStateStore. + * This should be a singleton. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +@Metrics(context="ZKRMStateStore-op-durations") +public final class ZKRMStateStoreOpDurations implements MetricsSource { + + @Metric("Duration for a load state call") + MutableRate loadStateCall; + + @Metric("Duration for a store application state call") + MutableRate storeApplicationStateCall; + + @Metric("Duration for a update application state call") + MutableRate updateApplicationStateCall; + + @Metric("Duration to handle a remove application state call") + MutableRate removeApplicationStateCall; + + protected static final MetricsInfo RECORD_INFO = + info("ZKRMStateStoreOpDurations", "Durations of ZKRMStateStore calls"); + + private final MetricsRegistry registry; + + private static final ZKRMStateStoreOpDurations INSTANCE + = new ZKRMStateStoreOpDurations(); + + public static ZKRMStateStoreOpDurations getInstance() { + return INSTANCE; + } + + private ZKRMStateStoreOpDurations() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "ZKRMStateStoreOpDurations"); + + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this); + } + } + + @Override + public synchronized void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void addLoadStateCallDuration(long value) { + loadStateCall.add(value); + } + + public void addStoreApplicationStateCallDuration(long value) { + storeApplicationStateCall.add(value); + } + + public void addUpdateApplicationStateCallDuration(long value) { + updateApplicationStateCall.add(value); + } + + public void addRemoveApplicationStateCallDuration(long value) { + removeApplicationStateCall.add(value); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 69b9be26e0..7ffaba5eb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -22,6 +22,9 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.test.TestingServer; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.metrics2.impl.MetricsRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -1567,4 +1570,40 @@ public void testAppSubmissionContextIsPrunedInFinalApplicationState() Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap()); store.close(); } + + @Test + public void testMetricsInited() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + Configuration conf = createConfForDelegationTokenNodeSplit(1); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + ZKRMStateStoreOpDurations opDurations = + ((ZKRMStateStore)zkTester.getRMStateStore(conf)).opDurations; + + long anyDuration = 10; + opDurations.addLoadStateCallDuration(anyDuration); + opDurations.addStoreApplicationStateCallDuration(anyDuration); + opDurations.addUpdateApplicationStateCallDuration(anyDuration); + opDurations.addRemoveApplicationStateCallDuration(anyDuration); + + Thread.sleep(110); + + opDurations.getMetrics(collector, true); + assertEquals("Incorrect number of perf metrics", 1, + collector.getRecords().size()); + MetricsRecord record = collector.getRecords().get(0); + MetricsRecords.assertTag(record, + ZKRMStateStoreOpDurations.RECORD_INFO.name(), + "ZKRMStateStoreOpDurations"); + + double expectAvgTime = anyDuration; + MetricsRecords.assertMetric(record, + "LoadStateCallAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, + "StoreApplicationStateCallAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, + "UpdateApplicationStateCallAvgTime", expectAvgTime); + MetricsRecords.assertMetric(record, + "RemoveApplicationStateCallAvgTime", expectAvgTime); + } + }