HADOOP-18222. Prevent DelegationTokenSecretManagerMetrics from registering multiple times
Fixes #4266 Signed-off-by: Owen O'Malley <oomalley@linkedin.com>
This commit is contained in:
parent
277daca91f
commit
1e043b937a
@ -65,6 +65,12 @@ class AbstractDelegationTokenSecretManager<TokenIdent
|
|||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(AbstractDelegationTokenSecretManager.class);
|
.getLogger(AbstractDelegationTokenSecretManager.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metrics to track token management operations.
|
||||||
|
*/
|
||||||
|
private static final DelegationTokenSecretManagerMetrics METRICS
|
||||||
|
= DelegationTokenSecretManagerMetrics.create();
|
||||||
|
|
||||||
private String formatTokenId(TokenIdent id) {
|
private String formatTokenId(TokenIdent id) {
|
||||||
return "(" + id + ")";
|
return "(" + id + ")";
|
||||||
}
|
}
|
||||||
@ -96,10 +102,6 @@ private String formatTokenId(TokenIdent id) {
|
|||||||
* Access to currentKey is protected by this object lock
|
* Access to currentKey is protected by this object lock
|
||||||
*/
|
*/
|
||||||
private DelegationKey currentKey;
|
private DelegationKey currentKey;
|
||||||
/**
|
|
||||||
* Metrics to track token management operations.
|
|
||||||
*/
|
|
||||||
private DelegationTokenSecretManagerMetrics metrics;
|
|
||||||
|
|
||||||
private long keyUpdateInterval;
|
private long keyUpdateInterval;
|
||||||
private long tokenMaxLifetime;
|
private long tokenMaxLifetime;
|
||||||
@ -138,7 +140,6 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
|||||||
this.tokenRenewInterval = delegationTokenRenewInterval;
|
this.tokenRenewInterval = delegationTokenRenewInterval;
|
||||||
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
|
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
|
||||||
this.storeTokenTrackingId = false;
|
this.storeTokenTrackingId = false;
|
||||||
this.metrics = DelegationTokenSecretManagerMetrics.create();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** should be called before this object is used */
|
/** should be called before this object is used */
|
||||||
@ -433,7 +434,7 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
|
|||||||
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
|
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
|
||||||
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
|
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
|
||||||
try {
|
try {
|
||||||
metrics.trackStoreToken(() -> storeToken(identifier, tokenInfo));
|
METRICS.trackStoreToken(() -> storeToken(identifier, tokenInfo));
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
|
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
|
||||||
ioe);
|
ioe);
|
||||||
@ -558,7 +559,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
|
|||||||
throw new InvalidToken("Renewal request for unknown token "
|
throw new InvalidToken("Renewal request for unknown token "
|
||||||
+ formatTokenId(id));
|
+ formatTokenId(id));
|
||||||
}
|
}
|
||||||
metrics.trackUpdateToken(() -> updateToken(id, info));
|
METRICS.trackUpdateToken(() -> updateToken(id, info));
|
||||||
return renewTime;
|
return renewTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -594,7 +595,7 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
|||||||
if (info == null) {
|
if (info == null) {
|
||||||
throw new InvalidToken("Token not found " + formatTokenId(id));
|
throw new InvalidToken("Token not found " + formatTokenId(id));
|
||||||
}
|
}
|
||||||
metrics.trackRemoveToken(() -> {
|
METRICS.trackRemoveToken(() -> {
|
||||||
removeStoredToken(id);
|
removeStoredToken(id);
|
||||||
});
|
});
|
||||||
return id;
|
return id;
|
||||||
@ -745,7 +746,7 @@ public TokenIdent decodeTokenIdentifier(Token<TokenIdent> token) throws IOExcept
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected DelegationTokenSecretManagerMetrics getMetrics() {
|
protected DelegationTokenSecretManagerMetrics getMetrics() {
|
||||||
return metrics;
|
return METRICS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
|
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
|
||||||
import org.apache.hadoop.fs.statistics.MeanStatistic;
|
import org.apache.hadoop.fs.statistics.MeanStatistic;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
@ -635,6 +636,23 @@ public void testEmptyToken() throws IOException {
|
|||||||
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
|
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleDelegationTokenSecretManagerMetrics() {
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager1 =
|
||||||
|
new TestDelegationTokenSecretManager(0, 0, 0, 0);
|
||||||
|
assertNotNull(dtSecretManager1.getMetrics());
|
||||||
|
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager2 =
|
||||||
|
new TestDelegationTokenSecretManager(0, 0, 0, 0);
|
||||||
|
assertNotNull(dtSecretManager2.getMetrics());
|
||||||
|
|
||||||
|
DefaultMetricsSystem.instance().init("test");
|
||||||
|
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager3 =
|
||||||
|
new TestDelegationTokenSecretManager(0, 0, 0, 0);
|
||||||
|
assertNotNull(dtSecretManager3.getMetrics());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokenSecretManagerMetrics() throws Exception {
|
public void testDelegationTokenSecretManagerMetrics() throws Exception {
|
||||||
TestDelegationTokenSecretManager dtSecretManager =
|
TestDelegationTokenSecretManager dtSecretManager =
|
||||||
@ -645,13 +663,13 @@ public void testDelegationTokenSecretManagerMetrics() throws Exception {
|
|||||||
|
|
||||||
final Token<TestDelegationTokenIdentifier> token = callAndValidateMetrics(
|
final Token<TestDelegationTokenIdentifier> token = callAndValidateMetrics(
|
||||||
dtSecretManager, dtSecretManager.getMetrics().getStoreToken(), "storeToken",
|
dtSecretManager, dtSecretManager.getMetrics().getStoreToken(), "storeToken",
|
||||||
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"), 1);
|
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"));
|
||||||
|
|
||||||
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getUpdateToken(),
|
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getUpdateToken(),
|
||||||
"updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"), 1);
|
"updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"));
|
||||||
|
|
||||||
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getRemoveToken(),
|
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getRemoveToken(),
|
||||||
"removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"), 1);
|
"removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"));
|
||||||
} finally {
|
} finally {
|
||||||
dtSecretManager.stopThreads();
|
dtSecretManager.stopThreads();
|
||||||
}
|
}
|
||||||
@ -671,14 +689,14 @@ public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
|
|||||||
|
|
||||||
dtSecretManager.setThrowError(true);
|
dtSecretManager.setThrowError(true);
|
||||||
|
|
||||||
callAndValidateFailureMetrics(dtSecretManager, "storeToken", 1, 1, false,
|
callAndValidateFailureMetrics(dtSecretManager, "storeToken", false,
|
||||||
errorSleepMillis,
|
errorSleepMillis,
|
||||||
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"));
|
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"));
|
||||||
|
|
||||||
callAndValidateFailureMetrics(dtSecretManager, "updateToken", 1, 2, true,
|
callAndValidateFailureMetrics(dtSecretManager, "updateToken", true,
|
||||||
errorSleepMillis, () -> dtSecretManager.renewToken(token, "JobTracker"));
|
errorSleepMillis, () -> dtSecretManager.renewToken(token, "JobTracker"));
|
||||||
|
|
||||||
callAndValidateFailureMetrics(dtSecretManager, "removeToken", 1, 3, true,
|
callAndValidateFailureMetrics(dtSecretManager, "removeToken", true,
|
||||||
errorSleepMillis, () -> dtSecretManager.cancelToken(token, "JobTracker"));
|
errorSleepMillis, () -> dtSecretManager.cancelToken(token, "JobTracker"));
|
||||||
} finally {
|
} finally {
|
||||||
dtSecretManager.stopThreads();
|
dtSecretManager.stopThreads();
|
||||||
@ -686,33 +704,33 @@ public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private <T> T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
private <T> T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
||||||
MutableRate metric, String statName, Callable<T> callable, int expectedCount)
|
MutableRate metric, String statName, Callable<T> callable)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
MeanStatistic stat = IOStatisticAssertions.lookupMeanStatistic(
|
MeanStatistic stat = IOStatisticAssertions.lookupMeanStatistic(
|
||||||
dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean");
|
dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean");
|
||||||
assertEquals(expectedCount - 1, metric.lastStat().numSamples());
|
long metricBefore = metric.lastStat().numSamples();
|
||||||
assertEquals(expectedCount - 1, stat.getSamples());
|
long statBefore = stat.getSamples();
|
||||||
T returnedObject = callable.call();
|
T returnedObject = callable.call();
|
||||||
assertEquals(expectedCount, metric.lastStat().numSamples());
|
assertEquals(metricBefore + 1, metric.lastStat().numSamples());
|
||||||
assertEquals(expectedCount, stat.getSamples());
|
assertEquals(statBefore + 1, stat.getSamples());
|
||||||
return returnedObject;
|
return returnedObject;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
private <T> void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
||||||
String statName, int expectedStatCount, int expectedMetricCount, boolean expectError,
|
String statName, boolean expectError, int errorSleepMillis, Callable<T> callable)
|
||||||
int errorSleepMillis, Callable<T> callable) throws Exception {
|
throws Exception {
|
||||||
MutableCounterLong counter = dtSecretManager.getMetrics().getTokenFailure();
|
MutableCounterLong counter = dtSecretManager.getMetrics().getTokenFailure();
|
||||||
MeanStatistic failureStat = IOStatisticAssertions.lookupMeanStatistic(
|
MeanStatistic failureStat = IOStatisticAssertions.lookupMeanStatistic(
|
||||||
dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean");
|
dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean");
|
||||||
assertEquals(expectedMetricCount - 1, counter.value());
|
long counterBefore = counter.value();
|
||||||
assertEquals(expectedStatCount - 1, failureStat.getSamples());
|
long statBefore = failureStat.getSamples();
|
||||||
if (expectError) {
|
if (expectError) {
|
||||||
LambdaTestUtils.intercept(IOException.class, callable);
|
LambdaTestUtils.intercept(IOException.class, callable);
|
||||||
} else {
|
} else {
|
||||||
callable.call();
|
callable.call();
|
||||||
}
|
}
|
||||||
assertEquals(expectedMetricCount, counter.value());
|
assertEquals(counterBefore + 1, counter.value());
|
||||||
assertEquals(expectedStatCount, failureStat.getSamples());
|
assertEquals(statBefore + 1, failureStat.getSamples());
|
||||||
assertTrue(failureStat.getSum() >= errorSleepMillis);
|
assertTrue(failureStat.getSum() >= errorSleepMillis);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user