HADOOP-18167. Add metrics to track delegation token secret manager op… (#4092)

* HADOOP-18167. Add metrics to track delegation token secret manager operations
This commit is contained in:
hchaverri 2022-04-26 09:20:11 -07:00 committed by Owen O'Malley
parent f4290055c6
commit 4043ef0485
2 changed files with 249 additions and 3 deletions

View File

@ -33,7 +33,17 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.token.SecretManager;
@ -42,6 +52,7 @@
import org.apache.hadoop.util.Time;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -85,6 +96,10 @@ private String formatTokenId(TokenIdent id) {
* Access to currentKey is protected by this object lock
*/
private DelegationKey currentKey;
/**
* Metrics to track token management operations.
*/
private DelegationTokenSecretManagerMetrics metrics;
private long keyUpdateInterval;
private long tokenMaxLifetime;
@ -123,6 +138,7 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
this.tokenRenewInterval = delegationTokenRenewInterval;
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
this.storeTokenTrackingId = false;
this.metrics = DelegationTokenSecretManagerMetrics.create();
}
/** should be called before this object is used */
@ -417,7 +433,7 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
storeToken(identifier, tokenInfo);
metrics.trackStoreToken(() -> storeToken(identifier, tokenInfo));
} catch (IOException ioe) {
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
ioe);
@ -542,7 +558,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
throw new InvalidToken("Renewal request for unknown token "
+ formatTokenId(id));
}
updateToken(id, info);
metrics.trackUpdateToken(() -> updateToken(id, info));
return renewTime;
}
@ -578,7 +594,9 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
if (info == null) {
throw new InvalidToken("Token not found " + formatTokenId(id));
}
metrics.trackRemoveToken(() -> {
removeStoredToken(id);
});
return id;
}
@ -726,4 +744,96 @@ public TokenIdent decodeTokenIdentifier(Token<TokenIdent> token) throws IOExcept
return token.decodeIdentifier();
}
protected DelegationTokenSecretManagerMetrics getMetrics() {
return metrics;
}
/**
* DelegationTokenSecretManagerMetrics tracks token management operations
* and publishes them through the metrics interfaces.
*/
@Metrics(about="Delegation token secret manager metrics", context="token")
static class DelegationTokenSecretManagerMetrics implements DurationTrackerFactory {
private static final Logger LOG = LoggerFactory.getLogger(
DelegationTokenSecretManagerMetrics.class);
final static String STORE_TOKEN_STAT = "storeToken";
final static String UPDATE_TOKEN_STAT = "updateToken";
final static String REMOVE_TOKEN_STAT = "removeToken";
final static String TOKEN_FAILURE_STAT = "tokenFailure";
private final MetricsRegistry registry;
private final IOStatisticsStore ioStatistics;
@Metric("Rate of storage of delegation tokens and latency (milliseconds)")
private MutableRate storeToken;
@Metric("Rate of update of delegation tokens and latency (milliseconds)")
private MutableRate updateToken;
@Metric("Rate of removal of delegation tokens and latency (milliseconds)")
private MutableRate removeToken;
@Metric("Counter of delegation tokens operation failures")
private MutableCounterLong tokenFailure;
static DelegationTokenSecretManagerMetrics create() {
return DefaultMetricsSystem.instance().register(new DelegationTokenSecretManagerMetrics());
}
DelegationTokenSecretManagerMetrics() {
ioStatistics = IOStatisticsBinding.iostatisticsStore()
.withDurationTracking(STORE_TOKEN_STAT, UPDATE_TOKEN_STAT, REMOVE_TOKEN_STAT)
.withCounters(TOKEN_FAILURE_STAT)
.build();
registry = new MetricsRegistry("DelegationTokenSecretManagerMetrics");
LOG.debug("Initialized {}", registry);
}
public void trackStoreToken(InvocationRaisingIOE invocation) throws IOException {
trackInvocation(invocation, STORE_TOKEN_STAT, storeToken);
}
public void trackUpdateToken(InvocationRaisingIOE invocation) throws IOException {
trackInvocation(invocation, UPDATE_TOKEN_STAT, updateToken);
}
public void trackRemoveToken(InvocationRaisingIOE invocation) throws IOException {
trackInvocation(invocation, REMOVE_TOKEN_STAT, removeToken);
}
public void trackInvocation(InvocationRaisingIOE invocation, String statistic,
MutableRate metric) throws IOException {
try {
long start = Time.monotonicNow();
IOStatisticsBinding.trackDurationOfInvocation(this, statistic, invocation);
metric.add(Time.monotonicNow() - start);
} catch (Exception ex) {
tokenFailure.incr();
throw ex;
}
}
@Override
public DurationTracker trackDuration(String key, long count) {
return ioStatistics.trackDuration(key, count);
}
protected MutableRate getStoreToken() {
return storeToken;
}
protected MutableRate getUpdateToken() {
return updateToken;
}
protected MutableRate getRemoveToken() {
return removeToken;
}
protected MutableCounterLong getTokenFailure() {
return tokenFailure;
}
protected IOStatisticsStore getIoStatistics() {
return ioStatistics;
}
}
}

