YARN-6924. Metrics for Federation AMRMProxy. Contributed by Young Chen

This commit is contained in:
bibinchundatt 2020-03-07 09:34:42 +05:30
parent 69faaa1d58
commit 3859fa76d0
3 changed files with 457 additions and 52 deletions

View File

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate;
import static org.apache.hadoop.metrics2.lib.Interns.info;
/**
 * Process-wide metrics source for the AMRMProxy.
 *
 * <p>Tracks, per request type (application start, register AM, finish AM,
 * allocate), a failure gauge, a latency rate for successful requests and a
 * latency quantile estimator. A separate gauge counts failed application
 * recoveries.
 *
 * <p>The fields annotated with {@link Metric} are never assigned in this
 * class; they are expected to be populated by the metrics system when the
 * instance is registered in {@link #getMetrics()}. Obtain the instance only
 * through {@link #getMetrics()}.
 */
@Metrics(about = "Metrics for AMRMProxy", context = "fedr")
public final class AMRMProxyMetrics {

  private static final MetricsInfo RECORD_INFO =
      info("AMRMProxyMetrics", "Metrics for the AMRMProxy");

  @Metric("# of failed applications start requests")
  private MutableGaugeLong failedAppStartRequests;
  @Metric("# of failed register AM requests")
  private MutableGaugeLong failedRegisterAMRequests;
  @Metric("# of failed finish AM requests")
  private MutableGaugeLong failedFinishAMRequests;
  @Metric("# of failed allocate requests ")
  private MutableGaugeLong failedAllocateRequests;
  @Metric("# of failed application recoveries")
  private MutableGaugeLong failedAppRecoveryCount;

  // Aggregate metrics are shared, and don't have to be looked up per call
  @Metric("Application start request latency(ms)")
  private MutableRate totalSucceededAppStartRequests;
  @Metric("Register application master latency(ms)")
  private MutableRate totalSucceededRegisterAMRequests;
  @Metric("Finish application master latency(ms)")
  private MutableRate totalSucceededFinishAMRequests;
  @Metric("Allocate latency(ms)")
  private MutableRate totalSucceededAllocateRequests;

  // Quantile latency in ms - this is needed for SLA (95%, 99%, etc)
  private final MutableQuantiles applicationStartLatency;
  private final MutableQuantiles registerAMLatency;
  private final MutableQuantiles finishAMLatency;
  private final MutableQuantiles allocateLatency;

  // volatile so that the unsynchronized read in getMetrics() is safe.
  private static volatile AMRMProxyMetrics instance = null;

  private final MetricsRegistry registry;

  /**
   * Creates the registry and the four latency quantile estimators. Private:
   * instances are only created from {@link #getMetrics()}.
   */
  private AMRMProxyMetrics() {
    registry = new MetricsRegistry(RECORD_INFO);
    registry.tag(RECORD_INFO, "AMRMProxy");

    // The last argument (10) is the estimator's interval; see
    // MetricsRegistry#newQuantiles for its unit.
    applicationStartLatency = registry
        .newQuantiles("applicationStartLatency", "latency of app start", "ops",
            "latency", 10);
    registerAMLatency = registry
        .newQuantiles("registerAMLatency", "latency of register AM", "ops",
            "latency", 10);
    finishAMLatency = registry
        .newQuantiles("finishAMLatency", "latency of finish AM", "ops",
            "latency", 10);
    allocateLatency = registry
        .newQuantiles("allocateLatency", "latency of allocate", "ops",
            "latency", 10);
  }

  /**
   * Returns the singleton, creating it and registering it with the default
   * metrics system on first use.
   *
   * <p>Once the instance exists, callers read it without taking the class
   * lock; the lock is only acquired while the instance is still unset.
   *
   * @return the singleton
   */
  public static AMRMProxyMetrics getMetrics() {
    AMRMProxyMetrics result = instance;
    if (result == null) {
      synchronized (AMRMProxyMetrics.class) {
        // Re-check under the lock: another thread may have won the race.
        result = instance;
        if (result == null) {
          result = DefaultMetricsSystem.instance()
              .register("AMRMProxyMetrics", "Metrics for the Yarn AMRMProxy",
                  new AMRMProxyMetrics());
          instance = result;
        }
      }
    }
    return result;
  }

