HADOOP-18222. Prevent DelegationTokenSecretManagerMetrics from registering multiple times
Fixes #4266 Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
This commit is contained in:
parent
d486ae8c0f
commit
99a83fd4bd
@ -70,6 +70,12 @@ class AbstractDelegationTokenSecretManager<TokenIdent
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(AbstractDelegationTokenSecretManager.class);
|
||||
|
||||
/**
|
||||
* Metrics to track token management operations.
|
||||
*/
|
||||
private static final DelegationTokenSecretManagerMetrics METRICS
|
||||
= DelegationTokenSecretManagerMetrics.create();
|
||||
|
||||
private String formatTokenId(TokenIdent id) {
|
||||
return "(" + id + ")";
|
||||
}
|
||||
@ -107,10 +113,6 @@ private String formatTokenId(TokenIdent id) {
|
||||
* Access to currentKey is protected by this object lock
|
||||
*/
|
||||
private DelegationKey currentKey;
|
||||
/**
|
||||
* Metrics to track token management operations.
|
||||
*/
|
||||
private DelegationTokenSecretManagerMetrics metrics;
|
||||
|
||||
private long keyUpdateInterval;
|
||||
private long tokenMaxLifetime;
|
||||
@ -149,7 +151,6 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
||||
this.tokenRenewInterval = delegationTokenRenewInterval;
|
||||
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
|
||||
this.storeTokenTrackingId = false;
|
||||
this.metrics = DelegationTokenSecretManagerMetrics.create();
|
||||
}
|
||||
|
||||
/** should be called before this object is used */
|
||||
@ -446,7 +447,7 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
|
||||
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
|
||||
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
|
||||
try {
|
||||
metrics.trackStoreToken(() -> storeToken(identifier, tokenInfo));
|
||||
METRICS.trackStoreToken(() -> storeToken(identifier, tokenInfo));
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
|
||||
ioe);
|
||||
@ -571,7 +572,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
|
||||
throw new InvalidToken("Renewal request for unknown token "
|
||||
+ formatTokenId(id));
|
||||
}
|
||||
metrics.trackUpdateToken(() -> updateToken(id, info));
|
||||
METRICS.trackUpdateToken(() -> updateToken(id, info));
|
||||
return renewTime;
|
||||
}
|
||||
|
||||
@ -607,7 +608,7 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
||||
if (info == null) {
|
||||
throw new InvalidToken("Token not found " + formatTokenId(id));
|
||||
}
|
||||
metrics.trackRemoveToken(() -> {
|
||||
METRICS.trackRemoveToken(() -> {
|
||||
removeTokenForOwnerStats(id);
|
||||
removeStoredToken(id);
|
||||
});
|
||||
@ -845,7 +846,7 @@ protected void syncTokenOwnerStats() {
|
||||
}
|
||||
|
||||
protected DelegationTokenSecretManagerMetrics getMetrics() {
|
||||
return metrics;
|
||||
return METRICS;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -33,6 +33,7 @@
|
||||
import java.util.concurrent.Callable;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
|
||||
import org.apache.hadoop.fs.statistics.MeanStatistic;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
@ -635,6 +636,23 @@ public void testEmptyToken() throws IOException {
|
||||
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleDelegationTokenSecretManagerMetrics() {
|
||||
TestDelegationTokenSecretManager dtSecretManager1 =
|
||||
new TestDelegationTokenSecretManager(0, 0, 0, 0);
|
||||
assertNotNull(dtSecretManager1.getMetrics());
|
||||
|
||||
TestDelegationTokenSecretManager dtSecretManager2 =
|
||||
new TestDelegationTokenSecretManager(0, 0, 0, 0);
|
||||
assertNotNull(dtSecretManager2.getMetrics());
|
||||
|
||||
DefaultMetricsSystem.instance().init("test");
|
||||
|
||||
TestDelegationTokenSecretManager dtSecretManager3 =
|
||||
new TestDelegationTokenSecretManager(0, 0, 0, 0);
|
||||
assertNotNull(dtSecretManager3.getMetrics());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokenSecretManagerMetrics() throws Exception {
|
||||
TestDelegationTokenSecretManager dtSecretManager =
|
||||
@ -645,13 +663,13 @@ public void testDelegationTokenSecretManagerMetrics() throws Exception {
|
||||
|
||||
final Token<TestDelegationTokenIdentifier> token = callAndValidateMetrics(
|
||||
dtSecretManager, dtSecretManager.getMetrics().getStoreToken(), "storeToken",
|
||||
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"), 1);
|
||||
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"));
|
||||
|
||||
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getUpdateToken(),
|
||||
"updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"), 1);
|
||||
"updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"));
|
||||
|
||||
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getRemoveToken(),
|
||||
"removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"), 1);
|
||||
"removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"));
|
||||
} finally {
|
||||
dtSecretManager.stopThreads();
|
||||
}
|
||||
@ -671,14 +689,14 @@ public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
|
||||
|
||||
dtSecretManager.setThrowError(true);
|
||||
|
||||
callAndValidateFailureMetrics(dtSecretManager, "storeToken", 1, 1, false,
|
||||
callAndValidateFailureMetrics(dtSecretManager, "storeToken", false,
|
||||
errorSleepMillis,
|
||||
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"));
|
||||
|
||||
callAndValidateFailureMetrics(dtSecretManager, "updateToken", 1, 2, true,
|
||||
callAndValidateFailureMetrics(dtSecretManager, "updateToken", true,
|
||||
errorSleepMillis, () -> dtSecretManager.renewToken(token, "JobTracker"));
|
||||
|
||||
callAndValidateFailureMetrics(dtSecretManager, "removeToken", 1, 3, true,
|
||||
callAndValidateFailureMetrics(dtSecretManager, "removeToken", true,
|
||||
errorSleepMillis, () -> dtSecretManager.cancelToken(token, "JobTracker"));
|
||||
} finally {
|
||||
dtSecretManager.stopThreads();
|
||||
@ -686,33 +704,33 @@ public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
|
||||
}
|
||||
|
||||
private <T> T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
||||
MutableRate metric, String statName, Callable<T> callable, int expectedCount)
|
||||
MutableRate metric, String statName, Callable<T> callable)
|
||||
throws Exception {
|
||||
MeanStatistic stat = IOStatisticAssertions.lookupMeanStatistic(
|
||||
dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean");
|
||||
assertEquals(expectedCount - 1, metric.lastStat().numSamples());
|
||||
assertEquals(expectedCount - 1, stat.getSamples());
|
||||
long metricBefore = metric.lastStat().numSamples();
|
||||
long statBefore = stat.getSamples();
|
||||
T returnedObject = callable.call();
|
||||
assertEquals(expectedCount, metric.lastStat().numSamples());
|
||||
assertEquals(expectedCount, stat.getSamples());
|
||||
assertEquals(metricBefore + 1, metric.lastStat().numSamples());
|
||||
assertEquals(statBefore + 1, stat.getSamples());
|
||||
return returnedObject;
|
||||
}
|
||||
|
||||
private <T> void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
||||
String statName, int expectedStatCount, int expectedMetricCount, boolean expectError,
|
||||
int errorSleepMillis, Callable<T> callable) throws Exception {
|
||||
String statName, boolean expectError, int errorSleepMillis, Callable<T> callable)
|
||||
throws Exception {
|
||||
MutableCounterLong counter = dtSecretManager.getMetrics().getTokenFailure();
|
||||
MeanStatistic failureStat = IOStatisticAssertions.lookupMeanStatistic(
|
||||
dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean");
|
||||
assertEquals(expectedMetricCount - 1, counter.value());
|
||||
assertEquals(expectedStatCount - 1, failureStat.getSamples());
|
||||
long counterBefore = counter.value();
|
||||
long statBefore = failureStat.getSamples();
|
||||
if (expectError) {
|
||||
LambdaTestUtils.intercept(IOException.class, callable);
|
||||
} else {
|
||||
callable.call();
|
||||
}
|
||||
assertEquals(expectedMetricCount, counter.value());
|
||||
assertEquals(expectedStatCount, failureStat.getSamples());
|
||||
assertEquals(counterBefore + 1, counter.value());
|
||||
assertEquals(statBefore + 1, failureStat.getSamples());
|
||||
assertTrue(failureStat.getSum() >= errorSleepMillis);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user