HADOOP-18851: Performance improvement for DelegationTokenSecretManager. (#6001). Contributed by Vikas Kumar.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
23360b3f6b
commit
e283375cdf
@ -115,12 +115,12 @@ private String formatTokenId(TokenIdent id) {
|
|||||||
/**
|
/**
|
||||||
* Access to currentKey is protected by this object lock
|
* Access to currentKey is protected by this object lock
|
||||||
*/
|
*/
|
||||||
private DelegationKey currentKey;
|
private volatile DelegationKey currentKey;
|
||||||
|
|
||||||
private long keyUpdateInterval;
|
private final long keyUpdateInterval;
|
||||||
private long tokenMaxLifetime;
|
private final long tokenMaxLifetime;
|
||||||
private long tokenRemoverScanInterval;
|
private final long tokenRemoverScanInterval;
|
||||||
private long tokenRenewInterval;
|
private final long tokenRenewInterval;
|
||||||
/**
|
/**
|
||||||
* Whether to store a token's tracking ID in its TokenInformation.
|
* Whether to store a token's tracking ID in its TokenInformation.
|
||||||
* Can be overridden by a subclass.
|
* Can be overridden by a subclass.
|
||||||
@ -486,17 +486,18 @@ private synchronized void removeExpiredKeys() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized byte[] createPassword(TokenIdent identifier) {
|
protected byte[] createPassword(TokenIdent identifier) {
|
||||||
int sequenceNum;
|
int sequenceNum;
|
||||||
long now = Time.now();
|
long now = Time.now();
|
||||||
sequenceNum = incrementDelegationTokenSeqNum();
|
sequenceNum = incrementDelegationTokenSeqNum();
|
||||||
identifier.setIssueDate(now);
|
identifier.setIssueDate(now);
|
||||||
identifier.setMaxDate(now + tokenMaxLifetime);
|
identifier.setMaxDate(now + tokenMaxLifetime);
|
||||||
identifier.setMasterKeyId(currentKey.getKeyId());
|
DelegationKey delegationCurrentKey = currentKey;
|
||||||
|
identifier.setMasterKeyId(delegationCurrentKey.getKeyId());
|
||||||
identifier.setSequenceNumber(sequenceNum);
|
identifier.setSequenceNumber(sequenceNum);
|
||||||
LOG.info("Creating password for identifier: " + formatTokenId(identifier)
|
LOG.info("Creating password for identifier: " + formatTokenId(identifier)
|
||||||
+ ", currentKey: " + currentKey.getKeyId());
|
+ ", currentKey: " + delegationCurrentKey.getKeyId());
|
||||||
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
|
byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey());
|
||||||
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
|
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
|
||||||
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
|
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
|
||||||
try {
|
try {
|
||||||
@ -521,7 +522,6 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
|
|||||||
*/
|
*/
|
||||||
protected DelegationTokenInformation checkToken(TokenIdent identifier)
|
protected DelegationTokenInformation checkToken(TokenIdent identifier)
|
||||||
throws InvalidToken {
|
throws InvalidToken {
|
||||||
assert Thread.holdsLock(this);
|
|
||||||
DelegationTokenInformation info = getTokenInfo(identifier);
|
DelegationTokenInformation info = getTokenInfo(identifier);
|
||||||
String err;
|
String err;
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
@ -541,7 +541,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized byte[] retrievePassword(TokenIdent identifier)
|
public byte[] retrievePassword(TokenIdent identifier)
|
||||||
throws InvalidToken {
|
throws InvalidToken {
|
||||||
return checkToken(identifier).getPassword();
|
return checkToken(identifier).getPassword();
|
||||||
}
|
}
|
||||||
@ -553,7 +553,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized String getTokenTrackingId(TokenIdent identifier) {
|
public String getTokenTrackingId(TokenIdent identifier) {
|
||||||
DelegationTokenInformation info = getTokenInfo(identifier);
|
DelegationTokenInformation info = getTokenInfo(identifier);
|
||||||
if (info == null) {
|
if (info == null) {
|
||||||
return null;
|
return null;
|
||||||
@ -567,7 +567,7 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) {
|
|||||||
* @param password Password in the token.
|
* @param password Password in the token.
|
||||||
* @throws InvalidToken InvalidToken.
|
* @throws InvalidToken InvalidToken.
|
||||||
*/
|
*/
|
||||||
public synchronized void verifyToken(TokenIdent identifier, byte[] password)
|
public void verifyToken(TokenIdent identifier, byte[] password)
|
||||||
throws InvalidToken {
|
throws InvalidToken {
|
||||||
byte[] storedPassword = retrievePassword(identifier);
|
byte[] storedPassword = retrievePassword(identifier);
|
||||||
if (!MessageDigest.isEqual(password, storedPassword)) {
|
if (!MessageDigest.isEqual(password, storedPassword)) {
|
||||||
@ -584,7 +584,7 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password)
|
|||||||
* @throws InvalidToken if the token is invalid
|
* @throws InvalidToken if the token is invalid
|
||||||
* @throws AccessControlException if the user can't renew token
|
* @throws AccessControlException if the user can't renew token
|
||||||
*/
|
*/
|
||||||
public synchronized long renewToken(Token<TokenIdent> token,
|
public long renewToken(Token<TokenIdent> token,
|
||||||
String renewer) throws InvalidToken, IOException {
|
String renewer) throws InvalidToken, IOException {
|
||||||
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||||
DataInputStream in = new DataInputStream(buf);
|
DataInputStream in = new DataInputStream(buf);
|
||||||
@ -646,7 +646,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
|
|||||||
* @throws InvalidToken for invalid token
|
* @throws InvalidToken for invalid token
|
||||||
* @throws AccessControlException if the user isn't allowed to cancel
|
* @throws AccessControlException if the user isn't allowed to cancel
|
||||||
*/
|
*/
|
||||||
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
public TokenIdent cancelToken(Token<TokenIdent> token,
|
||||||
String canceller) throws IOException {
|
String canceller) throws IOException {
|
||||||
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||||
DataInputStream in = new DataInputStream(buf);
|
DataInputStream in = new DataInputStream(buf);
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
|
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
|
||||||
@ -148,7 +149,7 @@ protected static CuratorFramework getCurator() {
|
|||||||
private final int seqNumBatchSize;
|
private final int seqNumBatchSize;
|
||||||
private int currentSeqNum;
|
private int currentSeqNum;
|
||||||
private int currentMaxSeqNum;
|
private int currentMaxSeqNum;
|
||||||
|
private final ReentrantLock currentSeqNumLock;
|
||||||
private final boolean isTokenWatcherEnabled;
|
private final boolean isTokenWatcherEnabled;
|
||||||
|
|
||||||
public ZKDelegationTokenSecretManager(Configuration conf) {
|
public ZKDelegationTokenSecretManager(Configuration conf) {
|
||||||
@ -164,6 +165,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
|
|||||||
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
|
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
|
||||||
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
|
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
|
||||||
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
|
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
|
||||||
|
this.currentSeqNumLock = new ReentrantLock(true);
|
||||||
if (CURATOR_TL.get() != null) {
|
if (CURATOR_TL.get() != null) {
|
||||||
zkClient =
|
zkClient =
|
||||||
CURATOR_TL.get().usingNamespace(
|
CURATOR_TL.get().usingNamespace(
|
||||||
@ -520,24 +522,28 @@ protected int incrementDelegationTokenSeqNum() {
|
|||||||
// The secret manager will keep a local range of seq num which won't be
|
// The secret manager will keep a local range of seq num which won't be
|
||||||
// seen by peers, so only when the range is exhausted it will ask zk for
|
// seen by peers, so only when the range is exhausted it will ask zk for
|
||||||
// another range again
|
// another range again
|
||||||
if (currentSeqNum >= currentMaxSeqNum) {
|
try {
|
||||||
try {
|
this.currentSeqNumLock.lock();
|
||||||
// after a successful batch request, we can get the range starting point
|
if (currentSeqNum >= currentMaxSeqNum) {
|
||||||
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
|
try {
|
||||||
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
|
// after a successful batch request, we can get the range starting point
|
||||||
LOG.info("Fetched new range of seq num, from {} to {} ",
|
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
|
||||||
currentSeqNum+1, currentMaxSeqNum);
|
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
|
||||||
} catch (InterruptedException e) {
|
LOG.info("Fetched new range of seq num, from {} to {} ",
|
||||||
// The ExpirationThread is just finishing.. so dont do anything..
|
currentSeqNum+1, currentMaxSeqNum);
|
||||||
LOG.debug(
|
} catch (InterruptedException e) {
|
||||||
"Thread interrupted while performing token counter increment", e);
|
// The ExpirationThread is just finishing.. so dont do anything..
|
||||||
Thread.currentThread().interrupt();
|
LOG.debug(
|
||||||
} catch (Exception e) {
|
"Thread interrupted while performing token counter increment", e);
|
||||||
throw new RuntimeException("Could not increment shared counter !!", e);
|
Thread.currentThread().interrupt();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException("Could not increment shared counter !!", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return ++currentSeqNum;
|
||||||
|
} finally {
|
||||||
|
this.currentSeqNumLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ++currentSeqNum;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user