HADOOP-13155. Implement TokenRenewer to renew and cancel delegation tokens in KMS. Contributed by Xiao Chen.

This commit is contained in:
Andrew Wang 2016-06-03 16:48:54 -07:00
parent d82bc85018
commit 713cb71820
8 changed files with 410 additions and 56 deletions

View File

@ -34,7 +34,7 @@ public class KeyProviderDelegationTokenExtension extends
new DefaultDelegationTokenExtension();
/**
* DelegationTokenExtension is a type of Extension that exposes methods to
* DelegationTokenExtension is a type of Extension that exposes methods
* needed to work with Delegation Tokens.
*/
public interface DelegationTokenExtension extends
@ -49,8 +49,23 @@ public interface DelegationTokenExtension extends
* @return list of new delegation tokens
* @throws IOException thrown if IOException if an IO error occurs.
*/
public Token<?>[] addDelegationTokens(final String renewer,
Token<?>[] addDelegationTokens(final String renewer,
Credentials credentials) throws IOException;
/**
* Renews the given token.
* @param token The token to be renewed.
* @return The token's lifetime after renewal, or 0 if it can't be renewed.
* @throws IOException
*/
long renewDelegationToken(final Token<?> token) throws IOException;
/**
* Cancels the given token.
* @param token The token to be cancelled.
* @throws IOException
*/
Void cancelDelegationToken(final Token<?> token) throws IOException;
}
/**
@ -66,6 +81,15 @@ public Token<?>[] addDelegationTokens(String renewer,
return null;
}
@Override
public long renewDelegationToken(final Token<?> token) throws IOException {
return 0;
}
@Override
public Void cancelDelegationToken(final Token<?> token) throws IOException {
return null;
}
}
private KeyProviderDelegationTokenExtension(KeyProvider keyProvider,

View File

@ -38,8 +38,11 @@
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.util.HttpExceptionUtils;
import org.apache.hadoop.util.KMSUtil;
import org.apache.http.client.utils.URIBuilder;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@ -94,7 +97,8 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
public static final String TOKEN_KIND = "kms-dt";
public static final String TOKEN_KIND_STR = "kms-dt";
public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
public static final String SCHEME_NAME = "kms";
@ -146,6 +150,54 @@ public void fillQueueForKey(String keyName,
}
}
/**
* The KMS implementation of {@link TokenRenewer}.
*/
public static class KMSTokenRenewer extends TokenRenewer {
private static final Logger LOG =
LoggerFactory.getLogger(KMSTokenRenewer.class);
@Override
public boolean handleKind(Text kind) {
return kind.equals(TOKEN_KIND);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@Override
public long renew(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Renewing delegation token {}", token);
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
KeyProviderFactory.KEY_PROVIDER_PATH);
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ?
"null" : keyProvider.getClass());
return 0;
}
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
keyProvider).renewDelegationToken(token);
}
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Canceling delegation token {}", token);
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
KeyProviderFactory.KEY_PROVIDER_PATH);
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ?
"null" : keyProvider.getClass());
return;
}
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
keyProvider).cancelDelegationToken(token);
}
}
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
@ -853,6 +905,100 @@ public int getEncKeyQueueSize(String keyName) {
return encKeyVersionQueue.getSize(keyName);
}
@Override
public long renewDelegationToken(final Token<?> dToken) throws IOException {
try {
final String doAsUser = getDoAsUser();
final DelegationTokenAuthenticatedURL.Token token =
generateDelegationToken(dToken);
final URL url = createURL(null, null, null, null);
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
token, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
return actualUgi.doAs(
new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return authUrl.renewDelegationToken(url, token, doAsUser);
}
}
);
} catch (Exception ex) {
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw new IOException(ex);
}
}
}
@Override
public Void cancelDelegationToken(final Token<?> dToken) throws IOException {
try {
final String doAsUser = getDoAsUser();
final DelegationTokenAuthenticatedURL.Token token =
generateDelegationToken(dToken);
return actualUgi.doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
final URL url = createURL(null, null, null, null);
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
dToken, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
authUrl.cancelDelegationToken(url, token, doAsUser);
return null;
}
}
);
} catch (Exception ex) {
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw new IOException(ex);
}
}
}
/**
* Get the doAs user name.
*
* 'actualUGI' is the UGI of the user creating the client
* It is possible that the creator of the KMSClientProvier
* calls this method on behalf of a proxyUser (the doAsUser).
* In which case this call has to be made as the proxy user.
*
* @return the doAs user name.
* @throws IOException
*/
private String getDoAsUser() throws IOException {
UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
return (currentUgi.getAuthenticationMethod() ==
UserGroupInformation.AuthenticationMethod.PROXY)
? currentUgi.getShortUserName() : null;
}
/**
* Generate a DelegationTokenAuthenticatedURL.Token from the given generic
* typed delegation token.
*
* @param dToken The delegation token.
* @return The DelegationTokenAuthenticatedURL.Token, with its delegation
* token set to the delegation token passed in.
*/
private DelegationTokenAuthenticatedURL.Token generateDelegationToken(
final Token<?> dToken) {
DelegationTokenAuthenticatedURL.Token token =
new DelegationTokenAuthenticatedURL.Token();
Token<AbstractDelegationTokenIdentifier> dt =
new Token<>(dToken.getIdentifier(), dToken.getPassword(),
dToken.getKind(), dToken.getService());
token.setDelegationToken(dt);
return token;
}
@Override
public Token<?>[] addDelegationTokens(final String renewer,
Credentials credentials) throws IOException {
@ -864,15 +1010,7 @@ public Token<?>[] addDelegationTokens(final String renewer,
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
try {
// 'actualUGI' is the UGI of the user creating the client
// It is possible that the creator of the KMSClientProvier
// calls this method on behalf of a proxyUser (the doAsUser).
// In which case this call has to be made as the proxy user.
UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
final String doAsUser = (currentUgi.getAuthenticationMethod() ==
UserGroupInformation.AuthenticationMethod.PROXY)
? currentUgi.getShortUserName() : null;
final String doAsUser = getDoAsUser();
token = actualUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws Exception {

View File

@ -134,6 +134,27 @@ public Token<?>[] call(KMSClientProvider provider) throws IOException {
}, nextIdx());
}
@Override
public long renewDelegationToken(final Token<?> token) throws IOException {
return doOp(new ProviderCallable<Long>() {
@Override
public Long call(KMSClientProvider provider) throws IOException {
return provider.renewDelegationToken(token);
}
}, nextIdx());
}
@Override
public Void cancelDelegationToken(final Token<?> token) throws IOException {
return doOp(new ProviderCallable<Void>() {
@Override
public Void call(KMSClientProvider provider) throws IOException {
provider.cancelDelegationToken(token);
return null;
}
}, nextIdx());
}
// This request is sent to all providers in the load-balancing group
@Override
public void warmUpEncryptedKeys(String... keyNames) throws IOException {

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Utils for KMS.
*/
@InterfaceAudience.Private
public final class KMSUtil {
public static final Logger LOG =
LoggerFactory.getLogger(KMSUtil.class);
private KMSUtil() { /* Hidden constructor */ }
/**
* Creates a new KeyProvider from the given Configuration
* and configuration key name.
*
* @param conf Configuration
* @param configKeyName The configuration key name
* @return new KeyProvider, or null if no provider was found.
* @throws IOException if the KeyProvider is improperly specified in
* the Configuration
*/
public static KeyProvider createKeyProvider(final Configuration conf,
final String configKeyName) throws IOException {
LOG.debug("Creating key provider with config key {}", configKeyName);
final String providerUriStr = conf.getTrimmed(configKeyName, "");
// No provider set in conf
if (providerUriStr.isEmpty()) {
return null;
}
final URI providerUri;
try {
providerUri = new URI(providerUriStr);
} catch (URISyntaxException e) {
throw new IOException(e);
}
KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
if (keyProvider == null) {
throw new IOException("Could not instantiate KeyProvider from " +
configKeyName + " setting of '" + providerUriStr + "'");
}
if (keyProvider.isTransient()) {
throw new IOException("KeyProvider " + keyProvider.toString()
+ " was found but it is a transient provider.");
}
return keyProvider;
}
}

View File

@ -0,0 +1,14 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer

View File

@ -72,7 +72,7 @@ protected Properties getConfiguration(String configPrefix,
KerberosDelegationTokenAuthenticationHandler.class.getName());
}
props.setProperty(DelegationTokenAuthenticationHandler.TOKEN_KIND,
KMSClientProvider.TOKEN_KIND);
KMSClientProvider.TOKEN_KIND_STR);
return props;
}

