Revert "HADOOP-18851: Performance improvement for DelegationTokenSecretManager. (#6001). Contributed by Vikas Kumar."

This reverts commit e283375cdf.
This commit is contained in:
Sammi Chen 2024-05-07 13:22:45 +08:00
parent edf985e269
commit 43e8ca428e
2 changed files with 33 additions and 39 deletions

View File

@ -120,12 +120,12 @@ private String formatTokenId(TokenIdent id) {
/** /**
* Access to currentKey is protected by this object lock * Access to currentKey is protected by this object lock
*/ */
private volatile DelegationKey currentKey; private DelegationKey currentKey;
private final long keyUpdateInterval; private long keyUpdateInterval;
private final long tokenMaxLifetime; private long tokenMaxLifetime;
private final long tokenRemoverScanInterval; private long tokenRemoverScanInterval;
private final long tokenRenewInterval; private long tokenRenewInterval;
/** /**
* Whether to store a token's tracking ID in its TokenInformation. * Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass. * Can be overridden by a subclass.
@ -491,18 +491,17 @@ private synchronized void removeExpiredKeys() {
} }
@Override @Override
protected byte[] createPassword(TokenIdent identifier) { protected synchronized byte[] createPassword(TokenIdent identifier) {
int sequenceNum; int sequenceNum;
long now = Time.now(); long now = Time.now();
sequenceNum = incrementDelegationTokenSeqNum(); sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now); identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime); identifier.setMaxDate(now + tokenMaxLifetime);
DelegationKey delegationCurrentKey = currentKey; identifier.setMasterKeyId(currentKey.getKeyId());
identifier.setMasterKeyId(delegationCurrentKey.getKeyId());
identifier.setSequenceNumber(sequenceNum); identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + formatTokenId(identifier) LOG.info("Creating password for identifier: " + formatTokenId(identifier)
+ ", currentKey: " + delegationCurrentKey.getKeyId()); + ", currentKey: " + currentKey.getKeyId());
byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey()); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try { try {
@ -527,6 +526,7 @@ protected byte[] createPassword(TokenIdent identifier) {
*/ */
protected DelegationTokenInformation checkToken(TokenIdent identifier) protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken { throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = getTokenInfo(identifier); DelegationTokenInformation info = getTokenInfo(identifier);
String err; String err;
if (info == null) { if (info == null) {
@ -546,7 +546,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
} }
@Override @Override
public byte[] retrievePassword(TokenIdent identifier) public synchronized byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken { throws InvalidToken {
return checkToken(identifier).getPassword(); return checkToken(identifier).getPassword();
} }
@ -558,7 +558,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
return null; return null;
} }
public String getTokenTrackingId(TokenIdent identifier) { public synchronized String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = getTokenInfo(identifier); DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) { if (info == null) {
return null; return null;
@ -572,7 +572,7 @@ public String getTokenTrackingId(TokenIdent identifier) {
* @param password Password in the token. * @param password Password in the token.
* @throws InvalidToken InvalidToken. * @throws InvalidToken InvalidToken.
*/ */
public void verifyToken(TokenIdent identifier, byte[] password) public synchronized void verifyToken(TokenIdent identifier, byte[] password)
throws InvalidToken { throws InvalidToken {
byte[] storedPassword = retrievePassword(identifier); byte[] storedPassword = retrievePassword(identifier);
if (!MessageDigest.isEqual(password, storedPassword)) { if (!MessageDigest.isEqual(password, storedPassword)) {
@ -589,7 +589,7 @@ public void verifyToken(TokenIdent identifier, byte[] password)
* @throws InvalidToken if the token is invalid * @throws InvalidToken if the token is invalid
* @throws AccessControlException if the user can't renew token * @throws AccessControlException if the user can't renew token
*/ */
public long renewToken(Token<TokenIdent> token, public synchronized long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException { String renewer) throws InvalidToken, IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf); DataInputStream in = new DataInputStream(buf);
@ -651,7 +651,7 @@ public long renewToken(Token<TokenIdent> token,
* @throws InvalidToken for invalid token * @throws InvalidToken for invalid token
* @throws AccessControlException if the user isn't allowed to cancel * @throws AccessControlException if the user isn't allowed to cancel
*/ */
public TokenIdent cancelToken(Token<TokenIdent> token, public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException { String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf); DataInputStream in = new DataInputStream(buf);

View File

@ -25,7 +25,6 @@
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
@ -153,7 +152,7 @@ protected static CuratorFramework getCurator() {
private final int seqNumBatchSize; private final int seqNumBatchSize;
private int currentSeqNum; private int currentSeqNum;
private int currentMaxSeqNum; private int currentMaxSeqNum;
private final ReentrantLock currentSeqNumLock;
private final boolean isTokenWatcherEnabled; private final boolean isTokenWatcherEnabled;
public ZKDelegationTokenSecretManager(Configuration conf) { public ZKDelegationTokenSecretManager(Configuration conf) {
@ -169,8 +168,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT); ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
this.currentSeqNumLock = new ReentrantLock(true);
String workPath = conf.get(ZK_DTSM_ZNODE_WORKING_PATH, ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT); String workPath = conf.get(ZK_DTSM_ZNODE_WORKING_PATH, ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT);
String nameSpace = workPath + "/" + ZK_DTSM_NAMESPACE; String nameSpace = workPath + "/" + ZK_DTSM_NAMESPACE;
if (CURATOR_TL.get() != null) { if (CURATOR_TL.get() != null) {
@ -506,28 +504,24 @@ protected int incrementDelegationTokenSeqNum() {
// The secret manager will keep a local range of seq num which won't be // The secret manager will keep a local range of seq num which won't be
// seen by peers, so only when the range is exhausted it will ask zk for // seen by peers, so only when the range is exhausted it will ask zk for
// another range again // another range again
try { if (currentSeqNum >= currentMaxSeqNum) {
this.currentSeqNumLock.lock(); try {
if (currentSeqNum >= currentMaxSeqNum) { // after a successful batch request, we can get the range starting point
try { currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
// after a successful batch request, we can get the range starting point currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); LOG.info("Fetched new range of seq num, from {} to {} ",
currentMaxSeqNum = currentSeqNum + seqNumBatchSize; currentSeqNum+1, currentMaxSeqNum);
LOG.info("Fetched new range of seq num, from {} to {} ", } catch (InterruptedException e) {
currentSeqNum+1, currentMaxSeqNum); // The ExpirationThread is just finishing.. so dont do anything..
} catch (InterruptedException e) { LOG.debug(
// The ExpirationThread is just finishing.. so dont do anything.. "Thread interrupted while performing token counter increment", e);
LOG.debug( Thread.currentThread().interrupt();
"Thread interrupted while performing token counter increment", e); } catch (Exception e) {
Thread.currentThread().interrupt(); throw new RuntimeException("Could not increment shared counter !!", e);
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
} }
return ++currentSeqNum;
} finally {
this.currentSeqNumLock.unlock();
} }
return ++currentSeqNum;
} }
@Override @Override