  @VisibleForTesting
  long getNumSucceededAppStartRequests() {
    return totalSucceededAppStartRequests.lastStat().numSamples();
  }

  @VisibleForTesting
  double getLatencySucceededAppStartRequests() {
    return totalSucceededAppStartRequests.lastStat().mean();
  }

  /**
   * Records one successful application start request.
   *
   * @param duration the value added to the start-request rate and quantile
   *                 metrics (both are described as latency in ms)
   */
  public void succeededAppStartRequests(long duration) {
    totalSucceededAppStartRequests.add(duration);
    applicationStartLatency.add(duration);
  }

  @VisibleForTesting
  long getNumSucceededRegisterAMRequests() {
    return totalSucceededRegisterAMRequests.lastStat().numSamples();
  }

  @VisibleForTesting
  double getLatencySucceededRegisterAMRequests() {
    return totalSucceededRegisterAMRequests.lastStat().mean();
  }

  /**
   * Records one successful register-AM request.
   *
   * @param duration the value added to the register-AM rate and quantile
   *                 metrics (both are described as latency in ms)
   */
  public void succeededRegisterAMRequests(long duration) {
    totalSucceededRegisterAMRequests.add(duration);
    registerAMLatency.add(duration);
  }

  @VisibleForTesting
  long getNumSucceededFinishAMRequests() {
    return totalSucceededFinishAMRequests.lastStat().numSamples();
  }

  @VisibleForTesting
  double getLatencySucceededFinishAMRequests() {
    return totalSucceededFinishAMRequests.lastStat().mean();
  }

  /**
   * Records one successful finish-AM request.
   *
   * @param duration the value added to the finish-AM rate and quantile
   *                 metrics (both are described as latency in ms)
   */
  public void succeededFinishAMRequests(long duration) {
    totalSucceededFinishAMRequests.add(duration);
    finishAMLatency.add(duration);
  }

  @VisibleForTesting
  long getNumSucceededAllocateRequests() {
    return totalSucceededAllocateRequests.lastStat().numSamples();
  }

  @VisibleForTesting
  double getLatencySucceededAllocateRequests() {
    return totalSucceededAllocateRequests.lastStat().mean();
  }

  /**
   * Records one successful allocate request.
   *
   * @param duration the value added to the allocate rate and quantile
   *                 metrics (both are described as latency in ms)
   */
  public void succeededAllocateRequests(long duration) {
    totalSucceededAllocateRequests.add(duration);
    allocateLatency.add(duration);
  }

  @VisibleForTesting
  long getFailedAppStartRequests() {
    return failedAppStartRequests.value();
  }

  /** Increments the failed application start request gauge by one. */
  public void incrFailedAppStartRequests() {
    failedAppStartRequests.incr();
  }

  @VisibleForTesting
  long getFailedRegisterAMRequests() {
    return failedRegisterAMRequests.value();
  }

  /** Increments the failed register-AM request gauge by one. */
  public void incrFailedRegisterAMRequests() {
    failedRegisterAMRequests.incr();
  }

  @VisibleForTesting
  long getFailedFinishAMRequests() {
    return failedFinishAMRequests.value();
  }

  /** Increments the failed finish-AM request gauge by one. */
  public void incrFailedFinishAMRequests() {
    failedFinishAMRequests.incr();
  }

  @VisibleForTesting
  long getFailedAllocateRequests() {
    return failedAllocateRequests.value();
  }

  /** Increments the failed allocate request gauge by one. */
  public void incrFailedAllocateRequests() {
    failedAllocateRequests.incr();
  }

  @VisibleForTesting
  long getFailedAppRecoveryCount() {
    return failedAppRecoveryCount.value();
  }

  /** Increments the failed application recovery gauge by one. */
  public void incrFailedAppRecoveryCount() {
    failedAppRecoveryCount.incr();
  }
}

View File

