YARN-10541. capture the performance metrics of ZKRMStateStore (#2568)
This commit is contained in:
parent
513f1995ad
commit
fa4cf91b57
@ -19,6 +19,8 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
@ -234,6 +236,10 @@ public class ZKRMStateStore extends RMStateStore {
|
||||
/** Manager for the ZooKeeper connection. */
|
||||
private ZKCuratorManager zkManager;
|
||||
|
||||
private volatile Clock clock = SystemClock.getInstance();
|
||||
@VisibleForTesting
|
||||
protected ZKRMStateStoreOpDurations opDurations;
|
||||
|
||||
/*
|
||||
* Indicates different app attempt state store operations.
|
||||
*/
|
||||
@ -329,6 +335,8 @@ public synchronized void initInternal(Configuration conf)
|
||||
appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
|
||||
}
|
||||
|
||||
opDurations = ZKRMStateStoreOpDurations.getInstance();
|
||||
|
||||
zkAcl = ZKCuratorManager.getZKAcls(conf);
|
||||
|
||||
if (HAUtil.isHAEnabled(conf)) {
|
||||
@ -518,6 +526,7 @@ public synchronized long getAndIncrementEpoch() throws Exception {
|
||||
|
||||
@Override
|
||||
public synchronized RMState loadState() throws Exception {
|
||||
long start = clock.getTime();
|
||||
RMState rmState = new RMState();
|
||||
// recover DelegationTokenSecretManager
|
||||
loadRMDTSecretManagerState(rmState);
|
||||
@ -529,6 +538,7 @@ public synchronized RMState loadState() throws Exception {
|
||||
loadReservationSystemState(rmState);
|
||||
// recover ProxyCAManager state
|
||||
loadProxyCAManagerState(rmState);
|
||||
opDurations.addLoadStateCallDuration(clock.getTime() - start);
|
||||
return rmState;
|
||||
}
|
||||
|
||||
@ -834,6 +844,7 @@ private void loadProxyCAManagerState(RMState rmState) throws Exception {
|
||||
@Override
|
||||
public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
||||
ApplicationStateData appStateDataPB) throws Exception {
|
||||
long start = clock.getTime();
|
||||
String nodeCreatePath = getLeafAppIdNodePath(appId.toString(), true);
|
||||
|
||||
LOG.debug("Storing info for app: {} at: {}", appId, nodeCreatePath);
|
||||
@ -850,12 +861,14 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
|
||||
+ " exceeds the maximum allowed size for application data. "
|
||||
+ "See yarn.resourcemanager.zk-max-znode-size.bytes.");
|
||||
}
|
||||
opDurations.addStoreApplicationStateCallDuration(clock.getTime() - start);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void updateApplicationStateInternal(
|
||||
ApplicationId appId, ApplicationStateData appStateDataPB)
|
||||
throws Exception {
|
||||
long start = clock.getTime();
|
||||
String nodeUpdatePath = getLeafAppIdNodePath(appId.toString(), false);
|
||||
boolean pathExists = true;
|
||||
// Look for paths based on other split indices if path as per split index
|
||||
@ -892,6 +905,7 @@ protected synchronized void updateApplicationStateInternal(
|
||||
LOG.debug("Path {} for {} didn't exist. Creating a new znode to update"
|
||||
+ " the application state.", nodeUpdatePath, appId);
|
||||
}
|
||||
opDurations.addUpdateApplicationStateCallDuration(clock.getTime() - start);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -976,8 +990,10 @@ protected synchronized void removeApplicationAttemptInternal(
|
||||
@Override
|
||||
protected synchronized void removeApplicationStateInternal(
|
||||
ApplicationStateData appState) throws Exception {
|
||||
long start = clock.getTime();
|
||||
removeApp(appState.getApplicationSubmissionContext().
|
||||
getApplicationId().toString(), true, appState.attempts.keySet());
|
||||
opDurations.addRemoveApplicationStateCallDuration(clock.getTime() - start);
|
||||
}
|
||||
|
||||
private void removeApp(String removeAppId) throws Exception {
|
||||
|
@ -0,0 +1,97 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
|
||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
|
||||
/**
|
||||
* Class to capture the performance metrics of ZKRMStateStore.
|
||||
* This should be a singleton.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
@Metrics(context="ZKRMStateStore-op-durations")
|
||||
public final class ZKRMStateStoreOpDurations implements MetricsSource {
|
||||
|
||||
@Metric("Duration for a load state call")
|
||||
MutableRate loadStateCall;
|
||||
|
||||
@Metric("Duration for a store application state call")
|
||||
MutableRate storeApplicationStateCall;
|
||||
|
||||
@Metric("Duration for a update application state call")
|
||||
MutableRate updateApplicationStateCall;
|
||||
|
||||
@Metric("Duration to handle a remove application state call")
|
||||
MutableRate removeApplicationStateCall;
|
||||
|
||||
protected static final MetricsInfo RECORD_INFO =
|
||||
info("ZKRMStateStoreOpDurations", "Durations of ZKRMStateStore calls");
|
||||
|
||||
private final MetricsRegistry registry;
|
||||
|
||||
private static final ZKRMStateStoreOpDurations INSTANCE
|
||||
= new ZKRMStateStoreOpDurations();
|
||||
|
||||
public static ZKRMStateStoreOpDurations getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
private ZKRMStateStoreOpDurations() {
|
||||
registry = new MetricsRegistry(RECORD_INFO);
|
||||
registry.tag(RECORD_INFO, "ZKRMStateStoreOpDurations");
|
||||
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
if (ms != null) {
|
||||
ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
|
||||
registry.snapshot(collector.addRecord(registry.info()), all);
|
||||
}
|
||||
|
||||
public void addLoadStateCallDuration(long value) {
|
||||
loadStateCall.add(value);
|
||||
}
|
||||
|
||||
public void addStoreApplicationStateCallDuration(long value) {
|
||||
storeApplicationStateCall.add(value);
|
||||
}
|
||||
|
||||
public void addUpdateApplicationStateCallDuration(long value) {
|
||||
updateApplicationStateCall.add(value);
|
||||
}
|
||||
|
||||
public void addRemoveApplicationStateCallDuration(long value) {
|
||||
removeApplicationStateCall.add(value);
|
||||
}
|
||||
}
|
@ -22,6 +22,9 @@
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.RetryNTimes;
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -1567,4 +1570,40 @@ public void testAppSubmissionContextIsPrunedInFinalApplicationState()
|
||||
Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap());
|
||||
store.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetricsInited() throws Exception {
|
||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||
Configuration conf = createConfForDelegationTokenNodeSplit(1);
|
||||
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||
ZKRMStateStoreOpDurations opDurations =
|
||||
((ZKRMStateStore)zkTester.getRMStateStore(conf)).opDurations;
|
||||
|
||||
long anyDuration = 10;
|
||||
opDurations.addLoadStateCallDuration(anyDuration);
|
||||
opDurations.addStoreApplicationStateCallDuration(anyDuration);
|
||||
opDurations.addUpdateApplicationStateCallDuration(anyDuration);
|
||||
opDurations.addRemoveApplicationStateCallDuration(anyDuration);
|
||||
|
||||
Thread.sleep(110);
|
||||
|
||||
opDurations.getMetrics(collector, true);
|
||||
assertEquals("Incorrect number of perf metrics", 1,
|
||||
collector.getRecords().size());
|
||||
MetricsRecord record = collector.getRecords().get(0);
|
||||
MetricsRecords.assertTag(record,
|
||||
ZKRMStateStoreOpDurations.RECORD_INFO.name(),
|
||||
"ZKRMStateStoreOpDurations");
|
||||
|
||||
double expectAvgTime = anyDuration;
|
||||
MetricsRecords.assertMetric(record,
|
||||
"LoadStateCallAvgTime", expectAvgTime);
|
||||
MetricsRecords.assertMetric(record,
|
||||
"StoreApplicationStateCallAvgTime", expectAvgTime);
|
||||
MetricsRecords.assertMetric(record,
|
||||
"UpdateApplicationStateCallAvgTime", expectAvgTime);
|
||||
MetricsRecords.assertMetric(record,
|
||||
"RemoveApplicationStateCallAvgTime", expectAvgTime);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user