YARN-8865. RMStateStore contains large number of expired RMDelegationToken. Contributed by Wilfred Spiegelenburg
This commit is contained in:
parent
49412a1289
commit
ab6aa4c726
@ -300,7 +300,8 @@ protected void updateToken(TokenIdent ident,
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is intended to be used for recovering persisted delegation
|
* This method is intended to be used for recovering persisted delegation
|
||||||
* tokens
|
* tokens. Tokens that have an unknown <code>DelegationKey</code> are
|
||||||
|
* marked as expired and automatically cleaned up.
|
||||||
* This method must be called before this secret manager is activated (before
|
* This method must be called before this secret manager is activated (before
|
||||||
* startThreads() is called)
|
* startThreads() is called)
|
||||||
* @param identifier identifier read from persistent storage
|
* @param identifier identifier read from persistent storage
|
||||||
@ -316,12 +317,15 @@ public synchronized void addPersistedDelegationToken(
|
|||||||
}
|
}
|
||||||
int keyId = identifier.getMasterKeyId();
|
int keyId = identifier.getMasterKeyId();
|
||||||
DelegationKey dKey = allKeys.get(keyId);
|
DelegationKey dKey = allKeys.get(keyId);
|
||||||
|
byte[] password = null;
|
||||||
if (dKey == null) {
|
if (dKey == null) {
|
||||||
LOG.warn("No KEY found for persisted identifier "
|
LOG.warn("No KEY found for persisted identifier, expiring stored token "
|
||||||
+ formatTokenId(identifier));
|
+ formatTokenId(identifier));
|
||||||
return;
|
// make sure the token is expired
|
||||||
|
renewDate = 0L;
|
||||||
|
} else {
|
||||||
|
password = createPassword(identifier.getBytes(), dKey.getKey());
|
||||||
}
|
}
|
||||||
byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
|
|
||||||
if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
|
if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
|
||||||
setDelegationTokenSeqNum(identifier.getSequenceNumber());
|
setDelegationTokenSeqNum(identifier.getSequenceNumber());
|
||||||
}
|
}
|
||||||
|
@ -21,12 +21,12 @@
|
|||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
|
import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
|
||||||
@ -34,17 +34,21 @@
|
|||||||
import org.apache.hadoop.security.authentication.util.KerberosName;
|
import org.apache.hadoop.security.authentication.util.KerberosName;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestJHSDelegationTokenSecretManager {
|
public class TestJHSDelegationTokenSecretManager {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRecovery() throws IOException {
|
public void testRecovery() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
HistoryServerStateStoreService store =
|
HistoryServerStateStoreService store =
|
||||||
new HistoryServerMemStateStoreService();
|
new HistoryServerMemStateStoreService();
|
||||||
store.init(conf);
|
store.init(conf);
|
||||||
store.start();
|
store.start();
|
||||||
|
Map<MRDelegationTokenIdentifier, Long> tokenState =
|
||||||
|
((HistoryServerMemStateStoreService) store).state.getTokenState();
|
||||||
JHSDelegationTokenSecretManagerForTest mgr =
|
JHSDelegationTokenSecretManagerForTest mgr =
|
||||||
new JHSDelegationTokenSecretManagerForTest(store);
|
new JHSDelegationTokenSecretManagerForTest(store);
|
||||||
mgr.startThreads();
|
mgr.startThreads();
|
||||||
@ -63,9 +67,15 @@ public void testRecovery() throws IOException {
|
|||||||
DelegationKey[] keys = mgr.getAllKeys();
|
DelegationKey[] keys = mgr.getAllKeys();
|
||||||
long tokenRenewDate1 = mgr.getAllTokens().get(tokenId1).getRenewDate();
|
long tokenRenewDate1 = mgr.getAllTokens().get(tokenId1).getRenewDate();
|
||||||
long tokenRenewDate2 = mgr.getAllTokens().get(tokenId2).getRenewDate();
|
long tokenRenewDate2 = mgr.getAllTokens().get(tokenId2).getRenewDate();
|
||||||
mgr.stopThreads();
|
// Make sure we stored the tokens
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return tokenState.size() == 2;
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
|
||||||
|
stopAndCleanSecretManager(mgr);
|
||||||
|
|
||||||
mgr = new JHSDelegationTokenSecretManagerForTest(store);
|
|
||||||
mgr.recover(store.loadState());
|
mgr.recover(store.loadState());
|
||||||
List<DelegationKey> recoveredKeys = Arrays.asList(mgr.getAllKeys());
|
List<DelegationKey> recoveredKeys = Arrays.asList(mgr.getAllKeys());
|
||||||
for (DelegationKey key : keys) {
|
for (DelegationKey key : keys) {
|
||||||
@ -106,25 +116,75 @@ public void testRecovery() throws IOException {
|
|||||||
}
|
}
|
||||||
// Succeed to cancel with full principal
|
// Succeed to cancel with full principal
|
||||||
mgr.cancelToken(tokenFull, tokenIdFull.getOwner().toString());
|
mgr.cancelToken(tokenFull, tokenIdFull.getOwner().toString());
|
||||||
|
// Make sure we removed the stored token
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return tokenState.size() == 2;
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
|
||||||
long tokenRenewDate3 = mgr.getAllTokens().get(tokenId3).getRenewDate();
|
long tokenRenewDate3 = mgr.getAllTokens().get(tokenId3).getRenewDate();
|
||||||
mgr.stopThreads();
|
stopAndCleanSecretManager(mgr);
|
||||||
|
|
||||||
mgr = new JHSDelegationTokenSecretManagerForTest(store);
|
|
||||||
mgr.recover(store.loadState());
|
mgr.recover(store.loadState());
|
||||||
assertFalse("token1 should be missing",
|
assertFalse("token1 should be missing",
|
||||||
mgr.getAllTokens().containsKey(tokenId1));
|
mgr.getAllTokens().containsKey(tokenId1));
|
||||||
assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2));
|
assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2));
|
||||||
assertEquals("token2 renew date", tokenRenewDate2,
|
assertEquals("token2 renew date incorrect", tokenRenewDate2,
|
||||||
mgr.getAllTokens().get(tokenId2).getRenewDate());
|
mgr.getAllTokens().get(tokenId2).getRenewDate());
|
||||||
assertTrue("token3 missing", mgr.getAllTokens().containsKey(tokenId3));
|
assertTrue("token3 missing from manager",
|
||||||
|
mgr.getAllTokens().containsKey(tokenId3));
|
||||||
assertEquals("token3 renew date", tokenRenewDate3,
|
assertEquals("token3 renew date", tokenRenewDate3,
|
||||||
mgr.getAllTokens().get(tokenId3).getRenewDate());
|
mgr.getAllTokens().get(tokenId3).getRenewDate());
|
||||||
|
|
||||||
mgr.startThreads();
|
mgr.startThreads();
|
||||||
mgr.verifyToken(tokenId2, token2.getPassword());
|
mgr.verifyToken(tokenId2, token2.getPassword());
|
||||||
mgr.verifyToken(tokenId3, token3.getPassword());
|
mgr.verifyToken(tokenId3, token3.getPassword());
|
||||||
|
// Set an unknown key ID: token should not be restored
|
||||||
|
tokenId3.setMasterKeyId(1000);
|
||||||
|
// Update renewal date to check the store write
|
||||||
|
mgr.updateStoredToken(tokenId3, tokenRenewDate3 + 5000);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return tokenState.get(tokenId3).equals(tokenRenewDate3 + 5000);
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
stopAndCleanSecretManager(mgr);
|
||||||
|
|
||||||
|
// Store should contain token but manager should not
|
||||||
|
Assert.assertTrue("Store does not contain token3",
|
||||||
|
tokenState.containsKey(tokenId3));
|
||||||
|
Assert.assertFalse("Store does not contain token3",
|
||||||
|
mgr.getAllTokens().containsKey(tokenId3));
|
||||||
|
// Recover to load the token into the manager; renew date is set to 0
|
||||||
|
mgr.recover(store.loadState());
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return mgr.getAllTokens().get(tokenId3).getRenewDate() == 0L;
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
// Start the removal threads: cleanup manager and store
|
||||||
|
mgr.startThreads();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return !mgr.getAllTokens().containsKey(tokenId3);
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return !tokenState.containsKey(tokenId3);
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopAndCleanSecretManager(
|
||||||
|
JHSDelegationTokenSecretManagerForTest mgr) {
|
||||||
mgr.stopThreads();
|
mgr.stopThreads();
|
||||||
|
mgr.reset();
|
||||||
|
Assert.assertEquals("Secret manager should not contain keys",
|
||||||
|
mgr.getAllKeys().length, 0);
|
||||||
|
Assert.assertEquals("Secret manager should not contain tokens",
|
||||||
|
mgr.getAllTokens().size(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class JHSDelegationTokenSecretManagerForTest
|
private static class JHSDelegationTokenSecretManagerForTest
|
||||||
|
@ -26,13 +26,16 @@
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
@ -174,6 +177,66 @@ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
|
|||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test removing token without key from state-store.
|
||||||
|
@Test(timeout = 15000)
|
||||||
|
public void testUnknownKeyTokensOnRecover() throws Exception {
|
||||||
|
final int masterID = 1234;
|
||||||
|
final int sequenceNumber = 1000;
|
||||||
|
|
||||||
|
MemoryRMStateStore memStore = new MockMemoryRMStateStore();
|
||||||
|
memStore.init(testConf);
|
||||||
|
// Need RM to get the secret manager and call recover
|
||||||
|
MockRM rm1 = new MyMockRM(testConf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
RMDelegationTokenSecretManager dtSecretManager =
|
||||||
|
rm1.getRMContext().getRMDelegationTokenSecretManager();
|
||||||
|
RMState rmState = memStore.getState();
|
||||||
|
// short cut to generate a basic token with unknown key
|
||||||
|
RMDelegationTokenIdentifier rmDT = new RMDelegationTokenIdentifier(
|
||||||
|
new Text("owner"), new Text("renewer"), new Text("realuser"));
|
||||||
|
// set a master key which is not used
|
||||||
|
rmDT.setMasterKeyId(masterID);
|
||||||
|
rmDT.setSequenceNumber(sequenceNumber);
|
||||||
|
final long tokenTime = Time.now() + 60000;
|
||||||
|
rmDT.setMaxDate(tokenTime);
|
||||||
|
dtSecretManager.storeNewToken(rmDT, tokenTime);
|
||||||
|
// give it time to process
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return rmState.getRMDTSecretManagerState().getTokenState().
|
||||||
|
containsKey(rmDT);
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
// Cannot recover while running: stop and clear
|
||||||
|
dtSecretManager.stopThreads();
|
||||||
|
dtSecretManager.reset();
|
||||||
|
Assert.assertEquals("Secret manager should have no tokens",
|
||||||
|
dtSecretManager.getAllTokens().size(), 0);
|
||||||
|
Assert.assertEquals("Secret manager should have no keys",
|
||||||
|
dtSecretManager.getAllMasterKeys().size(), 0);
|
||||||
|
dtSecretManager.recover(rmState);
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return
|
||||||
|
rmState.getRMDTSecretManagerState().getTokenState().
|
||||||
|
containsKey(rmDT);
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
Assert.assertEquals("Token should have been expired but is not", 0L,
|
||||||
|
dtSecretManager.getRenewDate(rmDT));
|
||||||
|
// The remover thread should immediately do its work,
|
||||||
|
// still give it some time to process
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
public Boolean get() {
|
||||||
|
return
|
||||||
|
!rmState.getRMDTSecretManagerState().getTokenState().
|
||||||
|
containsKey(rmDT);
|
||||||
|
}
|
||||||
|
}, 10, 2000);
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
|
||||||
class MyMockRM extends TestSecurityMockRM {
|
class MyMockRM extends TestSecurityMockRM {
|
||||||
|
|
||||||
public MyMockRM(Configuration conf, RMStateStore store) {
|
public MyMockRM(Configuration conf, RMStateStore store) {
|
||||||
|
Loading…
Reference in New Issue
Block a user