YARN-10893. Adding metrics for getClusterMetrics and getApplications APIs in FederationClientInterceptor (#3325)

This commit is contained in:
Akshat Bordia 2021-09-09 21:50:57 +05:30 committed by GitHub
parent 99cb2b6b7f
commit dee6dc2f89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 6 deletions

View File

@ -53,6 +53,8 @@ public final class RouterMetrics {
private MutableGaugeInt numMultipleAppsFailedRetrieved;
@Metric("# of applicationAttempt reports failed to be retrieved")
private MutableGaugeInt numAppAttemptsFailedRetrieved;
@Metric("# of getClusterMetrics failed to be retrieved")
private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -69,6 +71,9 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved " +
"appAttempt reports and latency(ms)")
private MutableRate totalSucceededAppAttemptsRetrieved;
@Metric("Total number of successful Retrieved getClusterMetrics and "
+ "latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;
/**
@ -80,6 +85,7 @@ public final class RouterMetrics {
private MutableQuantiles getApplicationReportLatency;
private MutableQuantiles getApplicationsReportLatency;
private MutableQuantiles getApplicationAttemptReportLatency;
private MutableQuantiles getClusterMetricsLatency;
private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
@ -103,6 +109,9 @@ private RouterMetrics() {
registry.newQuantiles("getApplicationAttemptReportLatency",
"latency of get applicationattempt " +
"report", "ops", "latency", 10);
getClusterMetricsLatency =
registry.newQuantiles("getClusterMetricsLatency",
"latency of get cluster metrics", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -154,6 +163,11 @@ public long getNumSucceededMultipleAppsRetrieved() {
return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetClusterMetricsRetrieved(){
return totalSucceededGetClusterMetricsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -184,6 +198,11 @@ public double getLatencySucceededMultipleGetAppReport() {
return totalSucceededMultipleAppsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetClusterMetricsRetrieved() {
return totalSucceededGetClusterMetricsRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -214,6 +233,11 @@ public int getMultipleAppsFailedRetrieved() {
return numMultipleAppsFailedRetrieved.value();
}
@VisibleForTesting
public int getClusterMetricsFailedRetrieved() {
return numGetClusterMetricsFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -244,6 +268,11 @@ public void succeededAppAttemptsRetrieved(long duration) {
getApplicationAttemptReportLatency.add(duration);
}
public void succeededGetClusterMetricsRetrieved(long duration) {
totalSucceededGetClusterMetricsRetrieved.add(duration);
getClusterMetricsLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -268,4 +297,8 @@ public void incrAppAttemptsFailedRetrieved() {
numAppAttemptsFailedRetrieved.incr();
}
public void incrGetClusterMetricsFailedRetrieved() {
numGetClusterMetricsFailedRetrieved.incr();
}
}

View File

@ -628,18 +628,29 @@ public GetApplicationReportResponse getApplicationReport(
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException {
if (request == null) {
routerMetrics.incrMultipleAppsFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getApplications request.",
null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getApplications",
new Class[] {GetApplicationsRequest.class}, new Object[] {request});
Map<SubClusterId, GetApplicationsResponse> applications =
invokeConcurrent(subclusters.keySet(), remoteMethod,
Map<SubClusterId, GetApplicationsResponse> applications;
try {
applications = invokeConcurrent(subclusters.keySet(), remoteMethod,
GetApplicationsResponse.class);
} catch (Exception ex) {
routerMetrics.incrMultipleAppsFailedRetrieved();
LOG.error("Unable to get applications due to exception.", ex);
throw ex;
}
long stopTime = clock.getTime();
routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
// Merge the Application Reports
return RouterYarnClientUtils.mergeApplications(applications.values(),
returnPartialReport);
@ -648,14 +659,26 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException, IOException {
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
ArrayList<SubClusterId> clusterList = new ArrayList<>(subclusters.keySet());
Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics =
invokeConcurrent(clusterList, remoteMethod,
Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics;
try {
clusterMetrics = invokeConcurrent(clusterList, remoteMethod,
GetClusterMetricsResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterMetricsFailedRetrieved();
LOG.error("Unable to get cluster metrics due to exception.", ex);
throw ex;
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterMetricsRetrieved(stopTime - startTime);
return RouterYarnClientUtils.merge(clusterMetrics.values());
}

View File

@ -279,6 +279,37 @@ public void testMulipleAppsReportFailed() {
metrics.getMultipleAppsFailedRetrieved());
}
/**
* This test validates the correctness of the metric: Retrieved getClusterMetrics
* multiple times successfully.
*/
@Test
public void testSucceededGetClusterMetrics() {
long totalGoodBefore = metrics.getNumSucceededGetClusterMetricsRetrieved();
goodSubCluster.getClusterMetrics(100);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetClusterMetricsRetrieved());
Assert.assertEquals(100, metrics.getLatencySucceededGetClusterMetricsRetrieved(),
0);
goodSubCluster.getClusterMetrics(200);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetClusterMetricsRetrieved());
Assert.assertEquals(150, metrics.getLatencySucceededGetClusterMetricsRetrieved(),
0);
}
/**
* This test validates the correctness of the metric: Failed to
* retrieve getClusterMetrics.
*/
@Test
public void testGetClusterMetricsFailed() {
long totalBadbefore = metrics.getClusterMetricsFailedRetrieved();
badSubCluster.getClusterMetrics();
Assert.assertEquals(totalBadbefore + 1,
metrics.getClusterMetricsFailedRetrieved());
}
// Records failures for all calls
private class MockBadSubCluster {
public void getNewApplication() {
@ -310,6 +341,11 @@ public void getApplicationsReport() {
LOG.info("Mocked: failed getApplicationsReport call");
metrics.incrMultipleAppsFailedRetrieved();
}
public void getClusterMetrics() {
LOG.info("Mocked: failed getClusterMetrics call");
metrics.incrGetClusterMetricsFailedRetrieved();
}
}
// Records successes for all calls
@ -350,5 +386,11 @@ public void getApplicationsReport(long duration) {
duration);
metrics.succeededMultipleAppsRetrieved(duration);
}
public void getClusterMetrics(long duration){
LOG.info("Mocked: successful getClusterMetrics call with duration {}",
duration);
metrics.succeededGetClusterMetricsRetrieved(duration);
}
}
}