HADOOP-14445. Use DelegationTokenIssuer to create KMS delegation tokens that can authenticate to all KMS instances.
Contributed by Daryn Sharp, Xiao Chen, Rushabh S Shah.
This commit is contained in:
parent
6e0e6daaf3
commit
5ec86b445c
@ -17,8 +17,12 @@
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@ -28,7 +32,8 @@
|
||||
*/
|
||||
public class KeyProviderDelegationTokenExtension extends
|
||||
KeyProviderExtension
|
||||
<KeyProviderDelegationTokenExtension.DelegationTokenExtension> {
|
||||
<KeyProviderDelegationTokenExtension.DelegationTokenExtension>
|
||||
implements DelegationTokenIssuer {
|
||||
|
||||
private static DelegationTokenExtension DEFAULT_EXTENSION =
|
||||
new DefaultDelegationTokenExtension();
|
||||
@ -36,22 +41,9 @@ public class KeyProviderDelegationTokenExtension extends
|
||||
/**
|
||||
* DelegationTokenExtension is a type of Extension that exposes methods
|
||||
* needed to work with Delegation Tokens.
|
||||
*/
|
||||
public interface DelegationTokenExtension extends
|
||||
KeyProviderExtension.Extension {
|
||||
|
||||
/**
|
||||
* The implementer of this class will take a renewer and add all
|
||||
* delegation tokens associated with the renewer to the
|
||||
* <code>Credentials</code> object if it is not already present,
|
||||
* @param renewer the user allowed to renew the delegation tokens
|
||||
* @param credentials cache in which to add new delegation tokens
|
||||
* @return list of new delegation tokens
|
||||
* @throws IOException thrown if IOException if an IO error occurs.
|
||||
*/
|
||||
Token<?>[] addDelegationTokens(final String renewer,
|
||||
Credentials credentials) throws IOException;
|
||||
|
||||
*/
|
||||
public interface DelegationTokenExtension
|
||||
extends KeyProviderExtension.Extension, DelegationTokenIssuer {
|
||||
/**
|
||||
* Renews the given token.
|
||||
* @param token The token to be renewed.
|
||||
@ -66,6 +58,12 @@ Token<?>[] addDelegationTokens(final String renewer,
|
||||
* @throws IOException
|
||||
*/
|
||||
Void cancelDelegationToken(final Token<?> token) throws IOException;
|
||||
|
||||
// Do NOT call this. Only intended for internal use.
|
||||
@VisibleForTesting
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
Token<?> selectDelegationToken(Credentials creds);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -81,6 +79,16 @@ public Token<?>[] addDelegationTokens(String renewer,
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getDelegationToken(String renewer) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renewDelegationToken(final Token<?> token) throws IOException {
|
||||
return 0;
|
||||
@ -90,26 +98,29 @@ public long renewDelegationToken(final Token<?> token) throws IOException {
|
||||
public Void cancelDelegationToken(final Token<?> token) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> selectDelegationToken(Credentials creds) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private KeyProviderDelegationTokenExtension(KeyProvider keyProvider,
|
||||
DelegationTokenExtension extensions) {
|
||||
super(keyProvider, extensions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Passes the renewer and Credentials object to the underlying
|
||||
* {@link DelegationTokenExtension}
|
||||
* @param renewer the user allowed to renew the delegation tokens
|
||||
* @param credentials cache in which to add new delegation tokens
|
||||
* @return list of new delegation tokens
|
||||
* @throws IOException thrown if IOException if an IO error occurs.
|
||||
*/
|
||||
public Token<?>[] addDelegationTokens(final String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
return getExtension().addDelegationTokens(renewer, credentials);
|
||||
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return getExtension().getCanonicalServiceName();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Token<?> getDelegationToken(final String renewer) throws IOException {
|
||||
return getExtension().getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a <code>KeyProviderDelegationTokenExtension</code> using a given
|
||||
* {@link KeyProvider}.
|
||||
|
@ -22,15 +22,17 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
|
||||
/**
|
||||
* File systems that support Encryption Zones have to implement this interface.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface KeyProviderTokenIssuer {
|
||||
public interface KeyProviderTokenIssuer extends DelegationTokenIssuer {
|
||||
|
||||
KeyProvider getKeyProvider() throws IOException;
|
||||
|
||||
URI getKeyProviderUri() throws IOException;
|
||||
|
||||
}
|
@ -32,14 +32,13 @@
|
||||
import org.apache.hadoop.security.ProviderUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.util.HttpExceptionUtils;
|
||||
import org.apache.hadoop.util.JsonSerialization;
|
||||
@ -58,7 +57,6 @@
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
@ -99,7 +97,7 @@
|
||||
public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension {
|
||||
|
||||
private static final Logger LOG =
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(KMSClientProvider.class);
|
||||
|
||||
private static final String INVALID_SIGNATURE = "Invalid signature";
|
||||
@ -133,12 +131,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
|
||||
private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
|
||||
|
||||
private KeyProviderDelegationTokenExtension.DelegationTokenExtension
|
||||
clientTokenProvider = this;
|
||||
// the token's service.
|
||||
private final Text dtService;
|
||||
|
||||
// Allow fallback to default kms server port 9600 for certain tests that do
|
||||
// not specify the port explicitly in the kms provider url.
|
||||
@VisibleForTesting
|
||||
public static volatile boolean fallbackDefaultPortForTesting = false;
|
||||
// alias in the credentials.
|
||||
private final Text canonicalService;
|
||||
|
||||
private class EncryptedQueueRefiller implements
|
||||
ValueQueue.QueueRefiller<EncryptedKeyVersion> {
|
||||
@ -162,6 +160,14 @@ public void fillQueueForKey(String keyName,
|
||||
}
|
||||
}
|
||||
|
||||
static class TokenSelector extends AbstractDelegationTokenSelector {
|
||||
static final TokenSelector INSTANCE = new TokenSelector();
|
||||
|
||||
TokenSelector() {
|
||||
super(TOKEN_KIND);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The KMS implementation of {@link TokenRenewer}.
|
||||
*/
|
||||
@ -182,8 +188,7 @@ public boolean isManaged(Token<?> token) throws IOException {
|
||||
@Override
|
||||
public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Renewing delegation token {}", token);
|
||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
||||
KeyProvider keyProvider = createKeyProvider(token, conf);
|
||||
try {
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
@ -204,8 +209,7 @@ public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||
@Override
|
||||
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Canceling delegation token {}", token);
|
||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
||||
KeyProvider keyProvider = createKeyProvider(token, conf);
|
||||
try {
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
@ -222,6 +226,19 @@ public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static KeyProvider createKeyProvider(
|
||||
Token<?> token, Configuration conf) throws IOException {
|
||||
String service = token.getService().toString();
|
||||
URI uri;
|
||||
if (service != null && service.startsWith(SCHEME_NAME + ":/")) {
|
||||
LOG.debug("Creating key provider with token service value {}", service);
|
||||
uri = URI.create(service);
|
||||
} else { // conf fallback
|
||||
uri = KMSUtil.getKeyProviderUri(conf);
|
||||
}
|
||||
return (uri != null) ? KMSUtil.createKeyProviderFromUri(conf, uri) : null;
|
||||
}
|
||||
}
|
||||
|
||||
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
|
||||
@ -283,12 +300,14 @@ public KeyProvider createProvider(URI providerUri, Configuration conf)
|
||||
}
|
||||
hostsPart = t[0];
|
||||
}
|
||||
return createProvider(conf, origUrl, port, hostsPart);
|
||||
KMSClientProvider[] providers =
|
||||
createProviders(conf, origUrl, port, hostsPart);
|
||||
return new LoadBalancingKMSClientProvider(providerUri, providers, conf);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private KeyProvider createProvider(Configuration conf,
|
||||
private KMSClientProvider[] createProviders(Configuration conf,
|
||||
URL origUrl, int port, String hostsPart) throws IOException {
|
||||
String[] hosts = hostsPart.split(";");
|
||||
KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
|
||||
@ -302,7 +321,7 @@ private KeyProvider createProvider(Configuration conf,
|
||||
throw new IOException("Could not instantiate KMSProvider.", e);
|
||||
}
|
||||
}
|
||||
return new LoadBalancingKMSClientProvider(providers, conf);
|
||||
return providers;
|
||||
}
|
||||
}
|
||||
|
||||
@ -358,13 +377,13 @@ public HttpURLConnection configure(HttpURLConnection conn)
|
||||
public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||
super(conf);
|
||||
kmsUrl = createServiceURL(extractKMSPath(uri));
|
||||
int kmsPort = kmsUrl.getPort();
|
||||
if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
|
||||
kmsPort = 9600;
|
||||
}
|
||||
|
||||
InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
|
||||
dtService = SecurityUtil.buildTokenService(addr);
|
||||
// the token's service so it can be instantiated for renew/cancel.
|
||||
dtService = getDtService(uri);
|
||||
// the canonical service is the alias for the token in the credentials.
|
||||
// typically it's the actual service in the token but older clients expect
|
||||
// an address.
|
||||
URI serviceUri = URI.create(kmsUrl.toString());
|
||||
canonicalService = SecurityUtil.buildTokenService(serviceUri);
|
||||
|
||||
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
|
||||
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
@ -402,8 +421,22 @@ public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
|
||||
new EncryptedQueueRefiller());
|
||||
authToken = new DelegationTokenAuthenticatedURL.Token();
|
||||
LOG.debug("KMSClientProvider for KMS url: {} delegation token service: {}" +
|
||||
" created.", kmsUrl, dtService);
|
||||
LOG.debug("KMSClientProvider created for KMS url: {} delegation token "
|
||||
+ "service: {} canonical service: {}.", kmsUrl, dtService,
|
||||
canonicalService);
|
||||
}
|
||||
|
||||
protected static Text getDtService(URI uri) {
|
||||
Text service;
|
||||
// remove fragment for forward compatibility with logical naming.
|
||||
final String fragment = uri.getFragment();
|
||||
if (fragment != null) {
|
||||
service = new Text(
|
||||
uri.getScheme() + ":" + uri.getSchemeSpecificPart());
|
||||
} else {
|
||||
service = new Text(uri.toString());
|
||||
}
|
||||
return service;
|
||||
}
|
||||
|
||||
private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException {
|
||||
@ -475,7 +508,7 @@ private HttpURLConnection createConnection(final URL url, String method)
|
||||
@Override
|
||||
public HttpURLConnection run() throws Exception {
|
||||
DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
createAuthenticatedURL();
|
||||
return authUrl.openConnection(url, authToken, doAsUser);
|
||||
}
|
||||
});
|
||||
@ -931,6 +964,96 @@ public int getEncKeyQueueSize(String keyName) {
|
||||
return encKeyVersionQueue.getSize(keyName);
|
||||
}
|
||||
|
||||
// note: this is only a crutch for backwards compatibility.
|
||||
// override the instance that will be used to select a token, intended
|
||||
// to allow load balancing provider to find a token issued by any of its
|
||||
// sub-providers.
|
||||
protected void setClientTokenProvider(
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension provider) {
|
||||
clientTokenProvider = provider;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
DelegationTokenAuthenticatedURL createAuthenticatedURL() {
|
||||
return new DelegationTokenAuthenticatedURL(configurator) {
|
||||
@Override
|
||||
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||
selectDelegationToken(URL url, Credentials creds) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Looking for delegation token. creds: {}",
|
||||
creds.getAllTokens());
|
||||
}
|
||||
// clientTokenProvider is either "this" or a load balancing instance.
|
||||
// if the latter, it will first look for the load balancer's uri
|
||||
// service followed by each sub-provider for backwards-compatibility.
|
||||
return clientTokenProvider.selectDelegationToken(creds);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@Override
|
||||
public Token<?> selectDelegationToken(Credentials creds) {
|
||||
Token<?> token = selectDelegationToken(creds, dtService);
|
||||
if (token == null) {
|
||||
token = selectDelegationToken(creds, canonicalService);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
protected static Token<?> selectDelegationToken(Credentials creds,
|
||||
Text service) {
|
||||
Token<?> token = creds.getToken(service);
|
||||
LOG.debug("selected by alias={} token={}", service, token);
|
||||
if (token != null && TOKEN_KIND.equals(token.getKind())) {
|
||||
return token;
|
||||
}
|
||||
token = TokenSelector.INSTANCE.selectToken(service, creds.getAllTokens());
|
||||
LOG.debug("selected by service={} token={}", service, token);
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return canonicalService.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getDelegationToken(final String renewer) throws IOException {
|
||||
final URL url = createURL(null, null, null, null);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
Token<?> token = null;
|
||||
try {
|
||||
final String doAsUser = getDoAsUser();
|
||||
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
|
||||
@Override
|
||||
public Token<?> run() throws Exception {
|
||||
// Not using the cached token here.. Creating a new token here
|
||||
// everytime.
|
||||
LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
|
||||
return authUrl.getDelegationToken(url,
|
||||
new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
|
||||
}
|
||||
});
|
||||
if (token != null) {
|
||||
token.setService(dtService);
|
||||
LOG.info("New token created: ({})", token);
|
||||
} else {
|
||||
throw new IOException("Got NULL as delegation token");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
if (e instanceof IOException) {
|
||||
throw (IOException) e;
|
||||
} else {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renewDelegationToken(final Token<?> dToken) throws IOException {
|
||||
try {
|
||||
@ -941,7 +1064,7 @@ public long renewDelegationToken(final Token<?> dToken) throws IOException {
|
||||
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
|
||||
token, url, doAsUser);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
createAuthenticatedURL();
|
||||
return getActualUgi().doAs(
|
||||
new PrivilegedExceptionAction<Long>() {
|
||||
@Override
|
||||
@ -973,7 +1096,7 @@ public Void run() throws Exception {
|
||||
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
|
||||
dToken, url, doAsUser);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
createAuthenticatedURL();
|
||||
authUrl.cancelDelegationToken(url, token, doAsUser);
|
||||
return null;
|
||||
}
|
||||
@ -1025,47 +1148,6 @@ private DelegationTokenAuthenticatedURL.Token generateDelegationToken(
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[] addDelegationTokens(final String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
Token<?>[] tokens = null;
|
||||
Token<?> token = credentials.getToken(dtService);
|
||||
if (token == null) {
|
||||
final URL url = createURL(null, null, null, null);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
try {
|
||||
final String doAsUser = getDoAsUser();
|
||||
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
|
||||
@Override
|
||||
public Token<?> run() throws Exception {
|
||||
// Not using the cached token here.. Creating a new token here
|
||||
// everytime.
|
||||
LOG.info("Getting new token from {}, renewer:{}", url, renewer);
|
||||
return authUrl.getDelegationToken(url,
|
||||
new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
|
||||
}
|
||||
});
|
||||
if (token != null) {
|
||||
LOG.info("New token received: ({})", token);
|
||||
credentials.addToken(token.getService(), token);
|
||||
tokens = new Token<?>[] { token };
|
||||
} else {
|
||||
throw new IOException("Got NULL as delegation token");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
if (e instanceof IOException) {
|
||||
throw (IOException) e;
|
||||
} else {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
|
||||
// Add existing credentials from the UGI, since provider is cached.
|
||||
Credentials creds = ugi.getCredentials();
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.URI;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Arrays;
|
||||
@ -36,12 +37,15 @@
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -76,19 +80,42 @@ public WrapperException(Throwable cause) {
|
||||
|
||||
private final KMSClientProvider[] providers;
|
||||
private final AtomicInteger currentIdx;
|
||||
private final Text dtService; // service in token.
|
||||
private final Text canonicalService; // credentials alias for token.
|
||||
|
||||
private RetryPolicy retryPolicy = null;
|
||||
|
||||
public LoadBalancingKMSClientProvider(KMSClientProvider[] providers,
|
||||
Configuration conf) {
|
||||
this(shuffle(providers), Time.monotonicNow(), conf);
|
||||
public LoadBalancingKMSClientProvider(URI providerUri,
|
||||
KMSClientProvider[] providers, Configuration conf) {
|
||||
this(providerUri, providers, Time.monotonicNow(), conf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
LoadBalancingKMSClientProvider(KMSClientProvider[] providers, long seed,
|
||||
Configuration conf) {
|
||||
this(URI.create("kms://testing"), providers, seed, conf);
|
||||
}
|
||||
|
||||
private LoadBalancingKMSClientProvider(URI uri,
|
||||
KMSClientProvider[] providers, long seed, Configuration conf) {
|
||||
super(conf);
|
||||
this.providers = providers;
|
||||
// uri is the token service so it can be instantiated for renew/cancel.
|
||||
dtService = KMSClientProvider.getDtService(uri);
|
||||
// if provider not in conf, new client will alias on uri else addr.
|
||||
if (KMSUtil.getKeyProviderUri(conf) == null) {
|
||||
canonicalService = dtService;
|
||||
} else {
|
||||
// canonical service (credentials alias) will be the first underlying
|
||||
// provider's service. must be deterministic before shuffle so multiple
|
||||
// calls for a token do not obtain another unnecessary token.
|
||||
canonicalService = new Text(providers[0].getCanonicalServiceName());
|
||||
}
|
||||
|
||||
// shuffle unless seed is 0 which is used by tests for determinism.
|
||||
this.providers = (seed != 0) ? shuffle(providers) : providers;
|
||||
for (KMSClientProvider provider : providers) {
|
||||
provider.setClientTokenProvider(this);
|
||||
}
|
||||
this.currentIdx = new AtomicInteger((int)(seed % providers.length));
|
||||
int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic.
|
||||
KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length);
|
||||
@ -106,6 +133,9 @@ public LoadBalancingKMSClientProvider(KMSClientProvider[] providers,
|
||||
this.retryPolicy = RetryPolicies.failoverOnNetworkException(
|
||||
RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis,
|
||||
sleepMaxMillis);
|
||||
LOG.debug("Created LoadBalancingKMSClientProvider for KMS url: {} with {} "
|
||||
+ "providers. delegation token service: {}, canonical service: {}",
|
||||
uri, providers.length, dtService, canonicalService);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -113,6 +143,23 @@ public KMSClientProvider[] getProviders() {
|
||||
return providers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||
selectDelegationToken(Credentials creds) {
|
||||
Token<? extends TokenIdentifier> token =
|
||||
KMSClientProvider.selectDelegationToken(creds, canonicalService);
|
||||
// fallback to querying each sub-provider.
|
||||
if (token == null) {
|
||||
for (KMSClientProvider provider : getProviders()) {
|
||||
token = provider.selectDelegationToken(creds);
|
||||
if (token != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
private <T> T doOp(ProviderCallable<T> op, int currPos,
|
||||
boolean isIdempotent) throws IOException {
|
||||
if (providers.length == 0) {
|
||||
@ -193,13 +240,21 @@ private int nextIdx() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[]
|
||||
addDelegationTokens(final String renewer, final Credentials credentials)
|
||||
throws IOException {
|
||||
return doOp(new ProviderCallable<Token<?>[]>() {
|
||||
public String getCanonicalServiceName() {
|
||||
return canonicalService.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return doOp(new ProviderCallable<Token<?>>() {
|
||||
@Override
|
||||
public Token<?>[] call(KMSClientProvider provider) throws IOException {
|
||||
return provider.addDelegationTokens(renewer, credentials);
|
||||
public Token<?> call(KMSClientProvider provider) throws IOException {
|
||||
Token<?> token = provider.getDelegationToken(renewer);
|
||||
// override sub-providers service with our own so it can be used
|
||||
// across all providers.
|
||||
token.setService(dtService);
|
||||
LOG.debug("New token service set. Token: ({})", token);
|
||||
return token;
|
||||
}
|
||||
}, nextIdx(), false);
|
||||
}
|
||||
|
@ -58,13 +58,13 @@
|
||||
import org.apache.hadoop.fs.permission.FsCreateModes;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
import org.apache.hadoop.util.ClassUtil;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
@ -121,7 +121,8 @@
|
||||
@SuppressWarnings("DeprecatedIsStillUsed")
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public abstract class FileSystem extends Configured implements Closeable {
|
||||
public abstract class FileSystem extends Configured
|
||||
implements Closeable, DelegationTokenIssuer {
|
||||
public static final String FS_DEFAULT_NAME_KEY =
|
||||
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
|
||||
public static final String DEFAULT_FS =
|
||||
@ -386,6 +387,7 @@ protected static FileSystem getFSofPath(final Path absOrFqPath,
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return (getChildFileSystems() == null)
|
||||
? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
|
||||
@ -600,71 +602,11 @@ public Path makeQualified(Path path) {
|
||||
* @throws IOException on any problem obtaining a token
|
||||
*/
|
||||
@InterfaceAudience.Private()
|
||||
@Override
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain all delegation tokens used by this FileSystem that are not
|
||||
* already present in the given Credentials. Existing tokens will neither
|
||||
* be verified as valid nor having the given renewer. Missing tokens will
|
||||
* be acquired and added to the given Credentials.
|
||||
*
|
||||
* Default Impl: works for simple FS with its own token
|
||||
* and also for an embedded FS whose tokens are those of its
|
||||
* child FileSystems (i.e. the embedded FS has no tokens of its own).
|
||||
*
|
||||
* @param renewer the user allowed to renew the delegation tokens
|
||||
* @param credentials cache in which to add new delegation tokens
|
||||
* @return list of new delegation tokens
|
||||
* @throws IOException problems obtaining a token
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public Token<?>[] addDelegationTokens(
|
||||
final String renewer, Credentials credentials) throws IOException {
|
||||
if (credentials == null) {
|
||||
credentials = new Credentials();
|
||||
}
|
||||
final List<Token<?>> tokens = new ArrayList<>();
|
||||
collectDelegationTokens(renewer, credentials, tokens);
|
||||
return tokens.toArray(new Token<?>[tokens.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively obtain the tokens for this FileSystem and all descendant
|
||||
* FileSystems as determined by {@link #getChildFileSystems()}.
|
||||
* @param renewer the user allowed to renew the delegation tokens
|
||||
* @param credentials cache in which to add the new delegation tokens
|
||||
* @param tokens list in which to add acquired tokens
|
||||
* @throws IOException problems obtaining a token
|
||||
*/
|
||||
private void collectDelegationTokens(final String renewer,
|
||||
final Credentials credentials,
|
||||
final List<Token<?>> tokens)
|
||||
throws IOException {
|
||||
final String serviceName = getCanonicalServiceName();
|
||||
// Collect token of the this filesystem and then of its embedded children
|
||||
if (serviceName != null) { // fs has token, grab it
|
||||
final Text service = new Text(serviceName);
|
||||
Token<?> token = credentials.getToken(service);
|
||||
if (token == null) {
|
||||
token = getDelegationToken(renewer);
|
||||
if (token != null) {
|
||||
tokens.add(token);
|
||||
credentials.addToken(service, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Now collect the tokens from the children
|
||||
final FileSystem[] children = getChildFileSystems();
|
||||
if (children != null) {
|
||||
for (final FileSystem fs : children) {
|
||||
fs.collectDelegationTokens(renewer, credentials, tokens);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the immediate child FileSystems embedded in this FileSystem.
|
||||
* It does not recurse and get grand children. If a FileSystem
|
||||
@ -680,6 +622,13 @@ public FileSystem[] getChildFileSystems() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@Override
|
||||
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
|
||||
throws IOException {
|
||||
return getChildFileSystems();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a file with the provided permission.
|
||||
*
|
||||
|
@ -296,15 +296,11 @@ public HttpURLConnection openConnection(URL url, Token token, String doAs)
|
||||
Credentials creds = UserGroupInformation.getCurrentUser().
|
||||
getCredentials();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Token not set, looking for delegation token. Creds:{}",
|
||||
creds.getAllTokens());
|
||||
LOG.debug("Token not set, looking for delegation token. Creds:{},"
|
||||
+ " size:{}", creds.getAllTokens(), creds.numberOfTokens());
|
||||
}
|
||||
if (!creds.getAllTokens().isEmpty()) {
|
||||
InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
|
||||
url.getPort());
|
||||
Text service = SecurityUtil.buildTokenService(serviceAddr);
|
||||
dToken = creds.getToken(service);
|
||||
LOG.debug("Using delegation token {} from service:{}", dToken, service);
|
||||
dToken = selectDelegationToken(url, creds);
|
||||
if (dToken != null) {
|
||||
if (useQueryStringForDelegationToken()) {
|
||||
// delegation token will go in the query string, injecting it
|
||||
@ -340,6 +336,21 @@ public HttpURLConnection openConnection(URL url, Token token, String doAs)
|
||||
return conn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Select a delegation token from all tokens in credentials, based on url.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||
selectDelegationToken(URL url, Credentials creds) {
|
||||
final InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
|
||||
url.getPort());
|
||||
final Text service = SecurityUtil.buildTokenService(serviceAddr);
|
||||
org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
|
||||
creds.getToken(service);
|
||||
LOG.debug("Using delegation token {} from service:{}", dToken, service);
|
||||
return dToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests a delegation token using the configured <code>Authenticator</code>
|
||||
* for authentication.
|
||||
|
@ -0,0 +1,112 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.security.token.org.apache.hadoop.security.token;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Class for issuing delegation tokens.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "Yarn"})
|
||||
@InterfaceStability.Unstable
|
||||
public interface DelegationTokenIssuer {
|
||||
|
||||
/**
|
||||
* The service name used as the alias for the token in the credential
|
||||
* token map. addDelegationTokens will use this to determine if
|
||||
* a token exists, and if not, add a new token with this alias.
|
||||
*/
|
||||
String getCanonicalServiceName();
|
||||
|
||||
/**
|
||||
* Unconditionally get a new token with the optional renewer. Returning
|
||||
* null indicates the service does not issue tokens.
|
||||
*/
|
||||
Token<?> getDelegationToken(String renewer) throws IOException;
|
||||
|
||||
/**
|
||||
* Issuers may need tokens from additional services.
|
||||
*/
|
||||
default DelegationTokenIssuer[] getAdditionalTokenIssuers()
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a renewer, add delegation tokens for issuer and it's child issuers
|
||||
* to the <code>Credentials</code> object if it is not already present.
|
||||
*<p>
|
||||
* Note: This method is not intended to be overridden. Issuers should
|
||||
* implement getCanonicalService and getDelegationToken to ensure
|
||||
* consistent token acquisition behavior.
|
||||
*
|
||||
* @param renewer the user allowed to renew the delegation tokens
|
||||
* @param credentials cache in which to add new delegation tokens
|
||||
* @return list of new delegation tokens
|
||||
* @throws IOException thrown if IOException if an IO error occurs.
|
||||
*/
|
||||
default Token<?>[] addDelegationTokens(
|
||||
final String renewer, Credentials credentials) throws IOException {
|
||||
if (credentials == null) {
|
||||
credentials = new Credentials();
|
||||
}
|
||||
final List<Token<?>> tokens = new ArrayList<>();
|
||||
collectDelegationTokens(this, renewer, credentials, tokens);
|
||||
return tokens.toArray(new Token<?>[tokens.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* NEVER call this method directly.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
static void collectDelegationTokens(
|
||||
final DelegationTokenIssuer issuer,
|
||||
final String renewer,
|
||||
final Credentials credentials,
|
||||
final List<Token<?>> tokens) throws IOException {
|
||||
final String serviceName = issuer.getCanonicalServiceName();
|
||||
// Collect token of the this issuer and then of its embedded children
|
||||
if (serviceName != null) {
|
||||
final Text service = new Text(serviceName);
|
||||
Token<?> token = credentials.getToken(service);
|
||||
if (token == null) {
|
||||
token = issuer.getDelegationToken(renewer);
|
||||
if (token != null) {
|
||||
tokens.add(token);
|
||||
credentials.addToken(service, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Now collect the tokens from the children.
|
||||
final DelegationTokenIssuer[] ancillary =
|
||||
issuer.getAdditionalTokenIssuers();
|
||||
if (ancillary != null) {
|
||||
for (DelegationTokenIssuer subIssuer : ancillary) {
|
||||
collectDelegationTokens(subIssuer, renewer, credentials, tokens);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -59,12 +59,23 @@ private KMSUtil() { /* Hidden constructor */ }
|
||||
public static KeyProvider createKeyProvider(final Configuration conf,
|
||||
final String configKeyName) throws IOException {
|
||||
LOG.debug("Creating key provider with config key {}", configKeyName);
|
||||
URI uri = getKeyProviderUri(conf, configKeyName);
|
||||
return (uri != null) ? createKeyProviderFromUri(conf, uri) : null;
|
||||
}
|
||||
|
||||
public static URI getKeyProviderUri(final Configuration conf) {
|
||||
return KMSUtil.getKeyProviderUri(
|
||||
conf, KeyProviderFactory.KEY_PROVIDER_PATH);
|
||||
}
|
||||
|
||||
public static URI getKeyProviderUri(final Configuration conf,
|
||||
final String configKeyName) {
|
||||
final String providerUriStr = conf.getTrimmed(configKeyName);
|
||||
// No provider set in conf
|
||||
if (providerUriStr == null || providerUriStr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return createKeyProviderFromUri(conf, URI.create(providerUriStr));
|
||||
return URI.create(providerUriStr);
|
||||
}
|
||||
|
||||
public static KeyProvider createKeyProviderFromUri(final Configuration conf,
|
||||
|
@ -51,23 +51,27 @@ public void testCreateExtension() throws Exception {
|
||||
KeyProviderDelegationTokenExtension
|
||||
.createKeyProviderDelegationTokenExtension(kp);
|
||||
Assert.assertNotNull(kpDTE1);
|
||||
// Default implementation should be a no-op and return null
|
||||
Assert.assertNull(kpDTE1.addDelegationTokens("user", credentials));
|
||||
Token<?>[] tokens = kpDTE1.addDelegationTokens("user", credentials);
|
||||
// Default implementation should return no tokens.
|
||||
Assert.assertNotNull(tokens);
|
||||
Assert.assertEquals(0, tokens.length);
|
||||
|
||||
MockKeyProvider mock = mock(MockKeyProvider.class);
|
||||
Mockito.when(mock.getConf()).thenReturn(new Configuration());
|
||||
when(mock.addDelegationTokens("renewer", credentials)).thenReturn(
|
||||
new Token<?>[]{new Token(null, null, new Text("kind"), new Text(
|
||||
"service"))}
|
||||
when(mock.getCanonicalServiceName()).thenReturn("cservice");
|
||||
when(mock.getDelegationToken("renewer")).thenReturn(
|
||||
new Token(null, null, new Text("kind"), new Text(
|
||||
"tservice"))
|
||||
);
|
||||
KeyProviderDelegationTokenExtension kpDTE2 =
|
||||
KeyProviderDelegationTokenExtension
|
||||
.createKeyProviderDelegationTokenExtension(mock);
|
||||
Token<?>[] tokens =
|
||||
kpDTE2.addDelegationTokens("renewer", credentials);
|
||||
tokens = kpDTE2.addDelegationTokens("renewer", credentials);
|
||||
Assert.assertNotNull(tokens);
|
||||
Assert.assertEquals(1, tokens.length);
|
||||
Assert.assertEquals("kind", tokens[0].getKind().toString());
|
||||
|
||||
Assert.assertEquals("tservice", tokens[0].getService().toString());
|
||||
Assert.assertNotNull(credentials.getToken(new Text("cservice")));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,138 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key.kms;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
/**
|
||||
* Unit test for {@link KMSClientProvider} class.
|
||||
*/
|
||||
public class TestKMSClientProvider {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestKMSClientProvider.class);
|
||||
|
||||
private final Token token = new Token();
|
||||
private final Token oldToken = new Token();
|
||||
private final String urlString = "https://host:16000/kms";
|
||||
private final String providerUriString = "kms://https@host:16000/kms";
|
||||
private final String oldTokenService = "host:16000";
|
||||
|
||||
@Rule
|
||||
public Timeout globalTimeout = new Timeout(60000);
|
||||
|
||||
{
|
||||
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
SecurityUtil.setTokenServiceUseIp(false);
|
||||
token.setKind(TOKEN_KIND);
|
||||
token.setService(new Text(providerUriString));
|
||||
oldToken.setKind(TOKEN_KIND);
|
||||
oldToken.setService(new Text(oldTokenService));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectDelegationToken() throws Exception {
|
||||
final Credentials creds = new Credentials();
|
||||
creds.addToken(new Text(providerUriString), token);
|
||||
assertNull(KMSClientProvider.selectDelegationToken(creds, null));
|
||||
assertNull(KMSClientProvider
|
||||
.selectDelegationToken(creds, new Text(oldTokenService)));
|
||||
assertEquals(token, KMSClientProvider
|
||||
.selectDelegationToken(creds, new Text(providerUriString)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectTokenOldService() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final URI uri = new URI(providerUriString);
|
||||
final KMSClientProvider kp = new KMSClientProvider(uri, conf);
|
||||
try {
|
||||
final Credentials creds = new Credentials();
|
||||
creds.addToken(new Text(oldTokenService), oldToken);
|
||||
final Token t = kp.selectDelegationToken(creds);
|
||||
assertEquals(oldToken, t);
|
||||
} finally {
|
||||
kp.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectTokenWhenBothExist() throws Exception {
|
||||
final Credentials creds = new Credentials();
|
||||
final Configuration conf = new Configuration();
|
||||
final URI uri = new URI(providerUriString);
|
||||
final KMSClientProvider kp = new KMSClientProvider(uri, conf);
|
||||
try {
|
||||
creds.addToken(token.getService(), token);
|
||||
creds.addToken(oldToken.getService(), oldToken);
|
||||
final Token t = kp.selectDelegationToken(creds);
|
||||
assertEquals("new token should be selected when both exist", token, t);
|
||||
} finally {
|
||||
kp.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testURLSelectTokenUriFormat() throws Exception {
|
||||
testURLSelectToken(token);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testURLSelectTokenIpPort() throws Exception {
|
||||
testURLSelectToken(oldToken);
|
||||
}
|
||||
|
||||
private void testURLSelectToken(final Token tok)
|
||||
throws URISyntaxException, IOException {
|
||||
final Configuration conf = new Configuration();
|
||||
final URI uri = new URI(providerUriString);
|
||||
final KMSClientProvider kp = new KMSClientProvider(uri, conf);
|
||||
final DelegationTokenAuthenticatedURL url = kp.createAuthenticatedURL();
|
||||
final Credentials creds = new Credentials();
|
||||
creds.addToken(tok.getService(), tok);
|
||||
final Token chosen = url.selectDelegationToken(new URL(urlString), creds);
|
||||
assertEquals(tok, chosen);
|
||||
}
|
||||
}
|
@ -20,6 +20,7 @@
|
||||
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
@ -49,7 +50,6 @@
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.junit.After;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -68,33 +68,27 @@ public static void setup() throws IOException {
|
||||
SecurityUtil.setTokenServiceUseIp(false);
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws IOException {
|
||||
KMSClientProvider.fallbackDefaultPortForTesting = false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreation() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
KMSClientProvider.fallbackDefaultPortForTesting = true;
|
||||
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
|
||||
"kms://http@host1/kms/foo"), conf);
|
||||
"kms://http@host1:9600/kms/foo"), conf);
|
||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||
KMSClientProvider[] providers =
|
||||
((LoadBalancingKMSClientProvider) kp).getProviders();
|
||||
assertEquals(1, providers.length);
|
||||
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
|
||||
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
|
||||
Sets.newHashSet(providers[0].getKMSUrl()));
|
||||
|
||||
kp = new KMSClientProvider.Factory().createProvider(new URI(
|
||||
"kms://http@host1;host2;host3/kms/foo"), conf);
|
||||
"kms://http@host1;host2;host3:9600/kms/foo"), conf);
|
||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||
providers =
|
||||
((LoadBalancingKMSClientProvider) kp).getProviders();
|
||||
assertEquals(3, providers.length);
|
||||
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
|
||||
"http://host2/kms/foo/v1/",
|
||||
"http://host3/kms/foo/v1/"),
|
||||
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
|
||||
"http://host2:9600/kms/foo/v1/",
|
||||
"http://host3:9600/kms/foo/v1/"),
|
||||
Sets.newHashSet(providers[0].getKMSUrl(),
|
||||
providers[1].getKMSUrl(),
|
||||
providers[2].getKMSUrl()));
|
||||
@ -257,10 +251,9 @@ public KeyVersion rollNewVersion(final String name)
|
||||
@Test
|
||||
public void testClassCastException() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
KMSClientProvider.fallbackDefaultPortForTesting = true;
|
||||
KMSClientProvider p1 = new MyKMSClientProvider(
|
||||
new URI("kms://http@host1/kms/foo"), conf);
|
||||
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||
new URI("kms://http@host1:9600/kms/foo"), conf);
|
||||
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||
new KMSClientProvider[] {p1}, 0, conf);
|
||||
try {
|
||||
kp.generateEncryptedKey("foo");
|
||||
@ -878,4 +871,42 @@ public void testClientRetriesNonIdempotentOpWithSocketTimeoutExceptionFails()
|
||||
verify(kp.getProviders()[2], Mockito.times(1))
|
||||
.createKey(Mockito.eq(keyName), Mockito.any(Options.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTokenServiceCreationWithLegacyFormat() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
// Create keyprovider with old token format (ip:port)
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
||||
"kms:/something");
|
||||
String authority = "host1:9600";
|
||||
URI kmsUri = URI.create("kms://http@" + authority + "/kms/foo");
|
||||
KeyProvider kp =
|
||||
new KMSClientProvider.Factory().createProvider(kmsUri, conf);
|
||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||
LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp;
|
||||
assertEquals(1, lbkp.getProviders().length);
|
||||
assertEquals(authority, lbkp.getCanonicalServiceName());
|
||||
for (KMSClientProvider provider : lbkp.getProviders()) {
|
||||
assertEquals(authority, provider.getCanonicalServiceName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTokenServiceCreationWithUriFormat() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final URI kmsUri = URI.create("kms://http@host1;host2;host3:9600/kms/foo");
|
||||
final KeyProvider kp =
|
||||
new KMSClientProvider.Factory().createProvider(kmsUri, conf);
|
||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||
final LoadBalancingKMSClientProvider lbkp =
|
||||
(LoadBalancingKMSClientProvider) kp;
|
||||
assertEquals(kmsUri.toString(), lbkp.getCanonicalServiceName());
|
||||
KMSClientProvider[] providers = lbkp.getProviders();
|
||||
assertEquals(3, providers.length);
|
||||
for (int i = 0; i < providers.length; i++) {
|
||||
assertEquals(URI.create(providers[i].getKMSUrl()).getAuthority(),
|
||||
providers[i].getCanonicalServiceName());
|
||||
assertNotEquals(kmsUri, providers[i].getCanonicalServiceName());
|
||||
}
|
||||
}
|
||||
}
|
@ -36,6 +36,7 @@
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
@ -125,6 +126,8 @@ public void primitiveMkdir(Path f, FsPermission absolutePermission,
|
||||
public int getDefaultPort();
|
||||
public String getCanonicalServiceName();
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException;
|
||||
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
|
||||
throws IOException;
|
||||
public boolean deleteOnExit(Path f) throws IOException;
|
||||
public boolean cancelDeleteOnExit(Path f) throws IOException;
|
||||
public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
|
||||
|
@ -25,6 +25,7 @@
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -145,6 +146,8 @@ public void primitiveMkdir(Path f, FsPermission absolutePermission,
|
||||
public int getDefaultPort();
|
||||
public String getCanonicalServiceName();
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException;
|
||||
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
|
||||
throws IOException;
|
||||
public FileChecksum getFileChecksum(Path f) throws IOException;
|
||||
public boolean deleteOnExit(Path f) throws IOException;
|
||||
public boolean cancelDeleteOnExit(Path f) throws IOException;
|
||||
|
@ -35,6 +35,7 @@
|
||||
import org.apache.hadoop.crypto.key.kms.ValueQueue;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
@ -44,6 +45,7 @@
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.Whitebox;
|
||||
@ -96,6 +98,8 @@
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
@ -141,21 +145,78 @@ public static File getTestDir() throws Exception {
|
||||
}
|
||||
|
||||
public static abstract class KMSCallable<T> implements Callable<T> {
|
||||
private URL kmsUrl;
|
||||
private List<URL> kmsUrl;
|
||||
|
||||
protected URL getKMSUrl() {
|
||||
return kmsUrl;
|
||||
return kmsUrl.get(0);
|
||||
}
|
||||
|
||||
protected URL[] getKMSHAUrl() {
|
||||
URL[] urls = new URL[kmsUrl.size()];
|
||||
return kmsUrl.toArray(urls);
|
||||
}
|
||||
|
||||
protected void addKMSUrl(URL url) {
|
||||
if (kmsUrl == null) {
|
||||
kmsUrl = new ArrayList<URL>();
|
||||
}
|
||||
kmsUrl.add(url);
|
||||
}
|
||||
|
||||
/*
|
||||
* The format of the returned value will be
|
||||
* kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
|
||||
*/
|
||||
protected String generateLoadBalancingKeyProviderUriString() {
|
||||
if (kmsUrl == null || kmsUrl.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
StringBuffer sb = new StringBuffer();
|
||||
|
||||
for (int i = 0; i < kmsUrl.size(); i++) {
|
||||
sb.append(KMSClientProvider.SCHEME_NAME + "://" +
|
||||
kmsUrl.get(0).getProtocol() + "@");
|
||||
URL url = kmsUrl.get(i);
|
||||
sb.append(url.getAuthority());
|
||||
if (url.getPath() != null) {
|
||||
sb.append(url.getPath());
|
||||
}
|
||||
if (i < kmsUrl.size() - 1) {
|
||||
sb.append(",");
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
protected KeyProvider createProvider(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
final KeyProvider ret = new LoadBalancingKMSClientProvider(
|
||||
final KeyProvider ret = new LoadBalancingKMSClientProvider(uri,
|
||||
new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
|
||||
providersCreated.add(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* create a LoadBalancingKMSClientProvider from an array of URIs.
|
||||
* @param uris an array of KMS URIs
|
||||
* @param conf configuration object
|
||||
* @return a LoadBalancingKMSClientProvider object
|
||||
* @throws IOException
|
||||
*/
|
||||
protected LoadBalancingKMSClientProvider createHAProvider(URI lbUri,
|
||||
URI[] uris, Configuration conf) throws IOException {
|
||||
KMSClientProvider[] providers = new KMSClientProvider[uris.length];
|
||||
for (int i = 0; i < providers.length; i++) {
|
||||
providers[i] =
|
||||
new KMSClientProvider(uris[i], conf);
|
||||
}
|
||||
final LoadBalancingKMSClientProvider ret =
|
||||
new LoadBalancingKMSClientProvider(lbUri, providers, conf);
|
||||
providersCreated.add(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
final KMSClientProvider ret = new KMSClientProvider(uri, conf);
|
||||
@ -170,22 +231,34 @@ protected <T> T runServer(String keystore, String password, File confDir,
|
||||
|
||||
protected <T> T runServer(int port, String keystore, String password, File confDir,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
return runServer(new int[] {port}, keystore, password, confDir, callable);
|
||||
}
|
||||
|
||||
protected <T> T runServer(int[] ports, String keystore, String password,
|
||||
File confDir, KMSCallable<T> callable) throws Exception {
|
||||
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
|
||||
.setLog4jConfFile("log4j.properties");
|
||||
if (keystore != null) {
|
||||
miniKMSBuilder.setSslConf(new File(keystore), password);
|
||||
}
|
||||
if (port > 0) {
|
||||
miniKMSBuilder.setPort(port);
|
||||
final List<MiniKMS> kmsList = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < ports.length; i++) {
|
||||
if (ports[i] > 0) {
|
||||
miniKMSBuilder.setPort(ports[i]);
|
||||
}
|
||||
MiniKMS miniKMS = miniKMSBuilder.build();
|
||||
kmsList.add(miniKMS);
|
||||
miniKMS.start();
|
||||
LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
|
||||
callable.addKMSUrl(miniKMS.getKMSUrl());
|
||||
}
|
||||
MiniKMS miniKMS = miniKMSBuilder.build();
|
||||
miniKMS.start();
|
||||
try {
|
||||
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
|
||||
callable.kmsUrl = miniKMS.getKMSUrl();
|
||||
return callable.call();
|
||||
} finally {
|
||||
miniKMS.stop();
|
||||
for (MiniKMS miniKMS: kmsList) {
|
||||
miniKMS.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -240,6 +313,13 @@ public static URI createKMSUri(URL kmsUrl) throws Exception {
|
||||
return new URI("kms://" + str);
|
||||
}
|
||||
|
||||
public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
|
||||
URI[] uris = new URI[kmsUrls.length];
|
||||
for (int i=0; i< kmsUrls.length; i++) {
|
||||
uris[i] = createKMSUri(kmsUrls[i]);
|
||||
}
|
||||
return uris;
|
||||
}
|
||||
|
||||
private static class KerberosConfiguration
|
||||
extends javax.security.auth.login.Configuration {
|
||||
@ -306,6 +386,7 @@ private static void setUpMiniKdc(Properties kdcConf) throws Exception {
|
||||
principals.add("otheradmin");
|
||||
principals.add("client/host");
|
||||
principals.add("client1");
|
||||
principals.add("foo");
|
||||
for (KMSACLs.Type type : KMSACLs.Type.values()) {
|
||||
principals.add(type.toString());
|
||||
}
|
||||
@ -2011,7 +2092,6 @@ public Void run() throws Exception {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
nonKerberosUgi.addCredentials(credentials);
|
||||
|
||||
try {
|
||||
@ -2067,6 +2147,18 @@ public void testDelegationTokensOpsHttpsKerberized() throws Exception {
|
||||
testDelegationTokensOps(true, true);
|
||||
}
|
||||
|
||||
private Text getTokenService(KeyProvider provider) {
|
||||
assertTrue("KeyProvider should be an instance of " +
|
||||
"LoadBalancingKMSClientProvider", (provider instanceof
|
||||
LoadBalancingKMSClientProvider));
|
||||
assertEquals("Num client providers should be 1", 1,
|
||||
((LoadBalancingKMSClientProvider)provider).getProviders().length);
|
||||
final Text tokenService = new Text(
|
||||
(((LoadBalancingKMSClientProvider)provider).getProviders()[0])
|
||||
.getCanonicalServiceName());
|
||||
return tokenService;
|
||||
}
|
||||
|
||||
private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
|
||||
throws Exception {
|
||||
final File confDir = getTestDir();
|
||||
@ -2103,6 +2195,10 @@ public Void call() throws Exception {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KeyProvider kp = createProvider(uri, clientConf);
|
||||
// Unset the conf value for key provider path just to be sure that
|
||||
// the key provider created for renew and cancel token is from
|
||||
// token service field.
|
||||
clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
// test delegation token retrieval
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
@ -2110,13 +2206,10 @@ public Void run() throws Exception {
|
||||
final Credentials credentials = new Credentials();
|
||||
final Token<?>[] tokens =
|
||||
kpdte.addDelegationTokens("client1", credentials);
|
||||
Text tokenService = getTokenService(kp);
|
||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||
InetSocketAddress kmsAddr =
|
||||
new InetSocketAddress(getKMSUrl().getHost(),
|
||||
getKMSUrl().getPort());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||
getKind());
|
||||
credentials.getToken(tokenService).getKind());
|
||||
|
||||
// Test non-renewer user cannot renew.
|
||||
for (Token<?> token : tokens) {
|
||||
@ -2258,15 +2351,16 @@ public Void run() throws Exception {
|
||||
// Get a DT and use it.
|
||||
final Credentials credentials = new Credentials();
|
||||
kpdte.addDelegationTokens("client", credentials);
|
||||
Text tokenService = getTokenService(kp);
|
||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
|
||||
getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
|
||||
getToken(tokenService).getKind());
|
||||
UserGroupInformation.getCurrentUser().addCredentials(credentials);
|
||||
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
|
||||
getCurrentUser().getCredentials().getAllTokens());
|
||||
Token<?> token =
|
||||
UserGroupInformation.getCurrentUser().getCredentials()
|
||||
.getToken(SecurityUtil.buildTokenService(kmsAddr));
|
||||
.getToken(tokenService);
|
||||
Assert.assertNotNull(token);
|
||||
job1Token.add(token);
|
||||
|
||||
@ -2302,17 +2396,17 @@ public Void run() throws Exception {
|
||||
// Get a new DT, but don't use it yet.
|
||||
final Credentials newCreds = new Credentials();
|
||||
kpdte.addDelegationTokens("client", newCreds);
|
||||
Text tokenService = getTokenService(kp);
|
||||
Assert.assertEquals(1, newCreds.getAllTokens().size());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||
newCreds.getToken(tokenService).
|
||||
getKind());
|
||||
|
||||
// Using job 1's DT should fail.
|
||||
final Credentials oldCreds = new Credentials();
|
||||
for (Token<?> token : job1Token) {
|
||||
if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
|
||||
oldCreds
|
||||
.addToken(SecurityUtil.buildTokenService(kmsAddr), token);
|
||||
oldCreds.addToken(tokenService, token);
|
||||
}
|
||||
}
|
||||
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
|
||||
@ -2328,7 +2422,7 @@ public Void run() throws Exception {
|
||||
// Using the new DT should succeed.
|
||||
Assert.assertEquals(1, newCreds.getAllTokens().size());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||
newCreds.getToken(tokenService).
|
||||
getKind());
|
||||
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
|
||||
LOG.info("Credetials now are: {}", UserGroupInformation
|
||||
@ -2357,7 +2451,14 @@ public void testKMSWithZKSignerAndDTSM() throws Exception {
|
||||
doKMSWithZK(true, true);
|
||||
}
|
||||
|
||||
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
||||
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
|
||||
}
|
||||
|
||||
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
||||
KMSCallable<T> callable, int kmsSize) throws Exception {
|
||||
|
||||
TestingServer zkServer = null;
|
||||
try {
|
||||
zkServer = new TestingServer();
|
||||
@ -2403,43 +2504,189 @@ public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
KMSCallable<KeyProvider> c =
|
||||
new KMSCallable<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
|
||||
final KeyProvider kp =
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider run() throws Exception {
|
||||
KeyProvider kp = createProvider(uri, conf);
|
||||
kp.createKey("k1", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k3", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return kp;
|
||||
}
|
||||
});
|
||||
return kp;
|
||||
}
|
||||
};
|
||||
|
||||
runServer(null, null, testDir, c);
|
||||
int[] ports = new int[kmsSize];
|
||||
for (int i = 0; i < ports.length; i++) {
|
||||
ports[i] = -1;
|
||||
}
|
||||
return runServer(ports, null, null, testDir, callable);
|
||||
} finally {
|
||||
if (zkServer != null) {
|
||||
zkServer.stop();
|
||||
zkServer.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
||||
KMSCallable<KeyProvider> c =
|
||||
new KMSCallable<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
|
||||
final KeyProvider kp =
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider run() throws Exception {
|
||||
KeyProvider kp = createProvider(uri, conf);
|
||||
kp.createKey("k1", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k3", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return kp;
|
||||
}
|
||||
});
|
||||
return kp;
|
||||
}
|
||||
};
|
||||
|
||||
runServerWithZooKeeper(zkDTSM, zkSigner, c);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSHAZooKeeperDelegationToken() throws Exception {
|
||||
final int kmsSize = 2;
|
||||
doKMSWithZKWithDelegationToken(true, true, kmsSize);
|
||||
}
|
||||
|
||||
private void doKMSWithZKWithDelegationToken(boolean zkDTSM, boolean zkSigner,
|
||||
int kmsSize) throws Exception {
|
||||
// Create a KMSCallable to execute requests after ZooKeeper and KMS are up.
|
||||
KMSCallable<Void> c = new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI[] uris = createKMSHAUri(getKMSHAUrl());
|
||||
final Credentials credentials = new Credentials();
|
||||
// Create a UGI without Kerberos auth. It will authenticate with tokens.
|
||||
final UserGroupInformation nonKerberosUgi =
|
||||
UserGroupInformation.getCurrentUser();
|
||||
final String lbUri = generateLoadBalancingKeyProviderUriString();
|
||||
final LoadBalancingKMSClientProvider lbkp =
|
||||
createHAProvider(URI.create(lbUri), uris, conf);
|
||||
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
// get delegation tokens using kerberos login
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(lbkp);
|
||||
kpdte.addDelegationTokens("foo", credentials);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
nonKerberosUgi.addCredentials(credentials);
|
||||
// Access KMS using delegation token for authentication, no Kerberos.
|
||||
nonKerberosUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
// Create a kms client with one provider at a time. Must use one
|
||||
// provider so that if it fails to authenticate, it does not fall
|
||||
// back to the next KMS instance.
|
||||
// Should succeed because it has delegation tokens for any instance.
|
||||
int i = 0;
|
||||
for (KMSClientProvider provider : lbkp.getProviders()) {
|
||||
final String key = "k" + i++;
|
||||
LOG.info("Connect to {} to create key {}.", provider, key);
|
||||
provider.createKey(key, new KeyProvider.Options(conf));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
final Collection<Token<? extends TokenIdentifier>> tokens =
|
||||
credentials.getAllTokens();
|
||||
doAs("foo", new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
assertEquals(1, tokens.size());
|
||||
Token token = tokens.iterator().next();
|
||||
assertEquals(KMSDelegationToken.TOKEN_KIND, token.getKind());
|
||||
LOG.info("Got dt for token: {}", token);
|
||||
final long tokenLife = token.renew(conf);
|
||||
LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
|
||||
Thread.sleep(10);
|
||||
final long newTokenLife = token.renew(conf);
|
||||
LOG.info("Renewed token {}, new lifetime:{}", token, newTokenLife);
|
||||
assertTrue(newTokenLife > tokenLife);
|
||||
|
||||
// test delegation token cancellation
|
||||
LOG.info("Got dt for token: {}", token);
|
||||
token.cancel(conf);
|
||||
LOG.info("Cancelled token {}", token);
|
||||
try {
|
||||
token.renew(conf);
|
||||
fail("should not be able to renew a canceled token");
|
||||
} catch (Exception e) {
|
||||
LOG.info("Expected exception when renewing token", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
final Credentials newCredentials = new Credentials();
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(lbkp);
|
||||
kpdte.addDelegationTokens("foo", newCredentials);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
doAs("foo", new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KMSClientProvider kp1 = lbkp.getProviders()[0];
|
||||
URL[] urls = getKMSHAUrl();
|
||||
final Collection<Token<? extends TokenIdentifier>> tokens =
|
||||
newCredentials.getAllTokens();
|
||||
assertEquals(1, tokens.size());
|
||||
Token token = tokens.iterator().next();
|
||||
assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
token.getKind());
|
||||
// Testing backward compatibility of token renewal and cancellation.
|
||||
// Set the token service to ip:port format and test to renew/cancel.
|
||||
Text text = SecurityUtil.buildTokenService(
|
||||
new InetSocketAddress(urls[0].getHost(), urls[0].getPort()));
|
||||
token.setService(text);
|
||||
conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
|
||||
long tokenLife = 0L;
|
||||
for (KMSClientProvider kp : lbkp.getProviders()) {
|
||||
long renewedTokenLife = token.renew(conf);
|
||||
LOG.info("Renewed token of kind {}, new lifetime:{}",
|
||||
token.getKind(), renewedTokenLife);
|
||||
assertTrue(renewedTokenLife > tokenLife);
|
||||
tokenLife = renewedTokenLife;
|
||||
Thread.sleep(10);
|
||||
}
|
||||
token.cancel(conf);
|
||||
try {
|
||||
token.renew(conf);
|
||||
fail("should not be able to renew a canceled token");
|
||||
} catch (IOException e) {
|
||||
LOG.info("Expected exception when renewing token", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
};
|
||||
runServerWithZooKeeper(zkDTSM, zkSigner, c, kmsSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProxyUserKerb() throws Exception {
|
||||
|
@ -61,6 +61,7 @@
|
||||
import org.apache.hadoop.crypto.CryptoOutputStream;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
@ -205,7 +206,7 @@
|
||||
********************************************************/
|
||||
@InterfaceAudience.Private
|
||||
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||
DataEncryptionKeyFactory {
|
||||
DataEncryptionKeyFactory, KeyProviderTokenIssuer {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
|
||||
|
||||
private final Configuration conf;
|
||||
@ -684,6 +685,11 @@ public String getCanonicalServiceName() {
|
||||
return (dtService != null) ? dtService.toString() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>getDelegationToken(String renewer) throws IOException {
|
||||
return getDelegationToken(renewer == null ? null : new Text(renewer));
|
||||
}
|
||||
|
||||
/**
|
||||
* @see ClientProtocol#getDelegationToken(Text)
|
||||
*/
|
||||
@ -3029,7 +3035,8 @@ DFSHedgedReadMetrics getHedgedReadMetrics() {
|
||||
return HEDGED_READ_METRIC;
|
||||
}
|
||||
|
||||
URI getKeyProviderUri() throws IOException {
|
||||
@Override
|
||||
public URI getKeyProviderUri() throws IOException {
|
||||
return HdfsKMSUtil.getKeyProviderUri(ugi, namenodeUri,
|
||||
getServerDefaults().getKeyProviderUri(), conf);
|
||||
}
|
||||
|
@ -102,8 +102,8 @@
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
import org.apache.hadoop.util.ChunkedArrayList;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
@ -2818,11 +2818,13 @@ public KeyProvider getKeyProvider() throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[] addDelegationTokens(
|
||||
final String renewer, Credentials credentials) throws IOException {
|
||||
Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
|
||||
return HdfsKMSUtil.addDelegationTokensForKeyProvider(
|
||||
this, renewer, credentials, uri, tokens);
|
||||
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
|
||||
throws IOException {
|
||||
KeyProvider keyProvider = getKeyProvider();
|
||||
if (keyProvider instanceof DelegationTokenIssuer) {
|
||||
return new DelegationTokenIssuer[]{(DelegationTokenIssuer)keyProvider};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
|
||||
|
@ -35,14 +35,12 @@
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
|
||||
/**
|
||||
@ -71,32 +69,6 @@ public static KeyProvider createKeyProvider(
|
||||
return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName);
|
||||
}
|
||||
|
||||
public static Token<?>[] addDelegationTokensForKeyProvider(
|
||||
KeyProviderTokenIssuer kpTokenIssuer, final String renewer,
|
||||
Credentials credentials, URI namenodeUri, Token<?>[] tokens)
|
||||
throws IOException {
|
||||
KeyProvider keyProvider = kpTokenIssuer.getKeyProvider();
|
||||
if (keyProvider != null) {
|
||||
KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension
|
||||
= KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(keyProvider);
|
||||
Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
|
||||
addDelegationTokens(renewer, credentials);
|
||||
credentials.addSecretKey(getKeyProviderMapKey(namenodeUri),
|
||||
DFSUtilClient.string2Bytes(
|
||||
kpTokenIssuer.getKeyProviderUri().toString()));
|
||||
if (tokens != null && kpTokens != null) {
|
||||
Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
|
||||
System.arraycopy(tokens, 0, all, 0, tokens.length);
|
||||
System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
|
||||
tokens = all;
|
||||
} else {
|
||||
tokens = (tokens != null) ? tokens : kpTokens;
|
||||
}
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the crypto protocol version from the provided FileEncryptionInfo,
|
||||
* checking to see if this version is supported by.
|
||||
@ -161,30 +133,38 @@ public static URI getKeyProviderUri(UserGroupInformation ugi,
|
||||
URI keyProviderUri = null;
|
||||
// Lookup the secret in credentials object for namenodeuri.
|
||||
Credentials credentials = ugi.getCredentials();
|
||||
Text credsKey = getKeyProviderMapKey(namenodeUri);
|
||||
byte[] keyProviderUriBytes =
|
||||
credentials.getSecretKey(getKeyProviderMapKey(namenodeUri));
|
||||
credentials.getSecretKey(credsKey);
|
||||
if(keyProviderUriBytes != null) {
|
||||
keyProviderUri =
|
||||
URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes));
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
if (keyProviderUriStr != null) {
|
||||
if (!keyProviderUriStr.isEmpty()) {
|
||||
if (keyProviderUri == null) {
|
||||
// NN is old and doesn't report provider, so use conf.
|
||||
if (keyProviderUriStr == null) {
|
||||
keyProviderUri = KMSUtil.getKeyProviderUri(conf, keyProviderUriKeyName);
|
||||
} else if (!keyProviderUriStr.isEmpty()) {
|
||||
keyProviderUri = URI.create(keyProviderUriStr);
|
||||
}
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
// Last thing is to trust its own conf to be backwards compatible.
|
||||
String keyProviderUriFromConf = conf.getTrimmed(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
if (keyProviderUriFromConf != null && !keyProviderUriFromConf.isEmpty()) {
|
||||
keyProviderUri = URI.create(keyProviderUriFromConf);
|
||||
if (keyProviderUri != null) {
|
||||
credentials.addSecretKey(
|
||||
credsKey, DFSUtilClient.string2Bytes(keyProviderUri.toString()));
|
||||
}
|
||||
}
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
public static KeyProvider getKeyProvider(KeyProviderTokenIssuer issuer,
|
||||
Configuration conf)
|
||||
throws IOException {
|
||||
URI keyProviderUri = issuer.getKeyProviderUri();
|
||||
if (keyProviderUri != null) {
|
||||
return KMSUtil.createKeyProviderFromUri(conf, keyProviderUri);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a key to map namenode uri to key provider uri.
|
||||
* Tasks will lookup this key to find key Provider.
|
||||
|
@ -111,7 +111,6 @@
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
@ -119,6 +118,7 @@
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
import org.apache.hadoop.util.JsonSerialization;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
@ -1690,6 +1690,16 @@ Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
|
||||
throws IOException {
|
||||
KeyProvider keyProvider = getKeyProvider();
|
||||
if (keyProvider instanceof DelegationTokenIssuer) {
|
||||
return new DelegationTokenIssuer[] {(DelegationTokenIssuer) keyProvider};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Token<?> getRenewToken() {
|
||||
return delegationToken;
|
||||
@ -1725,14 +1735,6 @@ public synchronized void cancelDelegationToken(final Token<?> token
|
||||
).run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[] addDelegationTokens(String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
|
||||
return HdfsKMSUtil.addDelegationTokensForKeyProvider(this, renewer,
|
||||
credentials, getUri(), tokens);
|
||||
}
|
||||
|
||||
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
||||
final long offset, final long length) throws IOException {
|
||||
if (status == null) {
|
||||
|
Loading…
Reference in New Issue
Block a user