HDFS-17148. RBF: SQLDelegationTokenSecretManager must cleanup expired tokens in SQL (#5936)
This commit is contained in:
parent
59f3a16819
commit
ad2f45c64f
@ -190,6 +190,14 @@ public long getCurrentTokensSize() {
|
||||
return currentTokens.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Interval for tokens to be renewed.
|
||||
* @return Renew interval in milliseconds.
|
||||
*/
|
||||
protected long getTokenRenewInterval() {
|
||||
return this.tokenRenewInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a previously used master key to cache (when NN restarts),
|
||||
* should be called before activate().
|
||||
@ -751,7 +759,7 @@ private void removeExpiredToken() throws IOException {
|
||||
Set<TokenIdent> expiredTokens = new HashSet<>();
|
||||
synchronized (this) {
|
||||
Iterator<Map.Entry<TokenIdent, DelegationTokenInformation>> i =
|
||||
currentTokens.entrySet().iterator();
|
||||
getCandidateTokensForCleanup().entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Map.Entry<TokenIdent, DelegationTokenInformation> entry = i.next();
|
||||
long renewDate = entry.getValue().getRenewDate();
|
||||
@ -766,6 +774,10 @@ private void removeExpiredToken() throws IOException {
|
||||
logExpireTokens(expiredTokens);
|
||||
}
|
||||
|
||||
protected Map<TokenIdent, DelegationTokenInformation> getCandidateTokensForCleanup() {
|
||||
return this.currentTokens;
|
||||
}
|
||||
|
||||
protected void logExpireTokens(
|
||||
Collection<TokenIdent> expiredTokens) throws IOException {
|
||||
for (TokenIdent ident : expiredTokens) {
|
||||
|
@ -24,6 +24,8 @@
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
@ -50,6 +52,9 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
|
||||
private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX
|
||||
+ "token.seqnum.batch.size";
|
||||
public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
|
||||
public static final String SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS = SQL_DTSM_CONF_PREFIX
|
||||
+ "token.max.cleanup.results";
|
||||
public static final int SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS_DEFAULT = 1000;
|
||||
public static final String SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION = SQL_DTSM_CONF_PREFIX
|
||||
+ "token.loading.cache.expiration";
|
||||
public static final long SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT =
|
||||
@ -63,6 +68,9 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
|
||||
// exhausted, including during initialization.
|
||||
private final int seqNumBatchSize;
|
||||
|
||||
// Number of tokens to obtain from SQL during the cleanup process.
|
||||
private final int maxTokenCleanupResults;
|
||||
|
||||
// Last sequenceNum in the current batch that has been allocated to a token.
|
||||
private int currentSeqNum;
|
||||
|
||||
@ -82,6 +90,8 @@ public SQLDelegationTokenSecretManager(Configuration conf) {
|
||||
|
||||
this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
|
||||
DEFAULT_SEQ_NUM_BATCH_SIZE);
|
||||
this.maxTokenCleanupResults = conf.getInt(SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS,
|
||||
SQL_DTSM_TOKEN_MAX_CLEANUP_RESULTS_DEFAULT);
|
||||
|
||||
long cacheExpirationMs = conf.getTimeDuration(SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION,
|
||||
SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
@ -153,6 +163,39 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
||||
return super.cancelToken(token, canceller);
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a list of tokens that will be considered for cleanup, based on the last
|
||||
* time the token was updated in SQL. This list may include tokens that are not
|
||||
* expired and should not be deleted (e.g. if the token was last renewed using a
|
||||
* higher renewal interval).
|
||||
* The number of results is limited to reduce performance impact. Some level of
|
||||
* contention is expected when multiple routers run cleanup simultaneously.
|
||||
* @return Map of tokens that have not been updated in SQL after the token renewal
|
||||
* period.
|
||||
*/
|
||||
@Override
|
||||
protected Map<TokenIdent, DelegationTokenInformation> getCandidateTokensForCleanup() {
|
||||
Map<TokenIdent, DelegationTokenInformation> tokens = new HashMap<>();
|
||||
try {
|
||||
// Query SQL for tokens that haven't been updated after
|
||||
// the last token renewal period.
|
||||
long maxModifiedTime = Time.now() - getTokenRenewInterval();
|
||||
Map<byte[], byte[]> tokenInfoBytesList = selectStaleTokenInfos(maxModifiedTime,
|
||||
this.maxTokenCleanupResults);
|
||||
|
||||
LOG.info("Found {} tokens for cleanup", tokenInfoBytesList.size());
|
||||
for (Map.Entry<byte[], byte[]> tokenInfoBytes : tokenInfoBytesList.entrySet()) {
|
||||
TokenIdent tokenIdent = createTokenIdent(tokenInfoBytes.getKey());
|
||||
DelegationTokenInformation tokenInfo = createTokenInfo(tokenInfoBytes.getValue());
|
||||
tokens.put(tokenIdent, tokenInfo);
|
||||
}
|
||||
} catch (IOException | SQLException e) {
|
||||
LOG.error("Failed to get candidate tokens for cleanup in SQL secret manager", e);
|
||||
}
|
||||
|
||||
return tokens;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the existing TokenInformation from the SQL database to
|
||||
* invalidate it.
|
||||
@ -415,6 +458,8 @@ public int incrementCurrentKeyId() {
|
||||
// Token operations in SQL database
|
||||
protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier)
|
||||
throws SQLException;
|
||||
protected abstract Map<byte[], byte[]> selectStaleTokenInfos(long maxModifiedTime,
|
||||
int maxResults) throws SQLException;
|
||||
protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
|
||||
throws SQLException;
|
||||
protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
|
||||
|
@ -23,6 +23,9 @@
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
@ -150,6 +153,27 @@ protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<byte[], byte[]> selectStaleTokenInfos(long maxModifiedTime, int maxResults)
|
||||
throws SQLException {
|
||||
return retryHandler.execute(() -> {
|
||||
try (Connection connection = connectionFactory.getConnection();
|
||||
PreparedStatement statement = connection.prepareStatement(
|
||||
"SELECT tokenIdentifier, tokenInfo FROM Tokens WHERE modifiedTime < ?")) {
|
||||
statement.setTimestamp(1, new Timestamp(maxModifiedTime));
|
||||
statement.setMaxRows(maxResults);
|
||||
try (ResultSet result = statement.executeQuery()) {
|
||||
Map<byte[], byte[]> results = new HashMap<>();
|
||||
while (result.next()) {
|
||||
results.put(result.getBytes("tokenIdentifier"),
|
||||
result.getBytes("tokenInfo"));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
|
||||
retryHandler.execute(() -> {
|
||||
|
@ -30,12 +30,15 @@
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
@ -52,6 +55,7 @@ public class TestSQLDelegationTokenSecretManagerImpl {
|
||||
private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
|
||||
private static final int TEST_MAX_RETRIES = 3;
|
||||
private static final int TOKEN_EXPIRATION_SECONDS = 1;
|
||||
private static final int TOKEN_EXPIRATION_SCAN_SECONDS = 1;
|
||||
private static Configuration conf;
|
||||
|
||||
@Before
|
||||
@ -75,6 +79,7 @@ public static void initDatabase() throws SQLException {
|
||||
conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
|
||||
conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES);
|
||||
conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10);
|
||||
conf.setInt(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, TOKEN_EXPIRATION_SCAN_SECONDS);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@ -188,6 +193,62 @@ public void testRenewToken() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveExpiredTokens() throws Exception {
|
||||
DelegationTokenManager tokenManager = createTokenManager(getShortLivedTokenConf());
|
||||
|
||||
try {
|
||||
TestDelegationTokenSecretManager secretManager =
|
||||
(TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
|
||||
|
||||
// Create token to be constantly renewed.
|
||||
Token token1 = tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
|
||||
AbstractDelegationTokenIdentifier tokenId1 =
|
||||
(AbstractDelegationTokenIdentifier) token1.decodeIdentifier();
|
||||
|
||||
// Create token expected to expire soon.
|
||||
long expirationTime2 = Time.now();
|
||||
AbstractDelegationTokenIdentifier tokenId2 = storeToken(secretManager, 2, expirationTime2);
|
||||
|
||||
// Create token not expected to expire soon.
|
||||
long expirationTime3 = Time.now() + TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 10;
|
||||
AbstractDelegationTokenIdentifier tokenId3 = storeToken(secretManager, 3, expirationTime3);
|
||||
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
// Constantly renew token so it doesn't expire.
|
||||
tokenManager.renewToken(token1, "foo");
|
||||
|
||||
// Wait for cleanup to happen so expired token is deleted from SQL.
|
||||
return !isTokenInSQL(secretManager, tokenId2);
|
||||
} catch (IOException | SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}, 100, 6000);
|
||||
|
||||
Assert.assertTrue("Renewed token must not be cleaned up",
|
||||
isTokenInSQL(secretManager, tokenId1));
|
||||
Assert.assertTrue("Token with future expiration must not be cleaned up",
|
||||
isTokenInSQL(secretManager, tokenId3));
|
||||
} finally {
|
||||
stopTokenManager(tokenManager);
|
||||
}
|
||||
}
|
||||
|
||||
private AbstractDelegationTokenIdentifier storeToken(
|
||||
TestDelegationTokenSecretManager secretManager, int sequenceNum, long expirationTime)
|
||||
throws IOException {
|
||||
AbstractDelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(new Text("Test"));
|
||||
tokenId.setOwner(new Text("foo"));
|
||||
tokenId.setSequenceNumber(sequenceNum);
|
||||
|
||||
AbstractDelegationTokenSecretManager.DelegationTokenInformation tokenInfo =
|
||||
new AbstractDelegationTokenSecretManager.DelegationTokenInformation(expirationTime, null);
|
||||
secretManager.storeToken(tokenId, tokenInfo);
|
||||
|
||||
return tokenId;
|
||||
}
|
||||
|
||||
private Configuration getShortLivedTokenConf() {
|
||||
Configuration shortLivedConf = new Configuration(conf);
|
||||
shortLivedConf.setTimeDuration(
|
||||
@ -201,13 +262,12 @@ private void callRemoveExpiredTokensAndValidateSQL(
|
||||
TestDelegationTokenSecretManager secretManager, AbstractDelegationTokenIdentifier tokenId,
|
||||
boolean expectedInSQL) throws SQLException {
|
||||
secretManager.removeExpiredStoredToken(tokenId);
|
||||
byte[] tokenInfo = secretManager.selectTokenInfo(tokenId.getSequenceNumber(),
|
||||
tokenId.getBytes());
|
||||
if (expectedInSQL) {
|
||||
Assert.assertNotNull("Verify token exists in database", tokenInfo);
|
||||
} else {
|
||||
Assert.assertNull("Verify token was removed from database", tokenInfo);
|
||||
}
|
||||
Assert.assertEquals(expectedInSQL, isTokenInSQL(secretManager, tokenId));
|
||||
}
|
||||
|
||||
private boolean isTokenInSQL(TestDelegationTokenSecretManager secretManager,
|
||||
AbstractDelegationTokenIdentifier tokenId) throws SQLException {
|
||||
return secretManager.selectTokenInfo(tokenId.getSequenceNumber(), tokenId.getBytes()) != null;
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -542,6 +602,11 @@ public void removeExpiredStoredToken(TokenIdentifier tokenId) {
|
||||
super.removeExpiredStoredToken((AbstractDelegationTokenIdentifier) tokenId);
|
||||
}
|
||||
|
||||
public void storeToken(AbstractDelegationTokenIdentifier ident,
|
||||
DelegationTokenInformation tokenInfo) throws IOException {
|
||||
super.storeToken(ident, tokenInfo);
|
||||
}
|
||||
|
||||
public void setReadOnly(boolean readOnly) {
|
||||
((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user