diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index c3ad9f35cd..6c66e98936 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -361,6 +361,7 @@ public void childEvent(CuratorFramework client, } } }, listenerThreadPool); + loadFromZKCache(false); } } catch (Exception e) { throw new IOException("Could not start PathChildrenCache for keys", e); @@ -389,6 +390,7 @@ public void childEvent(CuratorFramework client, } } }, listenerThreadPool); + loadFromZKCache(true); } } catch (Exception e) { throw new IOException("Could not start PathChildrenCache for tokens", e); @@ -396,6 +398,43 @@ public void childEvent(CuratorFramework client, super.startThreads(); } + /** + * Load the PathChildrenCache into the in-memory map. Possible caches to be + * loaded are keyCache and tokenCache. + * + * @param isTokenCache true if loading tokenCache, false if loading keyCache. + */ + private void loadFromZKCache(final boolean isTokenCache) { + final String cacheName = isTokenCache ? "token" : "key"; + LOG.info("Starting to load {} cache.", cacheName); + final List children; + if (isTokenCache) { + children = tokenCache.getCurrentData(); + } else { + children = keyCache.getCurrentData(); + } + + int count = 0; + for (ChildData child : children) { + try { + if (isTokenCache) { + processTokenAddOrUpdate(child); + } else { + processKeyAddOrUpdate(child.getData()); + } + } catch (Exception e) { + LOG.info("Ignoring node {} because it failed to load.", + child.getPath()); + LOG.debug("Failure exception:", e); + ++count; + } + } + if (count > 0) { + LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName); + } + LOG.info("Loaded {} cache.", cacheName); + } + private void processKeyAddOrUpdate(byte[] data) throws IOException { ByteArrayInputStream bin = new ByteArrayInputStream(data); DataInputStream din = new DataInputStream(bin); @@ -890,4 +929,9 @@ static String getNodePath(String root, String nodeName) { public ExecutorService getListenerThreadPool() { return listenerThreadPool; } + + @VisibleForTesting + DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) { + return currentTokens.get(ident); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index 185a994072..c9571ff21e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -24,6 +24,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import com.google.common.base.Supplier; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -37,6 +38,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -44,12 +46,18 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.fail; -import org.junit.Test; public class TestZKDelegationTokenSecretManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class); private static final int TEST_RETRIES = 2; @@ -61,6 +69,9 @@ public class TestZKDelegationTokenSecretManager { private TestingServer zkServer; + @Rule + public Timeout globalTimeout = new Timeout(300000); + @Before public void setup() throws Exception { zkServer = new TestingServer(); @@ -382,4 +393,84 @@ private void verifyTokenFailWithRetry(DelegationTokenManager tm, } } + @SuppressWarnings({ "unchecked" }) + @Test + public void testNodesLoadedAfterRestart() throws Exception { + final String connectString = zkServer.getConnectString(); + final Configuration conf = getSecretConf(connectString); + final int removeScan = 1; + // Set the remove scan interval to remove expired tokens + conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, removeScan); + // Set the update interval to trigger background thread to run. The thread + // is hard-coded to sleep at least 5 seconds. + conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, 5); + // Set token expire time to 5 seconds. + conf.setLong(DelegationTokenManager.RENEW_INTERVAL, 5); + + DelegationTokenManager tm = + new DelegationTokenManager(conf, new Text("bla")); + tm.init(); + Token token = + (Token) tm + .createToken(UserGroupInformation.getCurrentUser(), "good"); + Assert.assertNotNull(token); + Token cancelled = + (Token) tm + .createToken(UserGroupInformation.getCurrentUser(), "cancelled"); + Assert.assertNotNull(cancelled); + tm.verifyToken(token); + tm.verifyToken(cancelled); + + // Cancel one token, verify it's gone + tm.cancelToken(cancelled, "cancelled"); + final AbstractDelegationTokenSecretManager sm = + tm.getDelegationTokenSecretManager(); + final ZKDelegationTokenSecretManager zksm = + (ZKDelegationTokenSecretManager) sm; + final AbstractDelegationTokenIdentifier idCancelled = + sm.decodeTokenIdentifier(cancelled); + LOG.info("Waiting for the cancelled token to be removed"); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo = + zksm.getTokenInfo(idCancelled); + return dtinfo == null; + } + }, 100, 5000); + + // Fake a restart which launches a new tm + tm.destroy(); + tm = new DelegationTokenManager(conf, new Text("bla")); + tm.init(); + final AbstractDelegationTokenSecretManager smNew = + tm.getDelegationTokenSecretManager(); + final ZKDelegationTokenSecretManager zksmNew = + (ZKDelegationTokenSecretManager) smNew; + + // The cancelled token should be gone, and not loaded. + AbstractDelegationTokenIdentifier id = + smNew.decodeTokenIdentifier(cancelled); + AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo = + zksmNew.getTokenInfo(id); + Assert.assertNull("canceled dt should be gone!", dtinfo); + + // The good token should be loaded on startup, and removed after expiry. + id = smNew.decodeTokenIdentifier(token); + dtinfo = zksmNew.getTokenInfoFromMemory(id); + Assert.assertNotNull("good dt should be in memory!", dtinfo); + + // Wait for the good token to expire. + Thread.sleep(5000); + final ZKDelegationTokenSecretManager zksm1 = zksmNew; + final AbstractDelegationTokenIdentifier id1 = id; + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LOG.info("Waiting for the expired token to be removed..."); + return zksm1.getTokenInfo(id1) == null; + } + }, 1000, 5000); + } }