diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
index a17b6d495d..6c0fbbb0a2 100644
--- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
+++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
@@ -197,6 +197,8 @@ public void init(Properties config, ServletContext servletContext,
client = (CuratorFramework) curatorClientObj;
} else {
client = createCuratorClient(config);
+ servletContext.setAttribute(
+ ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE, client);
}
this.tokenValidity = tokenValidity;
shouldDisconnect = Boolean.parseBoolean(
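The two added lines publish the Curator client that ZKSignerSecretProvider creates into the ServletContext, so other components of the same web application can share one ZooKeeper connection instead of opening their own. A minimal sketch of the consumer side, mirroring the DelegationTokenAuthenticationFilter change later in this patch (the local variable name is illustrative):

    // Pick up the Curator client published by ZKSignerSecretProvider, if any.
    CuratorFramework sharedClient = (CuratorFramework) servletContext.getAttribute(
        ZKSignerSecretProvider.ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE);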
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 70579c34e2..2b07f8df65 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -537,6 +537,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10970. Cleanup KMS configuration keys. (wang)
+ HADOOP-11017. KMS delegation token secret manager should be able to use
+ zookeeper as store. (asuresh via tucu)
+
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 0183e292c8..32e95258a1 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -218,6 +218,19 @@
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index b9e26b545c..f5e7bc9c0d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -127,7 +127,7 @@ public void startThreads() throws IOException {
public synchronized void reset() {
currentId = 0;
allKeys.clear();
- delegationTokenSequenceNumber = 0;
+ setDelegationTokenSeqNum(0);
currentTokens.clear();
}
@@ -141,7 +141,7 @@ public synchronized void addKey(DelegationKey key) throws IOException {
if (key.getKeyId() > currentId) {
currentId = key.getKeyId();
}
- allKeys.put(key.getKeyId(), key);
+ storeDelegationKey(key);
}
public synchronized DelegationKey[] getAllKeys() {
@@ -163,24 +163,108 @@ protected void storeNewMasterKey(DelegationKey key) throws IOException {
return;
}
+ // for ZK based secretManager
+ protected void updateMasterKey(DelegationKey key) throws IOException {
+ return;
+ }
+
// RM
protected void removeStoredMasterKey(DelegationKey key) {
return;
}
// RM
- protected void storeNewToken(TokenIdent ident, long renewDate) {
+ protected void storeNewToken(TokenIdent ident, long renewDate) throws IOException {
return;
}
+
// RM
protected void removeStoredToken(TokenIdent ident) throws IOException {
}
// RM
- protected void updateStoredToken(TokenIdent ident, long renewDate) {
+ protected void updateStoredToken(TokenIdent ident, long renewDate) throws IOException {
return;
}
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected int getDelegationTokenSeqNum() {
+ return delegationTokenSequenceNumber;
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected int incrementDelegationTokenSeqNum() {
+ return ++delegationTokenSequenceNumber;
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void setDelegationTokenSeqNum(int seqNum) {
+ delegationTokenSequenceNumber = seqNum;
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected DelegationKey getDelegationKey(int keyId) {
+ return allKeys.get(keyId);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void storeDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ storeNewMasterKey(key);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void updateDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ updateMasterKey(key);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+ return currentTokens.get(ident);
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void storeToken(TokenIdent ident,
+ DelegationTokenInformation tokenInfo) throws IOException {
+ currentTokens.put(ident, tokenInfo);
+ storeNewToken(ident, tokenInfo.getRenewDate());
+ }
+
+ /**
+ * For subclasses externalizing the storage, for example Zookeeper
+ * based implementations
+ */
+ protected void updateToken(TokenIdent ident,
+ DelegationTokenInformation tokenInfo) throws IOException {
+ currentTokens.put(ident, tokenInfo);
+ updateStoredToken(ident, tokenInfo.getRenewDate());
+ }
+
/**
* This method is intended to be used for recovering persisted delegation
* tokens
@@ -196,17 +280,18 @@ public synchronized void addPersistedDelegationToken(
"Can't add persisted delegation token to a running SecretManager.");
}
int keyId = identifier.getMasterKeyId();
- DelegationKey dKey = allKeys.get(keyId);
+ DelegationKey dKey = getDelegationKey(keyId);
if (dKey == null) {
LOG.warn("No KEY found for persisted identifier " + identifier.toString());
return;
}
byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
- if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) {
- this.delegationTokenSequenceNumber = identifier.getSequenceNumber();
+ int delegationTokenSeqNum = getDelegationTokenSeqNum();
+ if (identifier.getSequenceNumber() > delegationTokenSeqNum) {
+ setDelegationTokenSeqNum(identifier.getSequenceNumber());
}
- if (currentTokens.get(identifier) == null) {
- currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
+ if (getTokenInfo(identifier) == null) {
+ storeToken(identifier, new DelegationTokenInformation(renewDate,
password, getTrackingIdIfEnabled(identifier)));
} else {
throw new IOException("Same delegation token being added twice.");
@@ -234,7 +319,7 @@ private void updateCurrentKey() throws IOException {
synchronized (this) {
currentId = newKey.getKeyId();
currentKey = newKey;
- allKeys.put(currentKey.getKeyId(), currentKey);
+ storeDelegationKey(currentKey);
}
}
@@ -252,7 +337,7 @@ void rollMasterKey() throws IOException {
* updateMasterKey() isn't called at expected interval. Add it back to
* allKeys just in case.
*/
- allKeys.put(currentKey.getKeyId(), currentKey);
+ updateDelegationKey(currentKey);
}
updateCurrentKey();
}
@@ -276,19 +361,25 @@ private synchronized void removeExpiredKeys() {
protected synchronized byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();
- sequenceNum = ++delegationTokenSequenceNumber;
+ sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentId);
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + identifier);
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
- storeNewToken(identifier, now + tokenRenewInterval);
- currentTokens.put(identifier, new DelegationTokenInformation(now
- + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
+ DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
+ try {
+ storeToken(identifier, tokenInfo);
+ } catch (IOException ioe) {
+ LOG.error("Could not store token !!", ioe);
+ }
return password;
}
+
+
/**
* Find the DelegationTokenInformation for the given token id, and verify that
* if the token is expired. Note that this method should be called with
@@ -297,7 +388,7 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
- DelegationTokenInformation info = currentTokens.get(identifier);
+ DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
throw new InvalidToken("token (" + identifier.toString()
+ ") can't be found in cache");
@@ -322,7 +413,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
}
public synchronized String getTokenTrackingId(TokenIdent identifier) {
- DelegationTokenInformation info = currentTokens.get(identifier);
+ DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
return null;
}
@@ -373,7 +464,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
throw new AccessControlException(renewer +
" tries to renew a token with renewer " + id.getRenewer());
}
- DelegationKey key = allKeys.get(id.getMasterKeyId());
+ DelegationKey key = getDelegationKey(id.getMasterKeyId());
if (key == null) {
throw new InvalidToken("Unable to find master key for keyId="
+ id.getMasterKeyId()
@@ -390,11 +481,10 @@ public synchronized long renewToken(Token token,
DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
password, trackingId);
- if (currentTokens.get(id) == null) {
+ if (getTokenInfo(id) == null) {
throw new InvalidToken("Renewal request for unknown token");
}
- currentTokens.put(id, info);
- updateStoredToken(id, renewTime);
+ updateToken(id, info);
return renewTime;
}
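The new protected methods above are extension points: the base class keeps its in-memory maps (allKeys, currentTokens) authoritative while a subclass mirrors every key and token into an external store. A minimal sketch, not part of this patch, of a subclass using one of the hooks; ExternalStore, its putToken method, and the "MY_KIND" token kind are hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
    import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;

    public class ExternallyStoredDTSecretManager
        extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {

      /** Hypothetical external key/value store. */
      public interface ExternalStore {
        void putToken(int seqNum, byte[] tokenIdentifier, long renewDate);
      }

      private final ExternalStore store;

      public ExternallyStoredDTSecretManager(ExternalStore store) {
        // key-update, max-lifetime, renew and removal-scan intervals, in milliseconds
        super(24L * 60 * 60 * 1000, 7L * 24 * 60 * 60 * 1000,
            24L * 60 * 60 * 1000, 60L * 60 * 1000);
        this.store = store;
      }

      @Override
      public DelegationTokenIdentifier createIdentifier() {
        return new DelegationTokenIdentifier(new Text("MY_KIND"));
      }

      @Override
      protected void storeToken(DelegationTokenIdentifier ident,
          DelegationTokenInformation tokenInfo) throws IOException {
        // Keep the base class behaviour (currentTokens map plus storeNewToken) ...
        super.storeToken(ident, tokenInfo);
        // ... and mirror the token to the external store.
        store.putToken(ident.getSequenceNumber(), ident.getBytes(),
            tokenInfo.getRenewDate());
      }
    }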
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
new file mode 100644
index 0000000000..23c7144501
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of {@link AbstractDelegationTokenSecretManager} that
+ * persists TokenIdentifiers and DelegationKeys in ZooKeeper. This class can
+ * be used by HA (highly available) services that consist of multiple nodes.
+ * It ensures that Identifiers and Keys are replicated to all nodes of
+ * the service.
+ */
+@InterfaceAudience.Private
+public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
+ extends AbstractDelegationTokenSecretManager<TokenIdent> {
+
+ private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
+ public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
+ + "zkNumRetries";
+ public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
+ + "zkSessionTimeout";
+ public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX
+ + "zkConnectionTimeout";
+ public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX
+ + "znodeWorkingPath";
+ public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX
+ + "zkAuthType";
+ public static final String ZK_DTSM_ZK_CONNECTION_STRING = ZK_CONF_PREFIX
+ + "zkConnectionString";
+ public static final String ZK_DTSM_ZK_KERBEROS_KEYTAB = ZK_CONF_PREFIX
+ + "kerberos.keytab";
+ public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
+ + "kerberos.principal";
+
+ public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
+ public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
+ public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
+ public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
+
+ private static Logger LOG = LoggerFactory
+ .getLogger(ZKDelegationTokenSecretManager.class);
+
+ private static final String JAAS_LOGIN_ENTRY_NAME =
+ "ZKDelegationTokenSecretManagerClient";
+
+ private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
+ private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
+ private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
+ private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";
+
+ private static final String DELEGATION_KEY_PREFIX = "DK_";
+ private static final String DELEGATION_TOKEN_PREFIX = "DT_";
+
+ private static final ThreadLocal<CuratorFramework> CURATOR_TL =
+ new ThreadLocal<CuratorFramework>();
+
+ public static void setCurator(CuratorFramework curator) {
+ CURATOR_TL.set(curator);
+ }
+
+ private final boolean isExternalClient;
+ private final CuratorFramework zkClient;
+ private SharedCount seqCounter;
+ private PathChildrenCache keyCache;
+ private PathChildrenCache tokenCache;
+ private ExecutorService listenerThreadPool;
+
+ public ZKDelegationTokenSecretManager(Configuration conf) {
+ super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
+ DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
+ conf.getLong(DelegationTokenManager.MAX_LIFETIME,
+ DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
+ conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
+ DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
+ conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
+ DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+ if (CURATOR_TL.get() != null) {
+ zkClient = CURATOR_TL.get();
+ isExternalClient = true;
+ } else {
+ String connString = conf.get(ZK_DTSM_ZK_CONNECTION_STRING);
+ Preconditions.checkNotNull(connString,
+ "Zookeeper connection string cannot be null");
+ String authType = conf.get(ZK_DTSM_ZK_AUTH_TYPE);
+
+ // AuthType has to be explicitly set to 'none' or 'sasl'
+ Preconditions.checkNotNull(authType, "Zookeeper authType cannot be null !!");
+ Preconditions.checkArgument(
+ authType.equals("sasl") || authType.equals("none"),
+ "Zookeeper authType must be one of [none, sasl]");
+
+ Builder builder = null;
+ try {
+ ACLProvider aclProvider = null;
+ if (authType.equals("sasl")) {
+ LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
+ + "and using 'sasl' ACLs");
+ String principal = setJaasConfiguration(conf);
+ System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+ JAAS_LOGIN_ENTRY_NAME);
+ System.setProperty("zookeeper.authProvider.1",
+ "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+ aclProvider = new SASLOwnerACLProvider(principal);
+ } else { // "none"
+ LOG.info("Connecting to ZooKeeper without authentication");
+ aclProvider = new DefaultACLProvider(); // open to everyone
+ }
+ int sessionT =
+ conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT,
+ ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT);
+ int numRetries =
+ conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT);
+ builder =
+ CuratorFrameworkFactory
+ .builder()
+ .aclProvider(aclProvider)
+ .namespace(
+ conf.get(ZK_DTSM_ZNODE_WORKING_PATH,
+ ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT)
+ + "/"
+ + ZK_DTSM_NAMESPACE
+ )
+ .sessionTimeoutMs(sessionT)
+ .connectionTimeoutMs(
+ conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT,
+ ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT)
+ )
+ .retryPolicy(
+ new RetryNTimes(numRetries, sessionT / numRetries));
+ } catch (Exception ex) {
+ throw new RuntimeException("Could not Load ZK acls or auth");
+ }
+ zkClient = builder.ensembleProvider(new FixedEnsembleProvider(connString))
+ .build();
+ isExternalClient = false;
+ }
+ listenerThreadPool = Executors.newFixedThreadPool(2);
+ }
+
+ private String setJaasConfiguration(Configuration config) throws Exception {
+ String keytabFile =
+ config.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim();
+ if (keytabFile == null || keytabFile.length() == 0) {
+ throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_KEYTAB
+ + " must be specified");
+ }
+ String principal =
+ config.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim();
+ if (principal == null || principal.length() == 0) {
+ throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_PRINCIPAL
+ + " must be specified");
+ }
+
+ JaasConfiguration jConf =
+ new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile);
+ javax.security.auth.login.Configuration.setConfiguration(jConf);
+ return principal.split("[/@]")[0];
+ }
+
+ /**
+ * Creates a programmatic version of a jaas.conf file. This can be used
+ * instead of writing a jaas.conf file and setting the system property,
+ * "java.security.auth.login.config", to point to that file. It is meant to be
+ * used for connecting to ZooKeeper.
+ */
+ @InterfaceAudience.Private
+ public static class JaasConfiguration extends
+ javax.security.auth.login.Configuration {
+
+ private static AppConfigurationEntry[] entry;
+ private String entryName;
+
+ /**
+ * Add an entry to the jaas configuration with the passed in name,
+ * principal, and keytab. The other necessary options will be set for you.
+ *
+ * @param entryName
+ * The name of the entry (e.g. "Client")
+ * @param principal
+ * The principal of the user
+ * @param keytab
+ * The location of the keytab
+ */
+ public JaasConfiguration(String entryName, String principal, String keytab) {
+ this.entryName = entryName;
+ Map<String, String> options = new HashMap<String, String>();
+ options.put("keyTab", keytab);
+ options.put("principal", principal);
+ options.put("useKeyTab", "true");
+ options.put("storeKey", "true");
+ options.put("useTicketCache", "false");
+ options.put("refreshKrb5Config", "true");
+ String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG");
+ if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
+ options.put("debug", "true");
+ }
+ entry = new AppConfigurationEntry[] {
+ new AppConfigurationEntry(getKrb5LoginModuleName(),
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ options) };
+ }
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ return (entryName.equals(name)) ? entry : null;
+ }
+
+ private String getKrb5LoginModuleName() {
+ String krb5LoginModuleName;
+ if (System.getProperty("java.vendor").contains("IBM")) {
+ krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
+ } else {
+ krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+ }
+ return krb5LoginModuleName;
+ }
+ }
+
+ @Override
+ public void startThreads() throws IOException {
+ if (!isExternalClient) {
+ try {
+ zkClient.start();
+ } catch (Exception e) {
+ throw new IOException("Could not start Curator Framework", e);
+ }
+ }
+ try {
+ seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
+ if (seqCounter != null) {
+ seqCounter.start();
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not start Sequence Counter", e);
+ }
+ try {
+ createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
+ createPersistentNode(ZK_DTSM_TOKENS_ROOT);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not create ZK paths");
+ }
+ try {
+ keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
+ if (keyCache != null) {
+ keyCache.start(StartMode.POST_INITIALIZED_EVENT);
+ keyCache.getListenable().addListener(new PathChildrenCacheListener() {
+ @Override
+ public void childEvent(CuratorFramework client,
+ PathChildrenCacheEvent event)
+ throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ processKeyAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_UPDATED:
+ processKeyAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_REMOVED:
+ processKeyRemoved(event.getData().getPath());
+ break;
+ default:
+ break;
+ }
+ }
+ }, listenerThreadPool);
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not start PathChildrenCache for keys", e);
+ }
+ try {
+ tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
+ if (tokenCache != null) {
+ tokenCache.start(StartMode.POST_INITIALIZED_EVENT);
+ tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
+
+ @Override
+ public void childEvent(CuratorFramework client,
+ PathChildrenCacheEvent event) throws Exception {
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ processTokenAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_UPDATED:
+ processTokenAddOrUpdate(event.getData().getData());
+ break;
+ case CHILD_REMOVED:
+ processTokenRemoved(event.getData().getData());
+ break;
+ default:
+ break;
+ }
+ }
+ }, listenerThreadPool);
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not start PathChildrenCache for tokens", e);
+ }
+ super.startThreads();
+ }
+
+ private void processKeyAddOrUpdate(byte[] data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ DelegationKey key = new DelegationKey();
+ key.readFields(din);
+ allKeys.put(key.getKeyId(), key);
+ }
+
+ private void processKeyRemoved(String path) {
+ int i = path.lastIndexOf('/');
+ if (i > 0) {
+ String tokSeg = path.substring(i + 1);
+ int j = tokSeg.indexOf('_');
+ if (j > 0) {
+ int keyId = Integer.parseInt(tokSeg.substring(j + 1));
+ allKeys.remove(keyId);
+ }
+ }
+ }
+
+ private void processTokenAddOrUpdate(byte[] data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ TokenIdent ident = createIdentifier();
+ ident.readFields(din);
+ long renewDate = din.readLong();
+ int pwdLen = din.readInt();
+ byte[] password = new byte[pwdLen];
+ int numRead = din.read(password, 0, pwdLen);
+ if (numRead > -1) {
+ DelegationTokenInformation tokenInfo =
+ new DelegationTokenInformation(renewDate, password);
+ currentTokens.put(ident, tokenInfo);
+ }
+ }
+
+ private void processTokenRemoved(byte[] data) throws IOException {
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ TokenIdent ident = createIdentifier();
+ ident.readFields(din);
+ currentTokens.remove(ident);
+ }
+
+ @Override
+ public void stopThreads() {
+ try {
+ if (!isExternalClient && (zkClient != null)) {
+ zkClient.close();
+ }
+ if (seqCounter != null) {
+ seqCounter.close();
+ }
+ if (keyCache != null) {
+ keyCache.close();
+ }
+ if (tokenCache != null) {
+ tokenCache.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Could not stop Curator Framework", e);
+ // Ignore
+ }
+ super.stopThreads();
+ }
+
+ private void createPersistentNode(String nodePath) throws Exception {
+ try {
+ zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);
+ } catch (KeeperException.NodeExistsException ne) {
+ LOG.debug(nodePath + " znode already exists !!");
+ } catch (Exception e) {
+ throw new IOException(nodePath + " znode could not be created !!", e);
+ }
+ }
+
+ @Override
+ protected int getDelegationTokenSeqNum() {
+ return seqCounter.getCount();
+ }
+
+ @Override
+ protected int incrementDelegationTokenSeqNum() {
+ try {
+ while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) {
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Could not increment shared counter !!", e);
+ }
+ return seqCounter.getCount();
+ }
+
+ @Override
+ protected void setDelegationTokenSeqNum(int seqNum) {
+ delegationTokenSequenceNumber = seqNum;
+ }
+
+ @Override
+ protected DelegationKey getDelegationKey(int keyId) {
+ // First check if I already have this key
+ DelegationKey key = allKeys.get(keyId);
+ // Then query ZK
+ if (key == null) {
+ try {
+ key = getKeyFromZK(keyId);
+ if (key != null) {
+ allKeys.put(keyId, key);
+ }
+ } catch (IOException e) {
+ LOG.error("Error retrieving key [" + keyId + "] from ZK", e);
+ }
+ }
+ return key;
+ }
+
+ private DelegationKey getKeyFromZK(int keyId) throws IOException {
+ String nodePath =
+ getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + keyId);
+ try {
+ byte[] data = zkClient.getData().forPath(nodePath);
+ if ((data == null) || (data.length == 0)) {
+ return null;
+ }
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ DelegationKey key = new DelegationKey();
+ key.readFields(din);
+ return key;
+ } catch (KeeperException.NoNodeException e) {
+ LOG.error("No node in path [" + nodePath + "]");
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ return null;
+ }
+
+ @Override
+ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+ // First check if I have this..
+ DelegationTokenInformation tokenInfo = currentTokens.get(ident);
+ // Then query ZK
+ if (tokenInfo == null) {
+ try {
+ tokenInfo = getTokenInfoFromZK(ident);
+ if (tokenInfo != null) {
+ currentTokens.put(ident, tokenInfo);
+ }
+ } catch (IOException e) {
+ LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber()
+ + "] from ZK", e);
+ }
+ }
+ return tokenInfo;
+ }
+
+ private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
+ throws IOException {
+ String nodePath =
+ getNodePath(ZK_DTSM_TOKENS_ROOT,
+ DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
+ try {
+ byte[] data = zkClient.getData().forPath(nodePath);
+ if ((data == null) || (data.length == 0)) {
+ return null;
+ }
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
+ DataInputStream din = new DataInputStream(bin);
+ createIdentifier().readFields(din);
+ long renewDate = din.readLong();
+ int pwdLen = din.readInt();
+ byte[] password = new byte[pwdLen];
+ int numRead = din.read(password, 0, pwdLen);
+ if (numRead > -1) {
+ DelegationTokenInformation tokenInfo =
+ new DelegationTokenInformation(renewDate, password);
+ return tokenInfo;
+ }
+ } catch (KeeperException.NoNodeException e) {
+ LOG.error("No node in path [" + nodePath + "]");
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
+ return null;
+ }
+
+ @Override
+ protected void storeDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ addOrUpdateDelegationKey(key, false);
+ }
+
+ @Override
+ protected void updateDelegationKey(DelegationKey key) throws IOException {
+ allKeys.put(key.getKeyId(), key);
+ addOrUpdateDelegationKey(key, true);
+ }
+
+ private void addOrUpdateDelegationKey(DelegationKey key, boolean isUpdate)
+ throws IOException {
+ String nodeCreatePath =
+ getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
+ DELEGATION_KEY_PREFIX + key.getKeyId());
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream fsOut = new DataOutputStream(os);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing ZKDTSMDelegationKey_" + key.getKeyId());
+ }
+ key.write(fsOut);
+ try {
+ if (zkClient.checkExists().forPath(nodeCreatePath) != null) {
+ zkClient.setData().withVersion(-1)
+ .forPath(nodeCreatePath, os.toByteArray());
+ if (!isUpdate) {
+ LOG.debug("Key with path [" + nodeCreatePath
+ + "] already exists.. Updating !!");
+ }
+ } else {
+ zkClient.create().withMode(CreateMode.PERSISTENT)
+ .forPath(nodeCreatePath, os.toByteArray());
+ if (isUpdate) {
+ LOG.debug("Updating non existent Key path [" + nodeCreatePath
+ + "].. Adding new !!");
+ }
+ }
+ } catch (KeeperException.NodeExistsException ne) {
+ LOG.debug(nodeCreatePath + " znode already exists !!");
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ } finally {
+ os.close();
+ }
+ }
+
+ @Override
+ protected void removeStoredMasterKey(DelegationKey key) {
+ String nodeRemovePath =
+ getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
+ DELEGATION_KEY_PREFIX + key.getKeyId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing ZKDTSMDelegationKey_" + key.getKeyId());
+ }
+ try {
+ if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+ zkClient.delete().forPath(nodeRemovePath);
+ } else {
+ LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
+ }
+ } catch (Exception e) {
+ LOG.debug(nodeRemovePath + " znode could not be removed!!");
+ }
+ }
+
+ @Override
+ protected void storeToken(TokenIdent ident,
+ DelegationTokenInformation tokenInfo) throws IOException {
+ currentTokens.put(ident, tokenInfo);
+ try {
+ addOrUpdateToken(ident, tokenInfo, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void updateToken(TokenIdent ident,
+ DelegationTokenInformation tokenInfo) throws IOException {
+ currentTokens.put(ident, tokenInfo);
+ String nodeRemovePath =
+ getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ + ident.getSequenceNumber());
+ try {
+ if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+ addOrUpdateToken(ident, tokenInfo, true);
+ } else {
+ addOrUpdateToken(ident, tokenInfo, false);
+ LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_"
+ + ident.getSequenceNumber(), e);
+ }
+ }
+
+ @Override
+ protected void removeStoredToken(TokenIdent ident)
+ throws IOException {
+ String nodeRemovePath =
+ getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ + ident.getSequenceNumber());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing ZKDTSMDelegationToken_"
+ + ident.getSequenceNumber());
+ }
+ try {
+ if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+ LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
+ } else {
+ zkClient.delete().forPath(nodeRemovePath);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Could not remove Stored Token ZKDTSMDelegationToken_"
+ + ident.getSequenceNumber(), e);
+ }
+ }
+
+ private void addOrUpdateToken(TokenIdent ident,
+ DelegationTokenInformation info, boolean isUpdate) throws Exception {
+ String nodeCreatePath =
+ getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+ + ident.getSequenceNumber());
+ ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
+ DataOutputStream tokenOut = new DataOutputStream(tokenOs);
+ ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+
+ try {
+ ident.write(tokenOut);
+ tokenOut.writeLong(info.getRenewDate());
+ tokenOut.writeInt(info.getPassword().length);
+ tokenOut.write(info.getPassword());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug((isUpdate ? "Updating " : "Storing ")
+ + "ZKDTSMDelegationToken_" +
+ ident.getSequenceNumber());
+ }
+ if (isUpdate) {
+ zkClient.setData().withVersion(-1)
+ .forPath(nodeCreatePath, tokenOs.toByteArray());
+ } else {
+ zkClient.create().withMode(CreateMode.PERSISTENT)
+ .forPath(nodeCreatePath, tokenOs.toByteArray());
+ }
+ } finally {
+ seqOs.close();
+ }
+ }
+
+ /**
+ * Simple implementation of an {@link ACLProvider} that returns an ACL
+ * giving all permissions only to a single principal.
+ */
+ private static class SASLOwnerACLProvider implements ACLProvider {
+
+ private final List<ACL> saslACL;
+
+ private SASLOwnerACLProvider(String principal) {
+ this.saslACL = Collections.singletonList(
+ new ACL(Perms.ALL, new Id("sasl", principal)));
+ }
+
+ @Override
+ public List<ACL> getDefaultAcl() {
+ return saslACL;
+ }
+
+ @Override
+ public List<ACL> getAclForPath(String path) {
+ return saslACL;
+ }
+ }
+
+ @VisibleForTesting
+ @Private
+ @Unstable
+ static String getNodePath(String root, String nodeName) {
+ return (root + "/" + nodeName);
+ }
+}
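The setCurator() thread-local above is how a caller hands an already-started CuratorFramework to the constructor; the manager then marks it external and will not close it in stopThreads(). A sketch of that wiring, assuming a ZooKeeper ensemble at localhost:2181 and an illustrative token kind (the namespace mirrors the default zkdtsm/ZKDTSMRoot layout the manager would otherwise set up itself):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
    import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;

    public class ZkSecretManagerWiring {
      public static void main(String[] args) throws Exception {
        CuratorFramework curator = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .namespace("zkdtsm/ZKDTSMRoot")      // match the manager's expected layout
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        curator.start();

        // The constructor picks the client up from the thread-local and marks it
        // external, so stopThreads() will not close it.
        ZKDelegationTokenSecretManager.setCurator(curator);
        Configuration conf = new Configuration(false);
        ZKDelegationTokenSecretManager<DelegationTokenIdentifier> dtsm =
            new ZKDelegationTokenSecretManager<DelegationTokenIdentifier>(conf) {
              @Override
              public DelegationTokenIdentifier createIdentifier() {
                return new DelegationTokenIdentifier(new Text("MY_KIND"));
              }
            };
        ZKDelegationTokenSecretManager.setCurator(null);  // clear the thread-local

        dtsm.startThreads();
        // ... issue and verify tokens ...
        dtsm.stopThreads();
        curator.close();                                  // caller owns the external client
      }
    }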
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java
index 64a562254b..aa9ec9948d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.security.token.delegation.web;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -28,9 +29,11 @@
import org.apache.hadoop.security.authentication.server.AuthenticationToken;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
import org.apache.hadoop.util.HttpExceptionUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
@@ -153,7 +156,14 @@ protected Configuration getProxyuserConfiguration(FilterConfig filterConfig)
@Override
public void init(FilterConfig filterConfig) throws ServletException {
+ // A single CuratorFramework should be used for a ZK cluster.
+ // If the ZKSignerSecretProvider has already created one, it is passed
+ // on here so that the ZKDelegationTokenSecretManager can reuse it.
+ ZKDelegationTokenSecretManager.setCurator((CuratorFramework)
+ filterConfig.getServletContext().getAttribute(ZKSignerSecretProvider.
+ ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE));
super.init(filterConfig);
+ ZKDelegationTokenSecretManager.setCurator(null);
AuthenticationHandler handler = getAuthenticationHandler();
AbstractDelegationTokenSecretManager dtSecretManager =
(AbstractDelegationTokenSecretManager) filterConfig.getServletContext().
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
index f41f892caa..5a31d6dc29 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
@@ -78,19 +78,6 @@ public abstract class DelegationTokenAuthenticationHandler
public static final String TOKEN_KIND = PREFIX + "token-kind";
- public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
- public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
-
- public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
- public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
-
- public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
- public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
-
- public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
- "removal-scan-interval.sec";
- public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
-
private static final Set<String> DELEGATION_TOKEN_OPS = new HashSet<String>();
static final String DELEGATION_TOKEN_UGI_ATTRIBUTE =
@@ -142,7 +129,6 @@ public void setExternalDelegationTokenSecretManager(
@VisibleForTesting
@SuppressWarnings("unchecked")
public void initTokenManager(Properties config) {
- String configPrefix = authHandler.getType() + ".";
Configuration conf = new Configuration(false);
for (Map.Entry entry : config.entrySet()) {
conf.set((String) entry.getKey(), (String) entry.getValue());
@@ -153,17 +139,7 @@ public void initTokenManager(Properties config) {
"The configuration does not define the token kind");
}
tokenKind = tokenKind.trim();
- long updateInterval = conf.getLong(configPrefix + UPDATE_INTERVAL,
- UPDATE_INTERVAL_DEFAULT);
- long maxLifeTime = conf.getLong(configPrefix + MAX_LIFETIME,
- MAX_LIFETIME_DEFAULT);
- long renewInterval = conf.getLong(configPrefix + RENEW_INTERVAL,
- RENEW_INTERVAL_DEFAULT);
- long removalScanInterval = conf.getLong(
- configPrefix + REMOVAL_SCAN_INTERVAL, REMOVAL_SCAN_INTERVAL_DEFAULT);
- tokenManager = new DelegationTokenManager(new Text(tokenKind),
- updateInterval * 1000, maxLifeTime * 1000, renewInterval * 1000,
- removalScanInterval * 1000);
+ tokenManager = new DelegationTokenManager(conf, new Text(tokenKind));
tokenManager.init();
}
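With the interval keys moved into DelegationTokenManager and initTokenManager() now copying the filter Properties into a Configuration, a deployment configures the token manager (and, optionally, the ZooKeeper store) purely through filter init parameters. A sketch of such a property set; the values are illustrative, the key names are the constants used by this patch, and java.util.Properties plus the classes touched here are assumed to be imported:

    Properties props = new Properties();
    props.setProperty(DelegationTokenAuthenticationHandler.TOKEN_KIND, "MY_TOKEN");
    props.setProperty(DelegationTokenManager.UPDATE_INTERVAL, "86400");   // key roll, in seconds
    props.setProperty(DelegationTokenManager.MAX_LIFETIME, "604800");     // 7 days
    props.setProperty(DelegationTokenManager.RENEW_INTERVAL, "86400");
    props.setProperty(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, "3600");
    // Opt into the ZooKeeper-backed secret manager added by this patch:
    props.setProperty(DelegationTokenManager.ENABLE_ZK_KEY, "true");
    props.setProperty(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
        "zk1:2181,zk2:2181,zk3:2181");
    props.setProperty(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");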
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java
index 2e6b46e413..dbde0a29f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java
@@ -17,16 +17,20 @@
*/
package org.apache.hadoop.security.token.delegation.web;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
/**
* Delegation Token Manager used by the
@@ -35,20 +39,36 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-class DelegationTokenManager {
+public class DelegationTokenManager {
+
+ public static final String ENABLE_ZK_KEY = "zk-dt-secret-manager.enable";
+
+ public static final String PREFIX = "delegation-token.";
+
+ public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
+ public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
+
+ public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
+ public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
+
+ public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
+ public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
+
+ public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
+ "removal-scan-interval.sec";
+ public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
private static class DelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
private Text tokenKind;
- public DelegationTokenSecretManager(Text tokenKind,
- long delegationKeyUpdateInterval,
- long delegationTokenMaxLifetime,
- long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval) {
- super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
- delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ public DelegationTokenSecretManager(Configuration conf, Text tokenKind) {
+ super(conf.getLong(UPDATE_INTERVAL, UPDATE_INTERVAL_DEFAULT) * 1000,
+ conf.getLong(MAX_LIFETIME, MAX_LIFETIME_DEFAULT) * 1000,
+ conf.getLong(RENEW_INTERVAL, RENEW_INTERVAL_DEFAULT) * 1000,
+ conf.getLong(REMOVAL_SCAN_INTERVAL,
+ REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
this.tokenKind = tokenKind;
}
@@ -56,21 +76,34 @@ public DelegationTokenSecretManager(Text tokenKind,
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier(tokenKind);
}
+ }
+ private static class ZKSecretManager
+ extends ZKDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+ private Text tokenKind;
+
+ public ZKSecretManager(Configuration conf, Text tokenKind) {
+ super(conf);
+ this.tokenKind = tokenKind;
+ }
+
+ @Override
+ public DelegationTokenIdentifier createIdentifier() {
+ return new DelegationTokenIdentifier(tokenKind);
+ }
}
private AbstractDelegationTokenSecretManager secretManager = null;
private boolean managedSecretManager;
private Text tokenKind;
- public DelegationTokenManager(Text tokenKind,
- long delegationKeyUpdateInterval,
- long delegationTokenMaxLifetime,
- long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval) {
- this.secretManager = new DelegationTokenSecretManager(tokenKind,
- delegationKeyUpdateInterval, delegationTokenMaxLifetime,
- delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ public DelegationTokenManager(Configuration conf, Text tokenKind) {
+ if (conf.getBoolean(ENABLE_ZK_KEY, false)) {
+ this.secretManager = new ZKSecretManager(conf, tokenKind);
+ } else {
+ this.secretManager = new DelegationTokenSecretManager(conf, tokenKind);
+ }
this.tokenKind = tokenKind;
managedSecretManager = true;
}
@@ -150,4 +183,9 @@ public UserGroupInformation verifyToken(Token
return id.getUser();
}
+ @VisibleForTesting
+ @SuppressWarnings("rawtypes")
+ public AbstractDelegationTokenSecretManager getDelegationTokenSecretManager() {
+ return secretManager;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
index 4f83a57f3f..239b8414eb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
@@ -121,7 +121,7 @@ protected void removeStoredMasterKey(DelegationKey key) {
@Override
protected void storeNewToken(TestDelegationTokenIdentifier ident,
- long renewDate) {
+ long renewDate) throws IOException {
super.storeNewToken(ident, renewDate);
isStoreNewTokenCalled = true;
}
@@ -135,7 +135,7 @@ protected void removeStoredToken(TestDelegationTokenIdentifier ident)
@Override
protected void updateStoredToken(TestDelegationTokenIdentifier ident,
- long renewDate) {
+ long renewDate) throws IOException {
super.updateStoredToken(ident, renewDate);
isUpdateStoredTokenCalled = true;
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
new file mode 100644
index 0000000000..076c87ae68
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestZKDelegationTokenSecretManager {
+
+ private static final long DAY_IN_SECS = 86400;
+
+ @Test
+ public void testZKDelTokSecretManager() throws Exception {
+ TestingServer zkServer = new TestingServer();
+ DelegationTokenManager tm1, tm2 = null;
+ zkServer.start();
+ try {
+ String connectString = zkServer.getConnectString();
+ Configuration conf = new Configuration();
+ conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
+ conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
+ conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
+ conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
+ conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
+ tm1 = new DelegationTokenManager(conf, new Text("foo"));
+ tm1.init();
+ tm2 = new DelegationTokenManager(conf, new Text("foo"));
+ tm2.init();
+
+ Token<DelegationTokenIdentifier> token =
+ tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertNotNull(token);
+ tm2.verifyToken(token);
+
+ token = tm2.createToken(UserGroupInformation.getCurrentUser(), "bar");
+ Assert.assertNotNull(token);
+ tm1.verifyToken(token);
+ } finally {
+ zkServer.close();
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java
index 4a0e8342f2..496b762bc0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java
@@ -17,27 +17,28 @@
*/
package org.apache.hadoop.security.token.delegation.web;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-
public class TestDelegationTokenManager {
private static final long DAY_IN_SECS = 86400;
@Test
public void testDTManager() throws Exception {
- DelegationTokenManager tm = new DelegationTokenManager(new Text("foo"),
- DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS);
+ Configuration conf = new Configuration(false);
+ conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
+ conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
+ DelegationTokenManager tm =
+ new DelegationTokenManager(conf, new Text("foo"));
tm.init();
Token<DelegationTokenIdentifier> token =
tm.createToken(UserGroupInformation.getCurrentUser(), "foo");
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a121fafc33..3bc1a570a1 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -864,6 +864,16 @@
<version>2.9.1</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>2.6.0</version>
+ </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>