diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index f61590c28e..cd3b8c0c0f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -98,12 +98,16 @@ public abstract class ZKDelegationTokenSecretManager versionedValue = sharedCount.getVersionedValue(); - if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + 1)) { - break; + if (sharedCount.trySetCount( + versionedValue, versionedValue.getValue() + batchSize)) { + return versionedValue.getValue(); } } } @Override protected int incrementDelegationTokenSeqNum() { - try { - incrSharedCount(delTokSeqCounter); - } catch (InterruptedException e) { - // The ExpirationThread is just finishing.. so dont do anything.. - LOG.debug("Thread interrupted while performing token counter increment", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new RuntimeException("Could not increment shared counter !!", e); + // The secret manager will keep a local range of seq num which won't be + // seen by peers, so only when the range is exhausted it will ask zk for + // another range again + if (currentSeqNum >= currentMaxSeqNum) { + try { + // after a successful batch request, we can get the range starting point + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + LOG.info("Fetched new range of seq num, from {} to {} ", + currentSeqNum+1, currentMaxSeqNum); + } catch (InterruptedException e) { + // The ExpirationThread is just finishing.. so dont do anything.. + LOG.debug( + "Thread interrupted while performing token counter increment", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException("Could not increment shared counter !!", e); + } } - return delTokSeqCounter.getCount(); + + return ++currentSeqNum; } @Override @@ -603,7 +631,7 @@ protected int getCurrentKeyId() { @Override protected int incrementCurrentKeyId() { try { - incrSharedCount(keyIdSeqCounter); + incrSharedCount(keyIdSeqCounter, 1); } catch (InterruptedException e) { // The ExpirationThread is just finishing.. so dont do anything.. LOG.debug("Thread interrupted while performing keyId increment", e); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index c9571ff21e..b2e177976b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -217,6 +217,58 @@ public void testNodeUpAferAWhile() throws Exception { } } + @SuppressWarnings("unchecked") + @Test + public void testMultiNodeCompeteForSeqNum() throws Exception { + DelegationTokenManager tm1, tm2 = null; + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + conf.setInt( + ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE, 1000); + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.init(); + + Token token1 = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token1); + AbstractDelegationTokenIdentifier id1 = + tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token1); + Assert.assertEquals( + "Token seq should be the same", 1, id1.getSequenceNumber()); + Token token2 = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token2); + AbstractDelegationTokenIdentifier id2 = + tm1.getDelegationTokenSecretManager().decodeTokenIdentifier(token2); + Assert.assertEquals( + "Token seq should be the same", 2, id2.getSequenceNumber()); + + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.init(); + + Token token3 = + (Token) tm2.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token3); + AbstractDelegationTokenIdentifier id3 = + tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token3); + Assert.assertEquals( + "Token seq should be the same", 1001, id3.getSequenceNumber()); + Token token4 = + (Token) tm2.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token4); + AbstractDelegationTokenIdentifier id4 = + tm2.getDelegationTokenSecretManager().decodeTokenIdentifier(token4); + Assert.assertEquals( + "Token seq should be the same", 1002, id4.getSequenceNumber()); + + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + } + @SuppressWarnings("unchecked") @Test public void testRenewTokenSingleManager() throws Exception {