@ -75,7 +75,9 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -97,6 +99,7 @@ public class AMRMProxyService extends CompositeService implements
private static final String NMSS_USER_KEY = "user";
private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken";
private final Clock clock = new MonotonicClock();
private Server server;
private final Context nmContext;
private final AsyncDispatcher dispatcher;
@ -104,6 +107,7 @@ public class AMRMProxyService extends CompositeService implements
private AMRMProxyTokenSecretManager secretManager;
private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
private RegistryOperations registry;
private AMRMProxyMetrics metrics;
/**
* Creates an instance of the service.
@ -122,6 +126,8 @@ public class AMRMProxyService extends CompositeService implements
this.dispatcher.register(ApplicationEventType.class,
new ApplicationEventHandler());
metrics = AMRMProxyMetrics.getMetrics();
}
@Override
@ -272,6 +278,7 @@ public class AMRMProxyService extends CompositeService implements
} catch (Throwable e) {
LOG.error("Exception when recovering " + attemptId
+ ", removing it from NMStateStore and move on", e);
this.metrics.incrFailedAppRecoveryCount();
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
}
}
@ -286,13 +293,26 @@ public class AMRMProxyService extends CompositeService implements
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
LOG.info("Registering application master." + " Host:"
+ request.getHost() + " Port:" + request.getRpcPort()
+ " Tracking Url:" + request.getTrackingUrl());
RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain();
return pipeline.getRootInterceptor()
.registerApplicationMaster(request);
long startTime = clock.getTime();
try {
RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain();
LOG.info("Registering application master." + " Host:" + request.getHost()
+ " Port:" + request.getRpcPort() + " Tracking Url:" + request
.getTrackingUrl() + " for application " + pipeline
.getApplicationAttemptId());
RegisterApplicationMasterResponse response =
pipeline.getRootInterceptor().registerApplicationMaster(request);
long endTime = clock.getTime();
this.metrics.succeededRegisterAMRequests(endTime - startTime);
LOG.info("RegisterAM processing finished in {} ms for application {}",
endTime - startTime, pipeline.getApplicationAttemptId());
return response;
} catch (Throwable t) {
this.metrics.incrFailedRegisterAMRequests();
throw t;
}
}
/**
@ -304,11 +324,25 @@ public class AMRMProxyService extends CompositeService implements
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
LOG.info("Finishing application master. Tracking Url:"
+ request.getTrackingUrl());
RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain();
return pipeline.getRootInterceptor().finishApplicationMaster(request);
long startTime = clock.getTime();
try {
RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain();
LOG.info("Finishing application master for {}. Tracking Url: {}",
pipeline.getApplicationAttemptId(), request.getTrackingUrl());
FinishApplicationMasterResponse response =
pipeline.getRootInterceptor().finishApplicationMaster(request);
long endTime = clock.getTime();
this.metrics.succeededFinishAMRequests(endTime - startTime);
LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}",
response.getIsUnregistered(), endTime - startTime,
pipeline.getApplicationAttemptId());
return response;
} catch (Throwable t) {
this.metrics.incrFailedFinishAMRequests();
throw t;
}
}
/**
@ -321,16 +355,26 @@ public class AMRMProxyService extends CompositeService implements
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
AMRMTokenIdentifier amrmTokenIdentifier =
YarnServerSecurityUtils.authorizeRequest();
RequestInterceptorChainWrapper pipeline =
getInterceptorChain(amrmTokenIdentifier);
AllocateResponse allocateResponse =
pipeline.getRootInterceptor().allocate(request);
long startTime = clock.getTime();
try {
AMRMTokenIdentifier amrmTokenIdentifier =
YarnServerSecurityUtils.authorizeRequest();
RequestInterceptorChainWrapper pipeline =
getInterceptorChain(amrmTokenIdentifier);
AllocateResponse allocateResponse =
pipeline.getRootInterceptor().allocate(request);
updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
return allocateResponse;
long endTime = clock.getTime();
this.metrics.succeededAllocateRequests(endTime - startTime);
LOG.info("Allocate processing finished in {} ms for application {}",
endTime - startTime, pipeline.getApplicationAttemptId());
return allocateResponse;
} catch (Throwable t) {
this.metrics.incrFailedAllocateRequests();
throw t;
}
}
/**
@ -343,40 +387,47 @@ public class AMRMProxyService extends CompositeService implements
*/
public void processApplicationStartRequest(StartContainerRequest request)
throws IOException, YarnException {
LOG.info("Callback received for initializing request "
+ "processing pipeline for an AM");
ContainerTokenIdentifier containerTokenIdentifierForKey =
BuilderUtils.newContainerTokenIdentifier(request
.getContainerToken());
ApplicationAttemptId appAttemptId =
containerTokenIdentifierForKey.getContainerID()
.getApplicationAttemptId();
Credentials credentials =
YarnServerSecurityUtils.parseCredentials(request
.getContainerLaunchContext());
long startTime = clock.getTime();
try {
LOG.info("Callback received for initializing request "
+ "processing pipeline for an AM");
ContainerTokenIdentifier containerTokenIdentifierForKey =
BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
ApplicationAttemptId appAttemptId =
containerTokenIdentifierForKey.getContainerID()
.getApplicationAttemptId();
Credentials credentials = YarnServerSecurityUtils
.parseCredentials(request.getContainerLaunchContext());
Token<AMRMTokenIdentifier> amrmToken =
getFirstAMRMToken(credentials.getAllTokens());
if (amrmToken == null) {
throw new YarnRuntimeException(
"AMRMToken not found in the start container request for application:"
+ appAttemptId.toString());
Token<AMRMTokenIdentifier> amrmToken =
getFirstAMRMToken(credentials.getAllTokens());
if (amrmToken == null) {
throw new YarnRuntimeException(
"AMRMToken not found in the start container request for "
+ "application:" + appAttemptId.toString());
}
// Substitute the existing AMRM Token with a local one. Keep the rest of
// the tokens in the credentials intact.
Token<AMRMTokenIdentifier> localToken =
this.secretManager.createAndGetAMRMToken(appAttemptId);
credentials.addToken(localToken.getService(), localToken);
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
request.getContainerLaunchContext()
.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
initializePipeline(appAttemptId,
containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
localToken, null, false, credentials);
long endTime = clock.getTime();
this.metrics.succeededAppStartRequests(endTime - startTime);
} catch (Throwable t) {
this.metrics.incrFailedAppStartRequests();
throw t;
}
// Substitute the existing AMRM Token with a local one. Keep the rest of the
// tokens in the credentials intact.
Token<AMRMTokenIdentifier> localToken =
this.secretManager.createAndGetAMRMToken(appAttemptId);
credentials.addToken(localToken.getService(), localToken);
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
request.getContainerLaunchContext().setTokens(
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
initializePipeline(appAttemptId,
containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
localToken, null, false, credentials);
}
/**

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Checks that the {@link AMRMProxyMetrics} counters change as expected when
 * register, allocate and finish requests are issued through the helpers this
 * class calls (presumably inherited from {@link BaseAMRMProxyTest}).
 *
 * <p>The metrics object is a shared singleton, so each test compares against
 * the values it read at its own start rather than against absolute numbers.
 */
public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestAMRMProxyMetrics.class);
  private static AMRMProxyMetrics metrics;

  /**
   * Fetches the metrics singleton and verifies that every failed/succeeded
   * request counter checked here starts at zero.
   */
  @BeforeClass
  public static void init() {
    metrics = AMRMProxyMetrics.getMetrics();
    LOG.info("Test: aggregate metrics are initialized correctly");

    Assert.assertEquals(0, metrics.getFailedAppStartRequests());
    Assert.assertEquals(0, metrics.getFailedRegisterAMRequests());
    Assert.assertEquals(0, metrics.getFailedFinishAMRequests());
    Assert.assertEquals(0, metrics.getFailedAllocateRequests());

    Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests());
    Assert.assertEquals(0, metrics.getNumSucceededRegisterAMRequests());
    Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests());
    Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests());

    LOG.info("Test: aggregate metrics are updated correctly");
  }

  /**
   * A register / allocate / finish sequence that succeeds must add exactly
   * one sample to each of the three corresponding success metrics and leave
   * every failure gauge and the app-start success metric untouched.
   */
  @Test
  public void testAllocateRequestWithNullValues() throws Exception {
    // Baseline values, read before any request is issued.
    long failedAppStartRequests = metrics.getFailedAppStartRequests();
    long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests();
    long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
    long failedAllocateRequests = metrics.getFailedAllocateRequests();

    long succeededAppStartRequests = metrics.getNumSucceededAppStartRequests();
    long succeededRegisterAMRequests =
        metrics.getNumSucceededRegisterAMRequests();
    long succeededFinishAMRequests = metrics.getNumSucceededFinishAMRequests();
    long succeededAllocateRequests = metrics.getNumSucceededAllocateRequests();

    int testAppId = 1;
    RegisterApplicationMasterResponse registerResponse =
        registerApplicationMaster(testAppId);
    Assert.assertNotNull(registerResponse);
    Assert
        .assertEquals(Integer.toString(testAppId), registerResponse.getQueue());

    AllocateResponse allocateResponse = allocate(testAppId);
    Assert.assertNotNull(allocateResponse);

    FinishApplicationMasterResponse finishResponse =
        finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
    Assert.assertNotNull(finishResponse);
    Assert.assertTrue(finishResponse.getIsUnregistered());

    Assert.assertEquals(failedAppStartRequests,
        metrics.getFailedAppStartRequests());
    Assert.assertEquals(failedRegisterAMRequests,
        metrics.getFailedRegisterAMRequests());
    Assert.assertEquals(failedFinishAMRequests,
        metrics.getFailedFinishAMRequests());
    Assert.assertEquals(failedAllocateRequests,
        metrics.getFailedAllocateRequests());

    Assert.assertEquals(succeededAppStartRequests,
        metrics.getNumSucceededAppStartRequests());
    Assert.assertEquals(1 + succeededRegisterAMRequests,
        metrics.getNumSucceededRegisterAMRequests());
    Assert.assertEquals(1 + succeededFinishAMRequests,
        metrics.getNumSucceededFinishAMRequests());
    Assert.assertEquals(1 + succeededAllocateRequests,
        metrics.getNumSucceededAllocateRequests());
  }

  /**
   * Finishing an application master a second time must fail, and that
   * failure must be reflected as exactly one additional failed finish-AM
   * request; the first (successful) register and finish are each counted
   * once as successes.
   */
  @Test
  public void testFinishOneApplicationMasterWithFailure() throws Exception {
    // Baseline values, read before any request is issued.
    long failedAppStartRequests = metrics.getFailedAppStartRequests();
    long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests();
    long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
    long failedAllocateRequests = metrics.getFailedAllocateRequests();

    long succeededAppStartRequests = metrics.getNumSucceededAppStartRequests();
    long succeededRegisterAMRequests =
        metrics.getNumSucceededRegisterAMRequests();
    long succeededFinishAMRequests = metrics.getNumSucceededFinishAMRequests();
    long succeededAllocateRequests = metrics.getNumSucceededAllocateRequests();

    int testAppId = 1;
    RegisterApplicationMasterResponse registerResponse =
        registerApplicationMaster(testAppId);
    Assert.assertNotNull(registerResponse);
    Assert
        .assertEquals(Integer.toString(testAppId), registerResponse.getQueue());

    FinishApplicationMasterResponse finishResponse =
        finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
    Assert.assertNotNull(finishResponse);

    // Track the outcome with a flag and assert after the try/catch. Calling
    // Assert.fail() inside the try would throw an AssertionError that the
    // catch (Throwable) below would swallow, so the check could never fire.
    boolean secondFinishFailed = false;
    try {
      // Try to finish an application master that is already finished.
      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
    } catch (Throwable ex) {
      // This is expected. So nothing required here.
      secondFinishFailed = true;
      LOG.info("Finish registration failed as expected because it was not "
          + "registered");
    }
    Assert.assertTrue(
        "The request to finish application master should have failed",
        secondFinishFailed);

    Assert.assertEquals(failedAppStartRequests,
        metrics.getFailedAppStartRequests());
    Assert.assertEquals(failedRegisterAMRequests,
        metrics.getFailedRegisterAMRequests());
    Assert.assertEquals(1 + failedFinishAMRequests,
        metrics.getFailedFinishAMRequests());
    Assert.assertEquals(failedAllocateRequests,
        metrics.getFailedAllocateRequests());

    Assert.assertEquals(succeededAppStartRequests,
        metrics.getNumSucceededAppStartRequests());
    Assert.assertEquals(1 + succeededRegisterAMRequests,
        metrics.getNumSucceededRegisterAMRequests());
    Assert.assertEquals(1 + succeededFinishAMRequests,
        metrics.getNumSucceededFinishAMRequests());
    Assert.assertEquals(succeededAllocateRequests,
        metrics.getNumSucceededAllocateRequests());
  }
}