From ad2f45c64f01a520a01f6876d8493140b9f89b03 Mon Sep 17 00:00:00 2001 From: hchaverri <55413673+hchaverri@users.noreply.github.com> Date: Fri, 11 Aug 2023 13:04:32 -0700 Subject: [PATCH] HDFS-17148. RBF: SQLDelegationTokenSecretManager must cleanup expired tokens in SQL (#5936) --- .../AbstractDelegationTokenSecretManager.java | 14 +++- .../SQLDelegationTokenSecretManager.java | 45 +++++++++++ .../SQLDelegationTokenSecretManagerImpl.java | 24 ++++++ ...stSQLDelegationTokenSecretManagerImpl.java | 79 +++++++++++++++++-- 4 files changed, 154 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index cf5e9a6631..283e773c81 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -190,6 +190,14 @@ public long getCurrentTokensSize() { return currentTokens.size(); } + /** + * Interval for tokens to be renewed. + * @return Renew interval in milliseconds. + */ + protected long getTokenRenewInterval() { + return this.tokenRenewInterval; + } + /** * Add a previously used master key to cache (when NN restarts), * should be called before activate(). @@ -751,7 +759,7 @@ private void removeExpiredToken() throws IOException { Set expiredTokens = new HashSet<>(); synchronized (this) { Iterator> i = - currentTokens.entrySet().iterator(); + getCandidateTokensForCleanup().entrySet().iterator(); while (i.hasNext()) { Map.Entry entry = i.next(); long renewDate = entry.getValue().getRenewDate(); @@ -766,6 +774,10 @@ private void removeExpiredToken() throws IOException { logExpireTokens(expiredTokens); } + protected Map getCandidateTokensForCleanup() { + return this.currentTokens; + } + protected void logExpireTokens( Collection expiredTokens) throws IOException { for (TokenIdent ident : expiredTokens) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java index 75f00d3f92..d2c41f31d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java @@ -24,6 +24,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; @@ -50,6 +52,9 @@ public abstract class SQLDelegationTokenSecretManager token, return super.cancelToken(token, canceller); } + /** + * Obtain a list of tokens that will be considered for cleanup, based on the last + * time the token was updated in SQL. This list may include tokens that are not + * expired and should not be deleted (e.g. if the token was last renewed using a + * higher renewal interval). + * The number of results is limited to reduce performance impact. Some level of + * contention is expected when multiple routers run cleanup simultaneously. + * @return Map of tokens that have not been updated in SQL after the token renewal + * period. + */ + @Override + protected Map getCandidateTokensForCleanup() { + Map tokens = new HashMap<>(); + try { + // Query SQL for tokens that haven't been updated after + // the last token renewal period. + long maxModifiedTime = Time.now() - getTokenRenewInterval(); + Map tokenInfoBytesList = selectStaleTokenInfos(maxModifiedTime, + this.maxTokenCleanupResults); + + LOG.info("Found {} tokens for cleanup", tokenInfoBytesList.size()); + for (Map.Entry tokenInfoBytes : tokenInfoBytesList.entrySet()) { + TokenIdent tokenIdent = createTokenIdent(tokenInfoBytes.getKey()); + DelegationTokenInformation tokenInfo = createTokenInfo(tokenInfoBytes.getValue()); + tokens.put(tokenIdent, tokenInfo); + } + } catch (IOException | SQLException e) { + LOG.error("Failed to get candidate tokens for cleanup in SQL secret manager", e); + } + + return tokens; + } + /** * Removes the existing TokenInformation from the SQL database to * invalidate it. @@ -415,6 +458,8 @@ public int incrementCurrentKeyId() { // Token operations in SQL database protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException; + protected abstract Map selectStaleTokenInfos(long maxModifiedTime, + int maxResults) throws SQLException; protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) throws SQLException; protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java index 7da54778f3..e85baae0c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java @@ -23,6 +23,9 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -150,6 +153,27 @@ protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws }); } + @Override + protected Map selectStaleTokenInfos(long maxModifiedTime, int maxResults) + throws SQLException { + return retryHandler.execute(() -> { + try (Connection connection = connectionFactory.getConnection(); + PreparedStatement statement = connection.prepareStatement( + "SELECT tokenIdentifier, tokenInfo FROM Tokens WHERE modifiedTime < ?")) { + statement.setTimestamp(1, new Timestamp(maxModifiedTime)); + statement.setMaxRows(maxResults); + try (ResultSet result = statement.executeQuery()) { + Map results = new HashMap<>(); + while (result.next()) { + results.put(result.getBytes("tokenIdentifier"), + result.getBytes("tokenInfo")); + } + return results; + } + } + }); + } + @Override protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException { retryHandler.execute(() -> { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java index dbbb85662c..d82be8f5cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java @@ -30,12 +30,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; @@ -52,6 +55,7 @@ public class TestSQLDelegationTokenSecretManagerImpl { private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore"; private static final int TEST_MAX_RETRIES = 3; private static final int TOKEN_EXPIRATION_SECONDS = 1; + private static final int TOKEN_EXPIRATION_SCAN_SECONDS = 1; private static Configuration conf; @Before @@ -75,6 +79,7 @@ public static void initDatabase() throws SQLException { conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES); conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10); + conf.setInt(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, TOKEN_EXPIRATION_SCAN_SECONDS); } @AfterClass @@ -188,6 +193,62 @@ public void testRenewToken() throws Exception { } } + @Test + public void testRemoveExpiredTokens() throws Exception { + DelegationTokenManager tokenManager = createTokenManager(getShortLivedTokenConf()); + + try { + TestDelegationTokenSecretManager secretManager = + (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager(); + + // Create token to be constantly renewed. + Token token1 = tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo"); + AbstractDelegationTokenIdentifier tokenId1 = + (AbstractDelegationTokenIdentifier) token1.decodeIdentifier(); + + // Create token expected to expire soon. + long expirationTime2 = Time.now(); + AbstractDelegationTokenIdentifier tokenId2 = storeToken(secretManager, 2, expirationTime2); + + // Create token not expected to expire soon. + long expirationTime3 = Time.now() + TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 10; + AbstractDelegationTokenIdentifier tokenId3 = storeToken(secretManager, 3, expirationTime3); + + GenericTestUtils.waitFor(() -> { + try { + // Constantly renew token so it doesn't expire. + tokenManager.renewToken(token1, "foo"); + + // Wait for cleanup to happen so expired token is deleted from SQL. + return !isTokenInSQL(secretManager, tokenId2); + } catch (IOException | SQLException e) { + throw new RuntimeException(e); + } + }, 100, 6000); + + Assert.assertTrue("Renewed token must not be cleaned up", + isTokenInSQL(secretManager, tokenId1)); + Assert.assertTrue("Token with future expiration must not be cleaned up", + isTokenInSQL(secretManager, tokenId3)); + } finally { + stopTokenManager(tokenManager); + } + } + + private AbstractDelegationTokenIdentifier storeToken( + TestDelegationTokenSecretManager secretManager, int sequenceNum, long expirationTime) + throws IOException { + AbstractDelegationTokenIdentifier tokenId = new DelegationTokenIdentifier(new Text("Test")); + tokenId.setOwner(new Text("foo")); + tokenId.setSequenceNumber(sequenceNum); + + AbstractDelegationTokenSecretManager.DelegationTokenInformation tokenInfo = + new AbstractDelegationTokenSecretManager.DelegationTokenInformation(expirationTime, null); + secretManager.storeToken(tokenId, tokenInfo); + + return tokenId; + } + private Configuration getShortLivedTokenConf() { Configuration shortLivedConf = new Configuration(conf); shortLivedConf.setTimeDuration( @@ -201,13 +262,12 @@ private void callRemoveExpiredTokensAndValidateSQL( TestDelegationTokenSecretManager secretManager, AbstractDelegationTokenIdentifier tokenId, boolean expectedInSQL) throws SQLException { secretManager.removeExpiredStoredToken(tokenId); - byte[] tokenInfo = secretManager.selectTokenInfo(tokenId.getSequenceNumber(), - tokenId.getBytes()); - if (expectedInSQL) { - Assert.assertNotNull("Verify token exists in database", tokenInfo); - } else { - Assert.assertNull("Verify token was removed from database", tokenInfo); - } + Assert.assertEquals(expectedInSQL, isTokenInSQL(secretManager, tokenId)); + } + + private boolean isTokenInSQL(TestDelegationTokenSecretManager secretManager, + AbstractDelegationTokenIdentifier tokenId) throws SQLException { + return secretManager.selectTokenInfo(tokenId.getSequenceNumber(), tokenId.getBytes()) != null; } @Test @@ -542,6 +602,11 @@ public void removeExpiredStoredToken(TokenIdentifier tokenId) { super.removeExpiredStoredToken((AbstractDelegationTokenIdentifier) tokenId); } + public void storeToken(AbstractDelegationTokenIdentifier ident, + DelegationTokenInformation tokenInfo) throws IOException { + super.storeToken(ident, tokenInfo); + } + public void setReadOnly(boolean readOnly) { ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly; }