HADOOP-18167. Add metrics to track delegation token secret manager op… (#4092)
* HADOOP-18167. Add metrics to track delegation token secret manager operations
This commit is contained in:
parent
f1e5f8e764
commit
d60262fe00
@ -36,7 +36,17 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||||
|
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
|
||||||
|
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
|
||||||
|
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
|
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
|
||||||
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
|
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
@ -47,6 +57,7 @@
|
|||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import org.apache.hadoop.util.Preconditions;
|
import org.apache.hadoop.util.Preconditions;
|
||||||
|
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -96,6 +107,10 @@ private String formatTokenId(TokenIdent id) {
|
|||||||
* Access to currentKey is protected by this object lock
|
* Access to currentKey is protected by this object lock
|
||||||
*/
|
*/
|
||||||
private DelegationKey currentKey;
|
private DelegationKey currentKey;
|
||||||
|
/**
|
||||||
|
* Metrics to track token management operations.
|
||||||
|
*/
|
||||||
|
private DelegationTokenSecretManagerMetrics metrics;
|
||||||
|
|
||||||
private long keyUpdateInterval;
|
private long keyUpdateInterval;
|
||||||
private long tokenMaxLifetime;
|
private long tokenMaxLifetime;
|
||||||
@ -134,6 +149,7 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
|||||||
this.tokenRenewInterval = delegationTokenRenewInterval;
|
this.tokenRenewInterval = delegationTokenRenewInterval;
|
||||||
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
|
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
|
||||||
this.storeTokenTrackingId = false;
|
this.storeTokenTrackingId = false;
|
||||||
|
this.metrics = DelegationTokenSecretManagerMetrics.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** should be called before this object is used */
|
/** should be called before this object is used */
|
||||||
@ -430,14 +446,14 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
|
|||||||
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
|
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
|
||||||
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
|
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
|
||||||
try {
|
try {
|
||||||
storeToken(identifier, tokenInfo);
|
metrics.trackStoreToken(() -> storeToken(identifier, tokenInfo));
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
|
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
|
||||||
ioe);
|
ioe);
|
||||||
}
|
}
|
||||||
return password;
|
return password;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -555,7 +571,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
|
|||||||
throw new InvalidToken("Renewal request for unknown token "
|
throw new InvalidToken("Renewal request for unknown token "
|
||||||
+ formatTokenId(id));
|
+ formatTokenId(id));
|
||||||
}
|
}
|
||||||
updateToken(id, info);
|
metrics.trackUpdateToken(() -> updateToken(id, info));
|
||||||
return renewTime;
|
return renewTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -591,8 +607,10 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
|||||||
if (info == null) {
|
if (info == null) {
|
||||||
throw new InvalidToken("Token not found " + formatTokenId(id));
|
throw new InvalidToken("Token not found " + formatTokenId(id));
|
||||||
}
|
}
|
||||||
removeTokenForOwnerStats(id);
|
metrics.trackRemoveToken(() -> {
|
||||||
removeStoredToken(id);
|
removeTokenForOwnerStats(id);
|
||||||
|
removeStoredToken(id);
|
||||||
|
});
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -825,4 +843,97 @@ protected void syncTokenOwnerStats() {
|
|||||||
addTokenForOwnerStats(id);
|
addTokenForOwnerStats(id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected DelegationTokenSecretManagerMetrics getMetrics() {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DelegationTokenSecretManagerMetrics tracks token management operations
|
||||||
|
* and publishes them through the metrics interfaces.
|
||||||
|
*/
|
||||||
|
@Metrics(about="Delegation token secret manager metrics", context="token")
|
||||||
|
static class DelegationTokenSecretManagerMetrics implements DurationTrackerFactory {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
DelegationTokenSecretManagerMetrics.class);
|
||||||
|
|
||||||
|
final static String STORE_TOKEN_STAT = "storeToken";
|
||||||
|
final static String UPDATE_TOKEN_STAT = "updateToken";
|
||||||
|
final static String REMOVE_TOKEN_STAT = "removeToken";
|
||||||
|
final static String TOKEN_FAILURE_STAT = "tokenFailure";
|
||||||
|
|
||||||
|
private final MetricsRegistry registry;
|
||||||
|
private final IOStatisticsStore ioStatistics;
|
||||||
|
|
||||||
|
@Metric("Rate of storage of delegation tokens and latency (milliseconds)")
|
||||||
|
private MutableRate storeToken;
|
||||||
|
@Metric("Rate of update of delegation tokens and latency (milliseconds)")
|
||||||
|
private MutableRate updateToken;
|
||||||
|
@Metric("Rate of removal of delegation tokens and latency (milliseconds)")
|
||||||
|
private MutableRate removeToken;
|
||||||
|
@Metric("Counter of delegation tokens operation failures")
|
||||||
|
private MutableCounterLong tokenFailure;
|
||||||
|
|
||||||
|
static DelegationTokenSecretManagerMetrics create() {
|
||||||
|
return DefaultMetricsSystem.instance().register(new DelegationTokenSecretManagerMetrics());
|
||||||
|
}
|
||||||
|
|
||||||
|
DelegationTokenSecretManagerMetrics() {
|
||||||
|
ioStatistics = IOStatisticsBinding.iostatisticsStore()
|
||||||
|
.withDurationTracking(STORE_TOKEN_STAT, UPDATE_TOKEN_STAT, REMOVE_TOKEN_STAT)
|
||||||
|
.withCounters(TOKEN_FAILURE_STAT)
|
||||||
|
.build();
|
||||||
|
registry = new MetricsRegistry("DelegationTokenSecretManagerMetrics");
|
||||||
|
LOG.debug("Initialized {}", registry);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void trackStoreToken(InvocationRaisingIOE invocation) throws IOException {
|
||||||
|
trackInvocation(invocation, STORE_TOKEN_STAT, storeToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void trackUpdateToken(InvocationRaisingIOE invocation) throws IOException {
|
||||||
|
trackInvocation(invocation, UPDATE_TOKEN_STAT, updateToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void trackRemoveToken(InvocationRaisingIOE invocation) throws IOException {
|
||||||
|
trackInvocation(invocation, REMOVE_TOKEN_STAT, removeToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void trackInvocation(InvocationRaisingIOE invocation, String statistic,
|
||||||
|
MutableRate metric) throws IOException {
|
||||||
|
try {
|
||||||
|
long start = Time.monotonicNow();
|
||||||
|
IOStatisticsBinding.trackDurationOfInvocation(this, statistic, invocation);
|
||||||
|
metric.add(Time.monotonicNow() - start);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
tokenFailure.incr();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DurationTracker trackDuration(String key, long count) {
|
||||||
|
return ioStatistics.trackDuration(key, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MutableRate getStoreToken() {
|
||||||
|
return storeToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MutableRate getUpdateToken() {
|
||||||
|
return updateToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MutableRate getRemoveToken() {
|
||||||
|
return removeToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MutableCounterLong getTokenFailure() {
|
||||||
|
return tokenFailure;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected IOStatisticsStore getIoStatistics() {
|
||||||
|
return ioStatistics;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,12 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
|
||||||
|
import org.apache.hadoop.fs.statistics.MeanStatistic;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
@ -155,6 +161,55 @@ public DelegationKey getKey(TestDelegationTokenIdentifier id) {
|
|||||||
return allKeys.get(id.getMasterKeyId());
|
return allKeys.get(id.getMasterKeyId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestFailureDelegationTokenSecretManager
|
||||||
|
extends TestDelegationTokenSecretManager {
|
||||||
|
private boolean throwError = false;
|
||||||
|
private long errorSleepMillis;
|
||||||
|
|
||||||
|
public TestFailureDelegationTokenSecretManager(long errorSleepMillis) {
|
||||||
|
super(24*60*60*1000, 10*1000, 1*1000, 60*60*1000);
|
||||||
|
this.errorSleepMillis = errorSleepMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setThrowError(boolean throwError) {
|
||||||
|
this.throwError = throwError;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sleepAndThrow() throws IOException {
|
||||||
|
try {
|
||||||
|
Thread.sleep(errorSleepMillis);
|
||||||
|
throw new IOException("Test exception");
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void storeNewToken(TestDelegationTokenIdentifier ident, long renewDate)
|
||||||
|
throws IOException {
|
||||||
|
if (throwError) {
|
||||||
|
sleepAndThrow();
|
||||||
|
}
|
||||||
|
super.storeNewToken(ident, renewDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void removeStoredToken(TestDelegationTokenIdentifier ident) throws IOException {
|
||||||
|
if (throwError) {
|
||||||
|
sleepAndThrow();
|
||||||
|
}
|
||||||
|
super.removeStoredToken(ident);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void updateStoredToken(TestDelegationTokenIdentifier ident, long renewDate)
|
||||||
|
throws IOException {
|
||||||
|
if (throwError) {
|
||||||
|
sleepAndThrow();
|
||||||
|
}
|
||||||
|
super.updateStoredToken(ident, renewDate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class TokenSelector extends
|
public static class TokenSelector extends
|
||||||
AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>{
|
AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>{
|
||||||
@ -579,4 +634,85 @@ public void testEmptyToken() throws IOException {
|
|||||||
assertEquals(token1, token2);
|
assertEquals(token1, token2);
|
||||||
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
|
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenSecretManagerMetrics() throws Exception {
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager =
|
||||||
|
new TestDelegationTokenSecretManager(24*60*60*1000,
|
||||||
|
10*1000, 1*1000, 60*60*1000);
|
||||||
|
try {
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
|
||||||
|
final Token<TestDelegationTokenIdentifier> token = callAndValidateMetrics(
|
||||||
|
dtSecretManager, dtSecretManager.getMetrics().getStoreToken(), "storeToken",
|
||||||
|
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"), 1);
|
||||||
|
|
||||||
|
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getUpdateToken(),
|
||||||
|
"updateToken", () -> dtSecretManager.renewToken(token, "JobTracker"), 1);
|
||||||
|
|
||||||
|
callAndValidateMetrics(dtSecretManager, dtSecretManager.getMetrics().getRemoveToken(),
|
||||||
|
"removeToken", () -> dtSecretManager.cancelToken(token, "JobTracker"), 1);
|
||||||
|
} finally {
|
||||||
|
dtSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
|
||||||
|
int errorSleepMillis = 200;
|
||||||
|
TestFailureDelegationTokenSecretManager dtSecretManager =
|
||||||
|
new TestFailureDelegationTokenSecretManager(errorSleepMillis);
|
||||||
|
|
||||||
|
try {
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
|
||||||
|
final Token<TestDelegationTokenIdentifier> token =
|
||||||
|
generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker");
|
||||||
|
|
||||||
|
dtSecretManager.setThrowError(true);
|
||||||
|
|
||||||
|
callAndValidateFailureMetrics(dtSecretManager, "storeToken", 1, 1, false,
|
||||||
|
errorSleepMillis,
|
||||||
|
() -> generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker"));
|
||||||
|
|
||||||
|
callAndValidateFailureMetrics(dtSecretManager, "updateToken", 1, 2, true,
|
||||||
|
errorSleepMillis, () -> dtSecretManager.renewToken(token, "JobTracker"));
|
||||||
|
|
||||||
|
callAndValidateFailureMetrics(dtSecretManager, "removeToken", 1, 3, true,
|
||||||
|
errorSleepMillis, () -> dtSecretManager.cancelToken(token, "JobTracker"));
|
||||||
|
} finally {
|
||||||
|
dtSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
||||||
|
MutableRate metric, String statName, Callable<T> callable, int expectedCount)
|
||||||
|
throws Exception {
|
||||||
|
MeanStatistic stat = IOStatisticAssertions.lookupMeanStatistic(
|
||||||
|
dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean");
|
||||||
|
assertEquals(expectedCount - 1, metric.lastStat().numSamples());
|
||||||
|
assertEquals(expectedCount - 1, stat.getSamples());
|
||||||
|
T returnedObject = callable.call();
|
||||||
|
assertEquals(expectedCount, metric.lastStat().numSamples());
|
||||||
|
assertEquals(expectedCount, stat.getSamples());
|
||||||
|
return returnedObject;
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager,
|
||||||
|
String statName, int expectedStatCount, int expectedMetricCount, boolean expectError,
|
||||||
|
int errorSleepMillis, Callable<T> callable) throws Exception {
|
||||||
|
MutableCounterLong counter = dtSecretManager.getMetrics().getTokenFailure();
|
||||||
|
MeanStatistic failureStat = IOStatisticAssertions.lookupMeanStatistic(
|
||||||
|
dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean");
|
||||||
|
assertEquals(expectedMetricCount - 1, counter.value());
|
||||||
|
assertEquals(expectedStatCount - 1, failureStat.getSamples());
|
||||||
|
if (expectError) {
|
||||||
|
LambdaTestUtils.intercept(IOException.class, callable);
|
||||||
|
} else {
|
||||||
|
callable.call();
|
||||||
|
}
|
||||||
|
assertEquals(expectedMetricCount, counter.value());
|
||||||
|
assertEquals(expectedStatCount, failureStat.getSamples());
|
||||||
|
assertTrue(failureStat.getSum() >= errorSleepMillis);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user