HADOOP-13487. Hadoop KMS should load old delegation tokens from Zookeeper on startup. Contributed by Xiao Chen.
This commit is contained in:
parent
22fc46d765
commit
f4d4d3474c
@ -361,6 +361,7 @@ public void childEvent(CuratorFramework client,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, listenerThreadPool);
|
}, listenerThreadPool);
|
||||||
|
loadFromZKCache(false);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Could not start PathChildrenCache for keys", e);
|
throw new IOException("Could not start PathChildrenCache for keys", e);
|
||||||
@ -389,6 +390,7 @@ public void childEvent(CuratorFramework client,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, listenerThreadPool);
|
}, listenerThreadPool);
|
||||||
|
loadFromZKCache(true);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Could not start PathChildrenCache for tokens", e);
|
throw new IOException("Could not start PathChildrenCache for tokens", e);
|
||||||
@ -396,6 +398,43 @@ public void childEvent(CuratorFramework client,
|
|||||||
super.startThreads();
|
super.startThreads();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the PathChildrenCache into the in-memory map. Possible caches to be
|
||||||
|
* loaded are keyCache and tokenCache.
|
||||||
|
*
|
||||||
|
* @param isTokenCache true if loading tokenCache, false if loading keyCache.
|
||||||
|
*/
|
||||||
|
private void loadFromZKCache(final boolean isTokenCache) {
|
||||||
|
final String cacheName = isTokenCache ? "token" : "key";
|
||||||
|
LOG.info("Starting to load {} cache.", cacheName);
|
||||||
|
final List<ChildData> children;
|
||||||
|
if (isTokenCache) {
|
||||||
|
children = tokenCache.getCurrentData();
|
||||||
|
} else {
|
||||||
|
children = keyCache.getCurrentData();
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
for (ChildData child : children) {
|
||||||
|
try {
|
||||||
|
if (isTokenCache) {
|
||||||
|
processTokenAddOrUpdate(child);
|
||||||
|
} else {
|
||||||
|
processKeyAddOrUpdate(child.getData());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Ignoring node {} because it failed to load.",
|
||||||
|
child.getPath());
|
||||||
|
LOG.debug("Failure exception:", e);
|
||||||
|
++count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (count > 0) {
|
||||||
|
LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName);
|
||||||
|
}
|
||||||
|
LOG.info("Loaded {} cache.", cacheName);
|
||||||
|
}
|
||||||
|
|
||||||
private void processKeyAddOrUpdate(byte[] data) throws IOException {
|
private void processKeyAddOrUpdate(byte[] data) throws IOException {
|
||||||
ByteArrayInputStream bin = new ByteArrayInputStream(data);
|
ByteArrayInputStream bin = new ByteArrayInputStream(data);
|
||||||
DataInputStream din = new DataInputStream(bin);
|
DataInputStream din = new DataInputStream(bin);
|
||||||
@ -890,4 +929,9 @@ static String getNodePath(String root, String nodeName) {
|
|||||||
public ExecutorService getListenerThreadPool() {
|
public ExecutorService getListenerThreadPool() {
|
||||||
return listenerThreadPool;
|
return listenerThreadPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) {
|
||||||
|
return currentTokens.get(ident);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.curator.RetryPolicy;
|
import org.apache.curator.RetryPolicy;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
@ -37,6 +38,7 @@
|
|||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.zookeeper.ZooDefs;
|
import org.apache.zookeeper.ZooDefs;
|
||||||
import org.apache.zookeeper.data.ACL;
|
import org.apache.zookeeper.data.ACL;
|
||||||
import org.apache.zookeeper.data.Id;
|
import org.apache.zookeeper.data.Id;
|
||||||
@ -44,12 +46,18 @@
|
|||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestZKDelegationTokenSecretManager {
|
public class TestZKDelegationTokenSecretManager {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class);
|
||||||
|
|
||||||
private static final int TEST_RETRIES = 2;
|
private static final int TEST_RETRIES = 2;
|
||||||
|
|
||||||
@ -61,6 +69,9 @@ public class TestZKDelegationTokenSecretManager {
|
|||||||
|
|
||||||
private TestingServer zkServer;
|
private TestingServer zkServer;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout globalTimeout = new Timeout(300000);
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
zkServer = new TestingServer();
|
zkServer = new TestingServer();
|
||||||
@ -382,4 +393,84 @@ private void verifyTokenFailWithRetry(DelegationTokenManager tm,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings({ "unchecked" })
|
||||||
|
@Test
|
||||||
|
public void testNodesLoadedAfterRestart() throws Exception {
|
||||||
|
final String connectString = zkServer.getConnectString();
|
||||||
|
final Configuration conf = getSecretConf(connectString);
|
||||||
|
final int removeScan = 1;
|
||||||
|
// Set the remove scan interval to remove expired tokens
|
||||||
|
conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, removeScan);
|
||||||
|
// Set the update interval to trigger background thread to run. The thread
|
||||||
|
// is hard-coded to sleep at least 5 seconds.
|
||||||
|
conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, 5);
|
||||||
|
// Set token expire time to 5 seconds.
|
||||||
|
conf.setLong(DelegationTokenManager.RENEW_INTERVAL, 5);
|
||||||
|
|
||||||
|
DelegationTokenManager tm =
|
||||||
|
new DelegationTokenManager(conf, new Text("bla"));
|
||||||
|
tm.init();
|
||||||
|
Token<DelegationTokenIdentifier> token =
|
||||||
|
(Token<DelegationTokenIdentifier>) tm
|
||||||
|
.createToken(UserGroupInformation.getCurrentUser(), "good");
|
||||||
|
Assert.assertNotNull(token);
|
||||||
|
Token<DelegationTokenIdentifier> cancelled =
|
||||||
|
(Token<DelegationTokenIdentifier>) tm
|
||||||
|
.createToken(UserGroupInformation.getCurrentUser(), "cancelled");
|
||||||
|
Assert.assertNotNull(cancelled);
|
||||||
|
tm.verifyToken(token);
|
||||||
|
tm.verifyToken(cancelled);
|
||||||
|
|
||||||
|
// Cancel one token, verify it's gone
|
||||||
|
tm.cancelToken(cancelled, "cancelled");
|
||||||
|
final AbstractDelegationTokenSecretManager sm =
|
||||||
|
tm.getDelegationTokenSecretManager();
|
||||||
|
final ZKDelegationTokenSecretManager zksm =
|
||||||
|
(ZKDelegationTokenSecretManager) sm;
|
||||||
|
final AbstractDelegationTokenIdentifier idCancelled =
|
||||||
|
sm.decodeTokenIdentifier(cancelled);
|
||||||
|
LOG.info("Waiting for the cancelled token to be removed");
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo =
|
||||||
|
zksm.getTokenInfo(idCancelled);
|
||||||
|
return dtinfo == null;
|
||||||
|
}
|
||||||
|
}, 100, 5000);
|
||||||
|
|
||||||
|
// Fake a restart which launches a new tm
|
||||||
|
tm.destroy();
|
||||||
|
tm = new DelegationTokenManager(conf, new Text("bla"));
|
||||||
|
tm.init();
|
||||||
|
final AbstractDelegationTokenSecretManager smNew =
|
||||||
|
tm.getDelegationTokenSecretManager();
|
||||||
|
final ZKDelegationTokenSecretManager zksmNew =
|
||||||
|
(ZKDelegationTokenSecretManager) smNew;
|
||||||
|
|
||||||
|
// The cancelled token should be gone, and not loaded.
|
||||||
|
AbstractDelegationTokenIdentifier id =
|
||||||
|
smNew.decodeTokenIdentifier(cancelled);
|
||||||
|
AbstractDelegationTokenSecretManager.DelegationTokenInformation dtinfo =
|
||||||
|
zksmNew.getTokenInfo(id);
|
||||||
|
Assert.assertNull("canceled dt should be gone!", dtinfo);
|
||||||
|
|
||||||
|
// The good token should be loaded on startup, and removed after expiry.
|
||||||
|
id = smNew.decodeTokenIdentifier(token);
|
||||||
|
dtinfo = zksmNew.getTokenInfoFromMemory(id);
|
||||||
|
Assert.assertNotNull("good dt should be in memory!", dtinfo);
|
||||||
|
|
||||||
|
// Wait for the good token to expire.
|
||||||
|
Thread.sleep(5000);
|
||||||
|
final ZKDelegationTokenSecretManager zksm1 = zksmNew;
|
||||||
|
final AbstractDelegationTokenIdentifier id1 = id;
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
LOG.info("Waiting for the expired token to be removed...");
|
||||||
|
return zksm1.getTokenInfo(id1) == null;
|
||||||
|
}
|
||||||
|
}, 1000, 5000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user