HDFS-16959. RBF: State store cache loading metrics (#5497)
This commit is contained in:
parent
5bc8f25327
commit
b4bcbb9515
@ -593,7 +593,7 @@ StateStoreMetrics
|
|||||||
StateStoreMetrics shows the statistics of the State Store component in Router-based federation.
|
StateStoreMetrics shows the statistics of the State Store component in Router-based federation.
|
||||||
|
|
||||||
| Name | Description |
|
| Name | Description |
|
||||||
|:---- |:---- |
|
|:------------------------------------------|:-----------------------------------------------------------------------------------|
|
||||||
| `ReadsNumOps` | Number of GET transactions for State Store within an interval time of metric |
|
| `ReadsNumOps` | Number of GET transactions for State Store within an interval time of metric |
|
||||||
| `ReadsAvgTime` | Average time of GET transactions for State Store in milliseconds |
|
| `ReadsAvgTime` | Average time of GET transactions for State Store in milliseconds |
|
||||||
| `WritesNumOps` | Number of PUT transactions for State Store within an interval time of metric |
|
| `WritesNumOps` | Number of PUT transactions for State Store within an interval time of metric |
|
||||||
@ -603,6 +603,8 @@ StateStoreMetrics shows the statistics of the State Store component in Router-ba
|
|||||||
| `FailuresNumOps` | Number of failed transactions for State Store within an interval time of metric |
|
| `FailuresNumOps` | Number of failed transactions for State Store within an interval time of metric |
|
||||||
| `FailuresAvgTime` | Average time of failed transactions for State Store in milliseconds |
|
| `FailuresAvgTime` | Average time of failed transactions for State Store in milliseconds |
|
||||||
| `Cache`*BaseRecord*`Size` | Number of store records to cache in State Store |
|
| `Cache`*BaseRecord*`Size` | Number of store records to cache in State Store |
|
||||||
|
| `Cache`*BaseRecord*`LoadNumOps` | Number of times store records are loaded in the State Store Cache from State Store |
|
||||||
|
| `Cache`*BaseRecord*`LoadAvgTime` | Average time of loading State Store Cache from State Store in milliseconds |
|
||||||
|
|
||||||
yarn context
|
yarn context
|
||||||
============
|
============
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
|
import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
|
||||||
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
|
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@ -54,6 +55,7 @@ public class StateStoreMetrics implements StateStoreMBean {
|
|||||||
private MutableRate failures;
|
private MutableRate failures;
|
||||||
|
|
||||||
private Map<String, MutableGaugeInt> cacheSizes;
|
private Map<String, MutableGaugeInt> cacheSizes;
|
||||||
|
private final Map<String, MutableRate> cacheLoadMetrics = new HashMap<>();
|
||||||
|
|
||||||
protected StateStoreMetrics() {}
|
protected StateStoreMetrics() {}
|
||||||
|
|
||||||
@ -150,6 +152,32 @@ public void setLocationCache(String name, long count) {
|
|||||||
counter.set(count);
|
counter.set(count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the cache loading metrics for the state store interface.
|
||||||
|
*
|
||||||
|
* @param name Name of the record of the cache.
|
||||||
|
* @param value The time duration interval as the cache value.
|
||||||
|
*/
|
||||||
|
public void setCacheLoading(String name, long value) {
|
||||||
|
String cacheLoad = "Cache" + name + "Load";
|
||||||
|
MutableRate cacheLoadMetric = cacheLoadMetrics.get(cacheLoad);
|
||||||
|
if (cacheLoadMetric == null) {
|
||||||
|
cacheLoadMetric = registry.newRate(cacheLoad, name, false);
|
||||||
|
cacheLoadMetrics.put(cacheLoad, cacheLoadMetric);
|
||||||
|
}
|
||||||
|
cacheLoadMetrics.get(cacheLoad).add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve unmodifiable map of cache loading metrics.
|
||||||
|
*
|
||||||
|
* @return unmodifiable map of cache loading metrics.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public Map<String, MutableRate> getCacheLoadMetrics() {
|
||||||
|
return Collections.unmodifiableMap(cacheLoadMetrics);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void reset() {
|
public void reset() {
|
||||||
reads.resetMinMax();
|
reads.resetMinMax();
|
||||||
|
@ -113,6 +113,7 @@ public boolean loadCache(boolean force) throws IOException {
|
|||||||
if (force || isUpdateTime()) {
|
if (force || isUpdateTime()) {
|
||||||
List<R> newRecords = null;
|
List<R> newRecords = null;
|
||||||
long t = -1;
|
long t = -1;
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
try {
|
try {
|
||||||
QueryResult<R> result = getDriver().get(getRecordClass());
|
QueryResult<R> result = getDriver().get(getRecordClass());
|
||||||
newRecords = result.getRecords();
|
newRecords = result.getRecords();
|
||||||
@ -143,6 +144,7 @@ public boolean loadCache(boolean force) throws IOException {
|
|||||||
StateStoreMetrics metrics = getDriver().getMetrics();
|
StateStoreMetrics metrics = getDriver().getMetrics();
|
||||||
if (metrics != null) {
|
if (metrics != null) {
|
||||||
String recordName = getRecordClass().getSimpleName();
|
String recordName = getRecordClass().getSimpleName();
|
||||||
|
metrics.setCacheLoading(recordName, Time.monotonicNow() - startTime);
|
||||||
metrics.setCacheSize(recordName, this.records.size());
|
metrics.setCacheSize(recordName, this.records.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,6 +48,8 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
|
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -76,6 +78,10 @@ protected StateStoreDriver getStateStoreDriver() {
|
|||||||
return stateStore.getDriver();
|
return stateStore.getDriver();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected StateStoreService getStateStoreService() {
|
||||||
|
return stateStore;
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void cleanMetrics() {
|
public void cleanMetrics() {
|
||||||
if (stateStore != null) {
|
if (stateStore != null) {
|
||||||
@ -574,6 +580,36 @@ private static Map<String, Class<?>> getFields(BaseRecord record) {
|
|||||||
return getters;
|
return getters;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getMountTableCacheLoadSamples(StateStoreDriver driver) throws IOException {
|
||||||
|
final MutableRate mountTableCache = getMountTableCache(driver);
|
||||||
|
return mountTableCache.lastStat().numSamples();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MutableRate getMountTableCache(StateStoreDriver driver) throws IOException {
|
||||||
|
StateStoreMetrics metrics = stateStore.getMetrics();
|
||||||
|
final Query<MountTable> query = new Query<>(MountTable.newInstance());
|
||||||
|
driver.getMultiple(MountTable.class, query);
|
||||||
|
final Map<String, MutableRate> cacheLoadMetrics = metrics.getCacheLoadMetrics();
|
||||||
|
final MutableRate mountTableCache = cacheLoadMetrics.get("CacheMountTableLoad");
|
||||||
|
assertNotNull("CacheMountTableLoad should be present in the state store metrics",
|
||||||
|
mountTableCache);
|
||||||
|
return mountTableCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCacheLoadMetrics(StateStoreDriver driver, long numRefresh,
|
||||||
|
double expectedHigherThan) throws IOException, IllegalArgumentException {
|
||||||
|
final MutableRate mountTableCache = getMountTableCache(driver);
|
||||||
|
// CacheMountTableLoadNumOps
|
||||||
|
final long mountTableCacheLoadNumOps = getMountTableCacheLoadSamples(driver);
|
||||||
|
assertEquals("Num of samples collected should match", numRefresh, mountTableCacheLoadNumOps);
|
||||||
|
// CacheMountTableLoadAvgTime ms
|
||||||
|
final double mountTableCacheLoadAvgTimeMs = mountTableCache.lastStat().mean();
|
||||||
|
assertTrue(
|
||||||
|
"Mean time duration for cache load is expected to be higher than " + expectedHigherThan
|
||||||
|
+ " ms." + " Actual value: " + mountTableCacheLoadAvgTimeMs,
|
||||||
|
mountTableCacheLoadAvgTimeMs > expectedHigherThan);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the type of field.
|
* Get the type of field.
|
||||||
*
|
*
|
||||||
|
@ -73,4 +73,16 @@ public void testMetrics()
|
|||||||
throws IllegalArgumentException, IllegalAccessException, IOException {
|
throws IllegalArgumentException, IllegalAccessException, IOException {
|
||||||
testMetrics(getStateStoreDriver());
|
testMetrics(getStateStoreDriver());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCacheLoadMetrics() throws IOException {
|
||||||
|
// inject value of CacheMountTableLoad as -1 initially, if tests get CacheMountTableLoadAvgTime
|
||||||
|
// value as -1 ms, that would mean no other sample with value >= 0 would have been received and
|
||||||
|
// hence this would be failure to assert that mount table avg load time is higher than -1
|
||||||
|
getStateStoreService().getMetrics().setCacheLoading("MountTable", -1);
|
||||||
|
long curMountTableLoadNum = getMountTableCacheLoadSamples(getStateStoreDriver());
|
||||||
|
getStateStoreService().refreshCaches(true);
|
||||||
|
testCacheLoadMetrics(getStateStoreDriver(), curMountTableLoadNum + 1, -1);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -115,4 +115,16 @@ public void testInsertWithErrorDuringWrite()
|
|||||||
|
|
||||||
testInsertWithErrorDuringWrite(driver, MembershipState.class);
|
testInsertWithErrorDuringWrite(driver, MembershipState.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCacheLoadMetrics() throws IOException {
|
||||||
|
// inject value of CacheMountTableLoad as -1 initially, if tests get CacheMountTableLoadAvgTime
|
||||||
|
// value as -1 ms, that would mean no other sample with value >= 0 would have been received and
|
||||||
|
// hence this would be failure to assert that mount table avg load time is higher than -1
|
||||||
|
getStateStoreService().getMetrics().setCacheLoading("MountTable", -1);
|
||||||
|
long curMountTableLoadNum = getMountTableCacheLoadSamples(getStateStoreDriver());
|
||||||
|
getStateStoreService().refreshCaches(true);
|
||||||
|
getStateStoreService().refreshCaches(true);
|
||||||
|
testCacheLoadMetrics(getStateStoreDriver(), curMountTableLoadNum + 2, -1);
|
||||||
|
}
|
||||||
}
|
}
|
@ -206,4 +206,18 @@ public void testFetchErrors()
|
|||||||
stateStoreDriver.setEnableConcurrent(true);
|
stateStoreDriver.setEnableConcurrent(true);
|
||||||
testFetchErrors(stateStoreDriver);
|
testFetchErrors(stateStoreDriver);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCacheLoadMetrics() throws IOException {
|
||||||
|
// inject value of CacheMountTableLoad as -1 initially, if tests get CacheMountTableLoadAvgTime
|
||||||
|
// value as -1 ms, that would mean no other sample with value >= 0 would have been received and
|
||||||
|
// hence this would be failure to assert that mount table avg load time is higher than -1
|
||||||
|
getStateStoreService().getMetrics().setCacheLoading("MountTable", -1);
|
||||||
|
long curMountTableLoadNum = getMountTableCacheLoadSamples(getStateStoreDriver());
|
||||||
|
getStateStoreService().refreshCaches(true);
|
||||||
|
getStateStoreService().refreshCaches(true);
|
||||||
|
getStateStoreService().refreshCaches(true);
|
||||||
|
testCacheLoadMetrics(getStateStoreDriver(), curMountTableLoadNum + 3, -1);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user