View File

@ -19,7 +19,7 @@
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProvider.Options;
@ -31,11 +31,14 @@
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.Token;
@ -45,11 +48,12 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.File;
@ -72,9 +76,17 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
public class TestKMS {
private static final Logger LOG = LoggerFactory.getLogger(TestKMS.class);
@Rule
public final Timeout testTimeout = new Timeout(180000);
@Before
public void cleanUp() {
// resetting kerberos security
@ -649,20 +661,6 @@ public Void call() throws Exception {
Assert.assertEquals("d", meta.getDescription());
Assert.assertEquals(attributes, meta.getAttributes());
// test delegation token retrieval
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(kp);
Credentials credentials = new Credentials();
kpdte.addDelegationTokens("foo", credentials);
Assert.assertEquals(1, credentials.getAllTokens().size());
InetSocketAddress kmsAddr = new InetSocketAddress(getKMSUrl().getHost(),
getKMSUrl().getPort());
Assert.assertEquals(new Text("kms-dt"), credentials.getToken(
SecurityUtil.buildTokenService(kmsAddr)).getKind());
// test rollover draining
KeyProviderCryptoExtension kpce = KeyProviderCryptoExtension.
createKeyProviderCryptoExtension(kp);
@ -1745,6 +1743,101 @@ public Void run() throws Exception {
});
}
@Test
public void testDelegationTokensOpsSimple() throws Exception {
final Configuration conf = new Configuration();
final Authenticator mock = mock(PseudoAuthenticator.class);
testDelegationTokensOps(conf, mock);
}
@Test
public void testDelegationTokensOpsKerberized() throws Exception {
final Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "kerberos");
final Authenticator mock = mock(KerberosAuthenticator.class);
testDelegationTokensOps(conf, mock);
}
private void testDelegationTokensOps(Configuration conf,
final Authenticator mockAuthenticator) throws Exception {
UserGroupInformation.setConfiguration(conf);
File confDir = getTestDir();
conf = createBaseKMSConf(confDir);
writeConf(confDir, conf);
doNothing().when(mockAuthenticator).authenticate(any(URL.class),
any(AuthenticatedURL.Token.class));
runServer(null, null, confDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
Configuration conf = new Configuration();
URI uri = createKMSUri(getKMSUrl());
KeyProvider kp = createProvider(uri, conf);
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
createKMSUri(getKMSUrl()).toString());
// test delegation token retrieval
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(kp);
Credentials credentials = new Credentials();
final Token<?>[] tokens = kpdte.addDelegationTokens(
UserGroupInformation.getCurrentUser().getUserName(), credentials);
Assert.assertEquals(1, credentials.getAllTokens().size());
InetSocketAddress kmsAddr = new InetSocketAddress(getKMSUrl().getHost(),
getKMSUrl().getPort());
Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
getKind());
// After this point, we're supposed to use the delegation token to auth.
doThrow(new IOException("Authenticator should not fall back"))
.when(mockAuthenticator).authenticate(any(URL.class),
any(AuthenticatedURL.Token.class));
// test delegation token renewal
boolean renewed = false;
for (Token<?> token : tokens) {
if (!(token.getKind().equals(KMSClientProvider.TOKEN_KIND))) {
LOG.info("Skipping token {}", token);
continue;
}
LOG.info("Got dt for " + uri + "; " + token);
long tokenLife = token.renew(conf);
LOG.info("Renewed token of kind {}, new lifetime:{}",
token.getKind(), tokenLife);
Thread.sleep(100);
long newTokenLife = token.renew(conf);
LOG.info("Renewed token of kind {}, new lifetime:{}",
token.getKind(), newTokenLife);
Assert.assertTrue(newTokenLife > tokenLife);
renewed = true;
}
Assert.assertTrue(renewed);
// test delegation token cancellation
for (Token<?> token : tokens) {
if (!(token.getKind().equals(KMSClientProvider.TOKEN_KIND))) {
LOG.info("Skipping token {}", token);
continue;
}
LOG.info("Got dt for " + uri + "; " + token);
token.cancel(conf);
LOG.info("Cancelled token of kind {}", token.getKind());
doNothing().when(mockAuthenticator).
authenticate(any(URL.class), any(AuthenticatedURL.Token.class));
try {
token.renew(conf);
Assert.fail("should not be able to renew a canceled token");
} catch (Exception e) {
LOG.info("Expected exception when trying to renew token", e);
}
}
return null;
}
});
}
@Test
public void testKMSWithZKSigner() throws Exception {
doKMSWithZK(true, false);

View File

@ -23,7 +23,6 @@
import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@ -53,6 +52,7 @@
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,7 +70,6 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Collection;
@ -514,6 +513,17 @@ public static ReconfigurationProtocol createReconfigurationProtocolProxy(
return new ReconfigurationProtocolTranslatorPB(addr, ticket, conf, factory);
}
private static String keyProviderUriKeyName =
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI;
/**
* Set the key provider uri configuration key name for creating key providers.
* @param keyName The configuration key name.
*/
public static void setKeyProviderUriKeyName(final String keyName) {
keyProviderUriKeyName = keyName;
}
/**
* Creates a new KeyProvider from the given Configuration.
*
@ -524,29 +534,7 @@ public static ReconfigurationProtocol createReconfigurationProtocolProxy(
*/
public static KeyProvider createKeyProvider(
final Configuration conf) throws IOException {
final String providerUriStr =
conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
// No provider set in conf
if (providerUriStr.isEmpty()) {
return null;
}
final URI providerUri;
try {
providerUri = new URI(providerUriStr);
} catch (URISyntaxException e) {
throw new IOException(e);
}
KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
if (keyProvider == null) {
throw new IOException("Could not instantiate KeyProvider from " +
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '"
+ providerUriStr + "'");
}
if (keyProvider.isTransient()) {
throw new IOException("KeyProvider " + keyProvider.toString()
+ " was found but it is a transient provider.");
}
return keyProvider;
return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName);
}
public static Peer peerFromSocket(Socket socket)