View File

@ -30,6 +30,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.apache.hadoop.io.DataInputBuffer;
@ -156,6 +162,55 @@ public DelegationKey getKey(TestDelegationTokenIdentifier id) {
}
}
public static class TestFailureDelegationTokenSecretManager
extends TestDelegationTokenSecretManager {
private boolean throwError = false;
private long errorSleepMillis;
public TestFailureDelegationTokenSecretManager(long errorSleepMillis) {
super(24*60*60*1000, 10*1000, 1*1000, 60*60*1000);
this.errorSleepMillis = errorSleepMillis;
}
public void setThrowError(boolean throwError) {
this.throwError = throwError;
}
private void sleepAndThrow() throws IOException {
try {
Thread.sleep(errorSleepMillis);
throw new IOException("Test exception");
} catch (InterruptedException e) {
}
}
@Override
protected void storeNewToken(TestDelegationTokenIdentifier ident, long renewDate)
throws IOException {
if (throwError) {
sleepAndThrow();
}
super.storeNewToken(ident, renewDate);
}
@Override
protected void removeStoredToken(TestDelegationTokenIdentifier ident) throws IOException {
if (throwError) {
sleepAndThrow();
}
super.removeStoredToken(ident);
}
@Override
protected void updateStoredToken(TestDelegationTokenIdentifier ident, long renewDate)
throws IOException {
if (throwError) {
sleepAndThrow();
}
super.updateStoredToken(ident, renewDate);
}
}
public static class TokenSelector extends
AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>{
@ -579,4 +634,85 @@ public void testEmptyToken() throws IOException {
assertEquals(token1, token2);
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
}
@Test
public void testDelegationTokenSecretManagerMetrics() throws Exception {
TestDelegationTokenSecretManager dtSecretManager =
new TestDelegationTokenSecretManager(24*60*60*1000,
10*1000, 1*1000, 60*60*1000);
try {
dtSecretManager.startThreads();
final Token<TestDelegationTokenIdentifier> token = callAndValidateMetrics(
dtSecretManager, dtSecretManager.getMetrics().getStoreToken(), "storeToken",
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"), 1);
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getUpdateToken(),
"updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"), 1);
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getRemoveToken(),
"removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"), 1);
} finally {
dtSecretManager.stopThreads();
}
}
@Test
public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
int errorSleepMillis = 200;
TestFailureDelegationTokenSecretManager dtSecretManager =
new TestFailureDelegationTokenSecretManager(errorSleepMillis);
try {
dtSecretManager.startThreads();
final Token<TestDelegationTokenIdentifier> token =
generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker");
dtSecretManager.setThrowError(true);
callAndValidateFailureMetrics(dtSecretManager, "storeToken", 1, 1, false,
errorSleepMillis,
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"));
callAndValidateFailureMetrics(dtSecretManager, "updateToken", 1, 2, true,
errorSleepMillis, () -> dtSecretManager.renewToken(token, "JobTracker"));
callAndValidateFailureMetrics(dtSecretManager, "removeToken", 1, 3, true,
errorSleepMillis, () -> dtSecretManager.cancelToken(token, "JobTracker"));
} finally {
dtSecretManager.stopThreads();
}
}
private <T> T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager,
MutableRate metric, String statName, Callable<T> callable, int expectedCount)
throws Exception {
MeanStatistic stat = IOStatisticAssertions.lookupMeanStatistic(
dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean");
assertEquals(expectedCount - 1, metric.lastStat().numSamples());
assertEquals(expectedCount - 1, stat.getSamples());
T returnedObject = callable.call();
assertEquals(expectedCount, metric.lastStat().numSamples());
assertEquals(expectedCount, stat.getSamples());
return returnedObject;
}
private <T> void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager,
String statName, int expectedStatCount, int expectedMetricCount, boolean expectError,
int errorSleepMillis, Callable<T> callable) throws Exception {
MutableCounterLong counter = dtSecretManager.getMetrics().getTokenFailure();
MeanStatistic failureStat = IOStatisticAssertions.lookupMeanStatistic(
dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean");
assertEquals(expectedMetricCount - 1, counter.value());
assertEquals(expectedStatCount - 1, failureStat.getSamples());
if (expectError) {
LambdaTestUtils.intercept(IOException.class, callable);
} else {
callable.call();
}
assertEquals(expectedMetricCount, counter.value());
assertEquals(expectedStatCount, failureStat.getSamples());
assertTrue(failureStat.getSum() >= errorSleepMillis);
}
}